(t *testing.T)
| 1569 | } |
| 1570 | |
| 1571 | func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) { |
| 1572 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 1573 | defer cancel() |
| 1574 | |
| 1575 | client, shutdown := newLocalClient() |
| 1576 | defer shutdown() |
| 1577 | t1 := makeTopic() |
| 1578 | createTopic(t, t1, 1) |
| 1579 | defer deleteTopic(t, t1) |
| 1580 | t2 := makeTopic() |
| 1581 | createTopic(t, t2, 1) |
| 1582 | defer deleteTopic(t, t2) |
| 1583 | conf := ReaderConfig{ |
| 1584 | Brokers: []string{"localhost:9092"}, |
| 1585 | GroupID: makeGroupID(), |
| 1586 | GroupTopics: []string{t1, t2}, |
| 1587 | MaxWait: time.Second, |
| 1588 | PartitionWatchInterval: 100 * time.Millisecond, |
| 1589 | WatchPartitionChanges: true, |
| 1590 | Logger: newTestKafkaLogger(t, "Reader:"), |
| 1591 | } |
| 1592 | |
| 1593 | r := NewReader(conf) |
| 1594 | |
| 1595 | w := &Writer{ |
| 1596 | Addr: TCP(r.config.Brokers...), |
| 1597 | BatchTimeout: 10 * time.Millisecond, |
| 1598 | BatchSize: 1, |
| 1599 | Transport: client.Transport, |
| 1600 | Logger: newTestKafkaLogger(t, "Writer:"), |
| 1601 | } |
| 1602 | defer w.Close() |
| 1603 | |
| 1604 | time.Sleep(time.Second) |
| 1605 | |
| 1606 | msgs := make([]Message, 0, len(conf.GroupTopics)) |
| 1607 | for _, topic := range conf.GroupTopics { |
| 1608 | msgs = append(msgs, Message{Topic: topic}) |
| 1609 | } |
| 1610 | if err := w.WriteMessages(ctx, msgs...); err != nil { |
| 1611 | t.Logf("write error: %+v", err) |
| 1612 | } |
| 1613 | |
| 1614 | wg := new(sync.WaitGroup) |
| 1615 | wg.Add(len(msgs)) |
| 1616 | |
| 1617 | go func() { |
| 1618 | wg.Wait() |
| 1619 | t.Log("closing reader") |
| 1620 | r.Close() |
| 1621 | }() |
| 1622 | |
| 1623 | for { |
| 1624 | msg, err := r.ReadMessage(ctx) |
| 1625 | if err != nil { |
| 1626 | if errors.Is(err, io.EOF) { |
| 1627 | t.Log("reader closed") |
| 1628 | break |
nothing calls this directly
no test coverage detected