(t *testing.T, ctx context.Context, r *Reader)
| 1031 | } |
| 1032 | |
| 1033 | func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Reader) { |
| 1034 | r2 := NewReader(r.config) |
| 1035 | defer r.Close() |
| 1036 | |
| 1037 | const ( |
| 1038 | N = 12 |
| 1039 | partitions = 2 |
| 1040 | ) |
| 1041 | |
| 1042 | client, shutdown := newLocalClient() |
| 1043 | defer shutdown() |
| 1044 | |
| 1045 | // rebalance should result in 12 message in each of the partitions |
| 1046 | writer := &Writer{ |
| 1047 | Addr: TCP(r.config.Brokers...), |
| 1048 | Topic: r.config.Topic, |
| 1049 | Balancer: &RoundRobin{}, |
| 1050 | BatchSize: 1, |
| 1051 | Transport: client.Transport, |
| 1052 | } |
| 1053 | if err := writer.WriteMessages(ctx, makeTestSequence(N*partitions)...); err != nil { |
| 1054 | t.Fatalf("bad write messages: %v", err) |
| 1055 | } |
| 1056 | if err := writer.Close(); err != nil { |
| 1057 | t.Fatalf("bad write err: %v", err) |
| 1058 | } |
| 1059 | |
| 1060 | // after rebalance, each reader should have a partition to itself |
| 1061 | for i := 0; i < N; i++ { |
| 1062 | if _, err := r2.FetchMessage(ctx); err != nil { |
| 1063 | t.Errorf("expect to read from reader 2") |
| 1064 | } |
| 1065 | if _, err := r.FetchMessage(ctx); err != nil { |
| 1066 | t.Errorf("expect to read from reader 1") |
| 1067 | } |
| 1068 | } |
| 1069 | } |
| 1070 | |
| 1071 | func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Context, r *Reader) { |
| 1072 | // create a second reader that shares the groupID, but reads from a different topic |
nothing calls this directly
no test coverage detected