MCPcopy
hub / github.com/IBM/sarama / TestNewOffsetManagerOffsetsManualCommit

Function TestNewOffsetManagerOffsetsManualCommit

offset_manager_test.go:419–473  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

417}
418
419func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
420 // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false
421 config := NewTestConfig()
422 config.Consumer.Offsets.AutoCommit.Enable = false
423
424 om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
425 defer broker.Close()
426 defer coordinator.Close()
427 pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
428
429 // Wait long enough for the test not to fail..
430 timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
431
432 ocResponse := new(OffsetCommitResponse)
433 ocResponse.AddError("my_topic", 0, ErrNoError)
434 called := make(chan none)
435 handler := func(req *request) (res encoderWithHeader) {
436 close(called)
437 return ocResponse
438 }
439 coordinator.setHandler(handler)
440
441 // Should not trigger an auto-commit
442 expected := int64(1)
443 pom.ResetOffset(expected, "modified_meta")
444 _, _ = pom.NextOffset()
445
446 select {
447 case <-called:
448 // OffsetManager called on the wire.
449 t.Errorf("Received request when AutoCommit is disabled")
450 case <-time.After(timeout):
451 // Timeout waiting for OffsetManager to call on the wire.
452 // OK
453 }
454
455 // Setup again to test manual commit
456 called = make(chan none)
457
458 om.Commit()
459
460 select {
461 case <-called:
462 // OffsetManager called on the wire.
463 // OK
464 case <-time.After(timeout):
465 // Timeout waiting for OffsetManager to call on the wire.
466 t.Errorf("No request received for after waiting for %v", timeout)
467 }
468
469 // !! om must be closed before the pom so pom.release() is called before pom.Close()
470 safeClose(t, om)
471 safeClose(t, pom)
472 safeClose(t, testClient)
473}
474
475// Test recovery from ErrNotCoordinatorForConsumer
476// on first fetchInitialOffset call

Callers

nothing calls this directly

Calls 11

setHandlerMethod · 0.80
NewTestConfigFunction · 0.70
safeCloseFunction · 0.70
CloseMethod · 0.65
ResetOffsetMethod · 0.65
NextOffsetMethod · 0.65
ErrorfMethod · 0.65
CommitMethod · 0.65
AddErrorMethod · 0.45

Tested by

no test coverage detected