MCPcopy
hub / github.com/segmentio/kafka-go / testReaderConsumerGroupReadContentAcrossPartitions

Function testReaderConsumerGroupReadContentAcrossPartitions

reader_test.go:999–1031  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

997}
998
999func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) {
1000 const N = 12
1001
1002 client, shutdown := newLocalClient()
1003 defer shutdown()
1004
1005 writer := &Writer{
1006 Addr: TCP(r.config.Brokers...),
1007 Topic: r.config.Topic,
1008 Balancer: &RoundRobin{},
1009 BatchSize: 1,
1010 Transport: client.Transport,
1011 }
1012 if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
1013 t.Fatalf("bad write messages: %v", err)
1014 }
1015 if err := writer.Close(); err != nil {
1016 t.Fatalf("bad write err: %v", err)
1017 }
1018
1019 partitions := map[int]struct{}{}
1020 for i := 0; i < N; i++ {
1021 m, err := r.FetchMessage(ctx)
1022 if err != nil {
1023 t.Errorf("bad error: %s", err)
1024 }
1025 partitions[m.Partition] = struct{}{}
1026 }
1027
1028 if v := len(partitions); v != 3 {
1029 t.Errorf("expected messages across 3 partitions; got messages across %v partitions", v)
1030 }
1031}
1032
1033func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Reader) {
1034 r2 := NewReader(r.config)

Callers

nothing calls this directly

Calls 6

WriteMessagesMethod · 0.95
CloseMethod · 0.95
TCPFunction · 0.85
makeTestSequenceFunction · 0.85
FetchMessageMethod · 0.80
newLocalClientFunction · 0.70

Tested by

no test coverage detected