(t *testing.T)
| 1515 | } |
| 1516 | |
| 1517 | func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { |
| 1518 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 1519 | defer cancel() |
| 1520 | |
| 1521 | conf := ReaderConfig{ |
| 1522 | Brokers: []string{"localhost:9092"}, |
| 1523 | GroupID: makeGroupID(), |
| 1524 | GroupTopics: []string{makeTopic()}, |
| 1525 | MaxWait: time.Second, |
| 1526 | PartitionWatchInterval: 100 * time.Millisecond, |
| 1527 | WatchPartitionChanges: true, |
| 1528 | Logger: newTestKafkaLogger(t, "Reader:"), |
| 1529 | } |
| 1530 | |
| 1531 | r := NewReader(conf) |
| 1532 | defer r.Close() |
| 1533 | |
| 1534 | recvErr := make(chan error, len(conf.GroupTopics)) |
| 1535 | go func() { |
| 1536 | msg, err := r.ReadMessage(ctx) |
| 1537 | t.Log(msg) |
| 1538 | recvErr <- err |
| 1539 | }() |
| 1540 | |
| 1541 | time.Sleep(conf.MaxWait) |
| 1542 | |
| 1543 | for i, topic := range conf.GroupTopics { |
| 1544 | client, shutdown := newLocalClientWithTopic(topic, 1) |
| 1545 | defer shutdown() |
| 1546 | |
| 1547 | w := &Writer{ |
| 1548 | Addr: TCP(r.config.Brokers...), |
| 1549 | Topic: topic, |
| 1550 | BatchTimeout: 10 * time.Millisecond, |
| 1551 | BatchSize: 1, |
| 1552 | Transport: client.Transport, |
| 1553 | Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)), |
| 1554 | } |
| 1555 | defer w.Close() |
| 1556 | if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil { |
| 1557 | t.Fatalf("write error: %+v", err) |
| 1558 | } |
| 1559 | } |
| 1560 | |
| 1561 | if err := <-recvErr; err != nil { |
| 1562 | t.Fatalf("read error: %+v", err) |
| 1563 | } |
| 1564 | |
| 1565 | nMsgs := r.Stats().Messages |
| 1566 | if nMsgs != int64(len(conf.GroupTopics)) { |
| 1567 | t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs) |
| 1568 | } |
| 1569 | } |
| 1570 | |
| 1571 | func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) { |
| 1572 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
nothing calls this directly
no test coverage detected
searching dependent graphs…