Test that a reader won't continually rebalance when there are more consumers than partitions in a group. https://github.com/segmentio/kafka-go/issues/200
(t *testing.T)
| 1374 | // than partitions in a group. |
| 1375 | // https://github.com/segmentio/kafka-go/issues/200 |
| 1376 | func TestRebalanceTooManyConsumers(t *testing.T) { |
| 1377 | ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) |
| 1378 | defer cancel() |
| 1379 | |
| 1380 | conf := ReaderConfig{ |
| 1381 | Brokers: []string{"localhost:9092"}, |
| 1382 | GroupID: makeGroupID(), |
| 1383 | Topic: makeTopic(), |
| 1384 | MaxWait: time.Second, |
| 1385 | } |
| 1386 | |
| 1387 | // Create the first reader and wait for it to become the leader. |
| 1388 | r1 := NewReader(conf) |
| 1389 | |
| 1390 | // Give the reader some time to setup before reading a message |
| 1391 | time.Sleep(1 * time.Second) |
| 1392 | prepareReader(t, ctx, r1, makeTestSequence(1)...) |
| 1393 | |
| 1394 | _, err := r1.ReadMessage(ctx) |
| 1395 | if err != nil { |
| 1396 | t.Fatalf("failed to read message: %v", err) |
| 1397 | } |
| 1398 | // Clear the stats from the first rebalance. |
| 1399 | r1.Stats() |
| 1400 | |
| 1401 | // Second reader should cause one rebalance for each r1 and r2. |
| 1402 | r2 := NewReader(conf) |
| 1403 | |
| 1404 | // Wait for rebalances. |
| 1405 | time.Sleep(5 * time.Second) |
| 1406 | |
| 1407 | // Before the fix, r2 would cause continuous rebalances, |
| 1408 | // as it tried to handshake() repeatedly. |
| 1409 | rebalances := r1.Stats().Rebalances + r2.Stats().Rebalances |
| 1410 | if rebalances > 2 { |
| 1411 | t.Errorf("unexpected rebalances to first reader, got %d", rebalances) |
| 1412 | } |
| 1413 | } |
| 1414 | |
| 1415 | func TestConsumerGroupWithMissingTopic(t *testing.T) { |
| 1416 | t.Skip("this test doesn't work when the cluster is configured to auto-create topics") |
nothing calls this directly
no test coverage detected
searching dependent graphs…