(t *testing.T, ctx context.Context, r *Reader)
| 997 | } |
| 998 | |
| 999 | func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) { |
| 1000 | const N = 12 |
| 1001 | |
| 1002 | client, shutdown := newLocalClient() |
| 1003 | defer shutdown() |
| 1004 | |
| 1005 | writer := &Writer{ |
| 1006 | Addr: TCP(r.config.Brokers...), |
| 1007 | Topic: r.config.Topic, |
| 1008 | Balancer: &RoundRobin{}, |
| 1009 | BatchSize: 1, |
| 1010 | Transport: client.Transport, |
| 1011 | } |
| 1012 | if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil { |
| 1013 | t.Fatalf("bad write messages: %v", err) |
| 1014 | } |
| 1015 | if err := writer.Close(); err != nil { |
| 1016 | t.Fatalf("bad write err: %v", err) |
| 1017 | } |
| 1018 | |
| 1019 | partitions := map[int]struct{}{} |
| 1020 | for i := 0; i < N; i++ { |
| 1021 | m, err := r.FetchMessage(ctx) |
| 1022 | if err != nil { |
| 1023 | t.Errorf("bad error: %s", err) |
| 1024 | } |
| 1025 | partitions[m.Partition] = struct{}{} |
| 1026 | } |
| 1027 | |
| 1028 | if v := len(partitions); v != 3 { |
| 1029 | t.Errorf("expected messages across 3 partitions; got messages across %v partitions", v) |
| 1030 | } |
| 1031 | } |
| 1032 | |
| 1033 | func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Reader) { |
| 1034 | r2 := NewReader(r.config) |
nothing calls this directly
no test coverage detected