(t *testing.T)
| 417 | } |
| 418 | |
| 419 | func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { |
| 420 | // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false |
| 421 | config := NewTestConfig() |
| 422 | config.Consumer.Offsets.AutoCommit.Enable = false |
| 423 | |
| 424 | om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) |
| 425 | defer broker.Close() |
| 426 | defer coordinator.Close() |
| 427 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") |
| 428 | |
| 429 | // Wait long enough for the test not to fail.. |
| 430 | timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval |
| 431 | |
| 432 | ocResponse := new(OffsetCommitResponse) |
| 433 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 434 | called := make(chan none) |
| 435 | handler := func(req *request) (res encoderWithHeader) { |
| 436 | close(called) |
| 437 | return ocResponse |
| 438 | } |
| 439 | coordinator.setHandler(handler) |
| 440 | |
| 441 | // Should not trigger an auto-commit |
| 442 | expected := int64(1) |
| 443 | pom.ResetOffset(expected, "modified_meta") |
| 444 | _, _ = pom.NextOffset() |
| 445 | |
| 446 | select { |
| 447 | case <-called: |
| 448 | // OffsetManager called on the wire. |
| 449 | t.Errorf("Received request when AutoCommit is disabled") |
| 450 | case <-time.After(timeout): |
| 451 | // Timeout waiting for OffsetManager to call on the wire. |
| 452 | // OK |
| 453 | } |
| 454 | |
| 455 | // Setup again to test manual commit |
| 456 | called = make(chan none) |
| 457 | |
| 458 | om.Commit() |
| 459 | |
| 460 | select { |
| 461 | case <-called: |
| 462 | // OffsetManager called on the wire. |
| 463 | // OK |
| 464 | case <-time.After(timeout): |
| 465 | // Timeout waiting for OffsetManager to call on the wire. |
| 466 | t.Errorf("No request received for after waiting for %v", timeout) |
| 467 | } |
| 468 | |
| 469 | // !! om must be closed before the pom so pom.release() is called before pom.Close() |
| 470 | safeClose(t, om) |
| 471 | safeClose(t, pom) |
| 472 | safeClose(t, testClient) |
| 473 | } |
| 474 | |
| 475 | // Test recovery from ErrNotCoordinatorForConsumer |
| 476 | // on first fetchInitialOffset call |
nothing calls this directly
no test coverage detected