(t *testing.T, ctx context.Context, client *Client)
| 128 | } |
| 129 | |
| 130 | func testConsumerGroupFetchOffsets(t *testing.T, ctx context.Context, client *Client) { |
| 131 | const totalMessages = 144 |
| 132 | const partitions = 12 |
| 133 | const msgPerPartition = totalMessages / partitions |
| 134 | |
| 135 | topic := makeTopic() |
| 136 | if err := clientCreateTopic(client, topic, partitions); err != nil { |
| 137 | t.Fatal(err) |
| 138 | } |
| 139 | |
| 140 | groupId := makeGroupID() |
| 141 | brokers := []string{"localhost:9092"} |
| 142 | |
| 143 | writer := &Writer{ |
| 144 | Addr: TCP(brokers...), |
| 145 | Topic: topic, |
| 146 | Balancer: &RoundRobin{}, |
| 147 | BatchSize: 1, |
| 148 | Transport: client.Transport, |
| 149 | } |
| 150 | if err := writer.WriteMessages(ctx, makeTestSequence(totalMessages)...); err != nil { |
| 151 | t.Fatalf("bad write messages: %v", err) |
| 152 | } |
| 153 | if err := writer.Close(); err != nil { |
| 154 | t.Fatalf("bad write err: %v", err) |
| 155 | } |
| 156 | |
| 157 | r := NewReader(ReaderConfig{ |
| 158 | Brokers: brokers, |
| 159 | Topic: topic, |
| 160 | GroupID: groupId, |
| 161 | MinBytes: 1, |
| 162 | MaxBytes: 10e6, |
| 163 | MaxWait: 100 * time.Millisecond, |
| 164 | }) |
| 165 | defer r.Close() |
| 166 | |
| 167 | for i := 0; i < totalMessages; i++ { |
| 168 | m, err := r.FetchMessage(ctx) |
| 169 | if err != nil { |
| 170 | t.Fatalf("error fetching message: %s", err) |
| 171 | } |
| 172 | if err := r.CommitMessages(context.Background(), m); err != nil { |
| 173 | t.Fatal(err) |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | offsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{GroupId: groupId, Topic: topic}) |
| 178 | if err != nil { |
| 179 | t.Fatal(err) |
| 180 | } |
| 181 | |
| 182 | if len(offsets) != partitions { |
| 183 | t.Fatalf("expected %d partitions but only received offsets for %d", partitions, len(offsets)) |
| 184 | } |
| 185 | |
| 186 | for i := 0; i < partitions; i++ { |
| 187 | committedOffset := offsets[i] |
nothing calls this directly
no test coverage detected