(t *testing.T, ctx context.Context, r *Reader)
| 1069 | } |
| 1070 | |
| 1071 | func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Context, r *Reader) { |
| 1072 | // create a second reader that shares the groupID, but reads from a different topic |
| 1073 | client, topic2, shutdown := newLocalClientAndTopic() |
| 1074 | defer shutdown() |
| 1075 | |
| 1076 | r2 := NewReader(ReaderConfig{ |
| 1077 | Brokers: r.config.Brokers, |
| 1078 | Topic: topic2, |
| 1079 | GroupID: r.config.GroupID, |
| 1080 | HeartbeatInterval: r.config.HeartbeatInterval, |
| 1081 | SessionTimeout: r.config.SessionTimeout, |
| 1082 | RetentionTime: r.config.RetentionTime, |
| 1083 | MinBytes: r.config.MinBytes, |
| 1084 | MaxBytes: r.config.MaxBytes, |
| 1085 | Logger: r.config.Logger, |
| 1086 | }) |
| 1087 | defer r.Close() |
| 1088 | prepareReader(t, ctx, r2, makeTestSequence(1)...) |
| 1089 | |
| 1090 | const ( |
| 1091 | N = 12 |
| 1092 | ) |
| 1093 | |
| 1094 | // write messages across both partitions |
| 1095 | writer := &Writer{ |
| 1096 | Addr: TCP(r.config.Brokers...), |
| 1097 | Topic: r.config.Topic, |
| 1098 | Balancer: &RoundRobin{}, |
| 1099 | BatchSize: 1, |
| 1100 | Transport: client.Transport, |
| 1101 | } |
| 1102 | if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil { |
| 1103 | t.Fatalf("bad write messages: %v", err) |
| 1104 | } |
| 1105 | if err := writer.Close(); err != nil { |
| 1106 | t.Fatalf("bad write err: %v", err) |
| 1107 | } |
| 1108 | |
| 1109 | // after rebalance, r2 should read topic2 and r1 should read ALL of the original topic |
| 1110 | if _, err := r2.FetchMessage(ctx); err != nil { |
| 1111 | t.Errorf("expect to read from reader 2") |
| 1112 | } |
| 1113 | |
| 1114 | // all N messages on the original topic should be read by the original reader |
| 1115 | for i := 0; i < N; i++ { |
| 1116 | if _, err := r.FetchMessage(ctx); err != nil { |
| 1117 | t.Errorf("expect to read from reader 1") |
| 1118 | } |
| 1119 | } |
| 1120 | } |
| 1121 | |
| 1122 | func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing.T, ctx context.Context, r *Reader) { |
| 1123 | // I've rebalanced up to 100 servers, but the rebalance can take upwards |
nothing calls this directly
no test coverage detected