MCPcopy
hub / github.com/segmentio/kafka-go / testReaderConsumerGroupRebalanceAcrossTopics

Function testReaderConsumerGroupRebalanceAcrossTopics

reader_test.go:1071–1120  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

1069}
1070
1071func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Context, r *Reader) {
1072 // create a second reader that shares the groupID, but reads from a different topic
1073 client, topic2, shutdown := newLocalClientAndTopic()
1074 defer shutdown()
1075
1076 r2 := NewReader(ReaderConfig{
1077 Brokers: r.config.Brokers,
1078 Topic: topic2,
1079 GroupID: r.config.GroupID,
1080 HeartbeatInterval: r.config.HeartbeatInterval,
1081 SessionTimeout: r.config.SessionTimeout,
1082 RetentionTime: r.config.RetentionTime,
1083 MinBytes: r.config.MinBytes,
1084 MaxBytes: r.config.MaxBytes,
1085 Logger: r.config.Logger,
1086 })
1087 defer r.Close()
1088 prepareReader(t, ctx, r2, makeTestSequence(1)...)
1089
1090 const (
1091 N = 12
1092 )
1093
1094 // write messages across both partitions
1095 writer := &Writer{
1096 Addr: TCP(r.config.Brokers...),
1097 Topic: r.config.Topic,
1098 Balancer: &RoundRobin{},
1099 BatchSize: 1,
1100 Transport: client.Transport,
1101 }
1102 if err := writer.WriteMessages(ctx, makeTestSequence(N)...); err != nil {
1103 t.Fatalf("bad write messages: %v", err)
1104 }
1105 if err := writer.Close(); err != nil {
1106 t.Fatalf("bad write err: %v", err)
1107 }
1108
1109 // after rebalance, r2 should read topic2 and r1 should read ALL of the original topic
1110 if _, err := r2.FetchMessage(ctx); err != nil {
1111 t.Errorf("expect to read from reader 2")
1112 }
1113
1114 // all N messages on the original topic should be read by the original reader
1115 for i := 0; i < N; i++ {
1116 if _, err := r.FetchMessage(ctx); err != nil {
1117 t.Errorf("expect to read from reader 1")
1118 }
1119 }
1120}
1121
1122func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing.T, ctx context.Context, r *Reader) {
1123 // I've rebalanced up to 100 servers, but the rebalance can take upwards

Callers

nothing calls this directly

Calls 9

WriteMessages (Method) · 0.95
Close (Method) · 0.95
FetchMessage (Method) · 0.95
NewReader (Function) · 0.85
prepareReader (Function) · 0.85
makeTestSequence (Function) · 0.85
TCP (Function) · 0.85
newLocalClientAndTopic (Function) · 0.70
Close (Method) · 0.45

Tested by

no test coverage detected