(t *testing.T)
| 365 | } |
| 366 | |
| 367 | func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) { |
| 368 | // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable` |
| 369 | for _, tt := range offsetsautocommitTestTable { |
| 370 | t.Run(tt.name, func(t *testing.T) { |
| 371 | config := NewTestConfig() |
| 372 | if tt.set { |
| 373 | config.Consumer.Offsets.AutoCommit.Enable = tt.enable |
| 374 | } |
| 375 | om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config) |
| 376 | defer broker.Close() |
| 377 | defer coordinator.Close() |
| 378 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") |
| 379 | |
| 380 | // Wait long enough for the test not to fail.. |
| 381 | timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval |
| 382 | |
| 383 | called := make(chan none) |
| 384 | |
| 385 | ocResponse := new(OffsetCommitResponse) |
| 386 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 387 | handler := func(req *request) (res encoderWithHeader) { |
| 388 | close(called) |
| 389 | return ocResponse |
| 390 | } |
| 391 | coordinator.setHandler(handler) |
| 392 | |
| 393 | // Should force an offset commit, if auto-commit is enabled. |
| 394 | expected := int64(1) |
| 395 | pom.ResetOffset(expected, "modified_meta") |
| 396 | _, _ = pom.NextOffset() |
| 397 | |
| 398 | select { |
| 399 | case <-called: |
| 400 | // OffsetManager called on the wire. |
| 401 | if !config.Consumer.Offsets.AutoCommit.Enable { |
| 402 | t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name) |
| 403 | } |
| 404 | case <-time.After(timeout): |
| 405 | // Timeout waiting for OffsetManager to call on the wire. |
| 406 | if config.Consumer.Offsets.AutoCommit.Enable { |
| 407 | t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout) |
| 408 | } |
| 409 | } |
| 410 | |
| 411 | // !! om must be closed before the pom so pom.release() is called before pom.Close() |
| 412 | safeClose(t, om) |
| 413 | safeClose(t, pom) |
| 414 | safeClose(t, testClient) |
| 415 | }) |
| 416 | } |
| 417 | } |
| 418 | |
| 419 | func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) { |
| 420 | // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false |
nothing calls this directly
no test coverage detected