MCPcopy
hub / github.com/segmentio/kafka-go / TestConsumerGroupWithGroupTopicsMultiple

Function TestConsumerGroupWithGroupTopicsMultiple

reader_test.go:1571–1642  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1569}
1570
1571func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
1572 ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
1573 defer cancel()
1574
1575 client, shutdown := newLocalClient()
1576 defer shutdown()
1577 t1 := makeTopic()
1578 createTopic(t, t1, 1)
1579 defer deleteTopic(t, t1)
1580 t2 := makeTopic()
1581 createTopic(t, t2, 1)
1582 defer deleteTopic(t, t2)
1583 conf := ReaderConfig{
1584 Brokers: []string{"localhost:9092"},
1585 GroupID: makeGroupID(),
1586 GroupTopics: []string{t1, t2},
1587 MaxWait: time.Second,
1588 PartitionWatchInterval: 100 * time.Millisecond,
1589 WatchPartitionChanges: true,
1590 Logger: newTestKafkaLogger(t, "Reader:"),
1591 }
1592
1593 r := NewReader(conf)
1594
1595 w := &Writer{
1596 Addr: TCP(r.config.Brokers...),
1597 BatchTimeout: 10 * time.Millisecond,
1598 BatchSize: 1,
1599 Transport: client.Transport,
1600 Logger: newTestKafkaLogger(t, "Writer:"),
1601 }
1602 defer w.Close()
1603
1604 time.Sleep(time.Second)
1605
1606 msgs := make([]Message, 0, len(conf.GroupTopics))
1607 for _, topic := range conf.GroupTopics {
1608 msgs = append(msgs, Message{Topic: topic})
1609 }
1610 if err := w.WriteMessages(ctx, msgs...); err != nil {
1611 t.Logf("write error: %+v", err)
1612 }
1613
1614 wg := new(sync.WaitGroup)
1615 wg.Add(len(msgs))
1616
1617 go func() {
1618 wg.Wait()
1619 t.Log("closing reader")
1620 r.Close()
1621 }()
1622
1623 for {
1624 msg, err := r.ReadMessage(ctx)
1625 if err != nil {
1626 if errors.Is(err, io.EOF) {
1627 t.Log("reader closed")
1628 break

Callers

nothing calls this directly

Calls 14

Close (Method) · 0.95
WriteMessages (Method) · 0.95
Close (Method) · 0.95
ReadMessage (Method) · 0.95
Stats (Method) · 0.95
createTopic (Function) · 0.85
deleteTopic (Function) · 0.85
makeGroupID (Function) · 0.85
newTestKafkaLogger (Function) · 0.85
NewReader (Function) · 0.85
TCP (Function) · 0.85
Done (Method) · 0.80

Tested by

no test coverage detected