(t *testing.T)
| 1463 | } |
| 1464 | |
| 1465 | func TestConsumerGroupWithTopic(t *testing.T) { |
| 1466 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 1467 | defer cancel() |
| 1468 | |
| 1469 | conf := ReaderConfig{ |
| 1470 | Brokers: []string{"localhost:9092"}, |
| 1471 | GroupID: makeGroupID(), |
| 1472 | Topic: makeTopic(), |
| 1473 | MaxWait: time.Second, |
| 1474 | PartitionWatchInterval: 100 * time.Millisecond, |
| 1475 | WatchPartitionChanges: true, |
| 1476 | Logger: newTestKafkaLogger(t, "Reader:"), |
| 1477 | } |
| 1478 | |
| 1479 | r := NewReader(conf) |
| 1480 | defer r.Close() |
| 1481 | |
| 1482 | recvErr := make(chan error, len(conf.GroupTopics)) |
| 1483 | go func() { |
| 1484 | msg, err := r.ReadMessage(ctx) |
| 1485 | t.Log(msg) |
| 1486 | recvErr <- err |
| 1487 | }() |
| 1488 | |
| 1489 | time.Sleep(conf.MaxWait) |
| 1490 | |
| 1491 | client, shutdown := newLocalClientWithTopic(conf.Topic, 1) |
| 1492 | defer shutdown() |
| 1493 | |
| 1494 | w := &Writer{ |
| 1495 | Addr: TCP(r.config.Brokers...), |
| 1496 | Topic: conf.Topic, |
| 1497 | BatchTimeout: 10 * time.Millisecond, |
| 1498 | BatchSize: 1, |
| 1499 | Transport: client.Transport, |
| 1500 | Logger: newTestKafkaLogger(t, "Writer:"), |
| 1501 | } |
| 1502 | defer w.Close() |
| 1503 | if err := w.WriteMessages(ctx, Message{Value: []byte(conf.Topic)}); err != nil { |
| 1504 | t.Fatalf("write error: %+v", err) |
| 1505 | } |
| 1506 | |
| 1507 | if err := <-recvErr; err != nil { |
| 1508 | t.Fatalf("read error: %+v", err) |
| 1509 | } |
| 1510 | |
| 1511 | nMsgs := r.Stats().Messages |
| 1512 | if nMsgs != 1 { |
| 1513 | t.Fatalf("expected to receive 1 message, but got %d", nMsgs) |
| 1514 | } |
| 1515 | } |
| 1516 | |
| 1517 | func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { |
| 1518 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
nothing calls this directly
no test coverage detected
searching dependent graphs…