MCPcopy
hub / github.com/segmentio/kafka-go / testReaderConsumerGroupRebalance

Function testReaderConsumerGroupRebalance

reader_test.go:1033–1069  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

1031}
1032
1033func testReaderConsumerGroupRebalance(t *testing.T, ctx context.Context, r *Reader) {
1034 r2 := NewReader(r.config)
1035 defer r.Close()
1036
1037 const (
1038 N = 12
1039 partitions = 2
1040 )
1041
1042 client, shutdown := newLocalClient()
1043 defer shutdown()
1044
1045 // rebalance should result in 12 message in each of the partitions
1046 writer := &Writer{
1047 Addr: TCP(r.config.Brokers...),
1048 Topic: r.config.Topic,
1049 Balancer: &RoundRobin{},
1050 BatchSize: 1,
1051 Transport: client.Transport,
1052 }
1053 if err := writer.WriteMessages(ctx, makeTestSequence(N*partitions)...); err != nil {
1054 t.Fatalf("bad write messages: %v", err)
1055 }
1056 if err := writer.Close(); err != nil {
1057 t.Fatalf("bad write err: %v", err)
1058 }
1059
1060 // after rebalance, each reader should have a partition to itself
1061 for i := 0; i < N; i++ {
1062 if _, err := r2.FetchMessage(ctx); err != nil {
1063 t.Errorf("expect to read from reader 2")
1064 }
1065 if _, err := r.FetchMessage(ctx); err != nil {
1066 t.Errorf("expect to read from reader 1")
1067 }
1068 }
1069}
1070
1071func testReaderConsumerGroupRebalanceAcrossTopics(t *testing.T, ctx context.Context, r *Reader) {
1072 // create a second reader that shares the groupID, but reads from a different topic

Callers

nothing calls this directly

Calls 8

WriteMessagesMethod · 0.95
CloseMethod · 0.95
FetchMessageMethod · 0.95
NewReaderFunction · 0.85
TCPFunction · 0.85
makeTestSequenceFunction · 0.85
newLocalClientFunction · 0.70
CloseMethod · 0.45

Tested by

no test coverage detected