MCPcopy
hub / github.com/segmentio/kafka-go / TestConsumerGroupWithTopic

Function TestConsumerGroupWithTopic

reader_test.go:1465–1515  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1463}
1464
1465func TestConsumerGroupWithTopic(t *testing.T) {
1466 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1467 defer cancel()
1468
1469 conf := ReaderConfig{
1470 Brokers: []string{"localhost:9092"},
1471 GroupID: makeGroupID(),
1472 Topic: makeTopic(),
1473 MaxWait: time.Second,
1474 PartitionWatchInterval: 100 * time.Millisecond,
1475 WatchPartitionChanges: true,
1476 Logger: newTestKafkaLogger(t, "Reader:"),
1477 }
1478
1479 r := NewReader(conf)
1480 defer r.Close()
1481
1482 recvErr := make(chan error, len(conf.GroupTopics))
1483 go func() {
1484 msg, err := r.ReadMessage(ctx)
1485 t.Log(msg)
1486 recvErr <- err
1487 }()
1488
1489 time.Sleep(conf.MaxWait)
1490
1491 client, shutdown := newLocalClientWithTopic(conf.Topic, 1)
1492 defer shutdown()
1493
1494 w := &Writer{
1495 Addr: TCP(r.config.Brokers...),
1496 Topic: conf.Topic,
1497 BatchTimeout: 10 * time.Millisecond,
1498 BatchSize: 1,
1499 Transport: client.Transport,
1500 Logger: newTestKafkaLogger(t, "Writer:"),
1501 }
1502 defer w.Close()
1503 if err := w.WriteMessages(ctx, Message{Value: []byte(conf.Topic)}); err != nil {
1504 t.Fatalf("write error: %+v", err)
1505 }
1506
1507 if err := <-recvErr; err != nil {
1508 t.Fatalf("read error: %+v", err)
1509 }
1510
1511 nMsgs := r.Stats().Messages
1512 if nMsgs != 1 {
1513 t.Fatalf("expected to receive 1 message, but got %d", nMsgs)
1514 }
1515}
1516
1517func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
1518 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

Callers

nothing calls this directly

Calls 11

CloseMethod · 0.95
ReadMessageMethod · 0.95
CloseMethod · 0.95
WriteMessagesMethod · 0.95
StatsMethod · 0.95
makeGroupIDFunction · 0.85
newTestKafkaLoggerFunction · 0.85
NewReaderFunction · 0.85
TCPFunction · 0.85
makeTopicFunction · 0.70
newLocalClientWithTopicFunction · 0.70

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…