MCPcopy
hub / github.com/segmentio/kafka-go / TestConsumerGroupWithGroupTopicsSingle

Function TestConsumerGroupWithGroupTopicsSingle

reader_test.go:1517–1569  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1515}
1516
1517func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
1518 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1519 defer cancel()
1520
1521 conf := ReaderConfig{
1522 Brokers: []string{"localhost:9092"},
1523 GroupID: makeGroupID(),
1524 GroupTopics: []string{makeTopic()},
1525 MaxWait: time.Second,
1526 PartitionWatchInterval: 100 * time.Millisecond,
1527 WatchPartitionChanges: true,
1528 Logger: newTestKafkaLogger(t, "Reader:"),
1529 }
1530
1531 r := NewReader(conf)
1532 defer r.Close()
1533
1534 recvErr := make(chan error, len(conf.GroupTopics))
1535 go func() {
1536 msg, err := r.ReadMessage(ctx)
1537 t.Log(msg)
1538 recvErr <- err
1539 }()
1540
1541 time.Sleep(conf.MaxWait)
1542
1543 for i, topic := range conf.GroupTopics {
1544 client, shutdown := newLocalClientWithTopic(topic, 1)
1545 defer shutdown()
1546
1547 w := &Writer{
1548 Addr: TCP(r.config.Brokers...),
1549 Topic: topic,
1550 BatchTimeout: 10 * time.Millisecond,
1551 BatchSize: 1,
1552 Transport: client.Transport,
1553 Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)),
1554 }
1555 defer w.Close()
1556 if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil {
1557 t.Fatalf("write error: %+v", err)
1558 }
1559 }
1560
1561 if err := <-recvErr; err != nil {
1562 t.Fatalf("read error: %+v", err)
1563 }
1564
1565 nMsgs := r.Stats().Messages
1566 if nMsgs != int64(len(conf.GroupTopics)) {
1567 t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs)
1568 }
1569}
1570
1571func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
1572 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)

Callers

nothing calls this directly

Calls 11

CloseMethod · 0.95
ReadMessageMethod · 0.95
CloseMethod · 0.95
WriteMessagesMethod · 0.95
StatsMethod · 0.95
makeGroupIDFunction · 0.85
newTestKafkaLoggerFunction · 0.85
NewReaderFunction · 0.85
TCPFunction · 0.85
makeTopicFunction · 0.70
newLocalClientWithTopicFunction · 0.70

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…