MCPcopy
hub / github.com/IBM/sarama / TestNewOffsetManagerOffsetsAutoCommit

Function TestNewOffsetManagerOffsetsAutoCommit

offset_manager_test.go:367–417  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

365}
366
367func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
368 // Tests to validate configuration of `Consumer.Offsets.AutoCommit.Enable`
369 for _, tt := range offsetsautocommitTestTable {
370 t.Run(tt.name, func(t *testing.T) {
371 config := NewTestConfig()
372 if tt.set {
373 config.Consumer.Offsets.AutoCommit.Enable = tt.enable
374 }
375 om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
376 defer broker.Close()
377 defer coordinator.Close()
378 pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
379
380 // Wait long enough for the test not to fail..
381 timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
382
383 called := make(chan none)
384
385 ocResponse := new(OffsetCommitResponse)
386 ocResponse.AddError("my_topic", 0, ErrNoError)
387 handler := func(req *request) (res encoderWithHeader) {
388 close(called)
389 return ocResponse
390 }
391 coordinator.setHandler(handler)
392
393 // Should force an offset commit, if auto-commit is enabled.
394 expected := int64(1)
395 pom.ResetOffset(expected, "modified_meta")
396 _, _ = pom.NextOffset()
397
398 select {
399 case <-called:
400 // OffsetManager called on the wire.
401 if !config.Consumer.Offsets.AutoCommit.Enable {
402 t.Errorf("Received request for: %s when AutoCommit is disabled", tt.name)
403 }
404 case <-time.After(timeout):
405 // Timeout waiting for OffsetManager to call on the wire.
406 if config.Consumer.Offsets.AutoCommit.Enable {
407 t.Errorf("No request received for: %s after waiting for %v", tt.name, timeout)
408 }
409 }
410
411 // !! om must be closed before the pom so pom.release() is called before pom.Close()
412 safeClose(t, om)
413 safeClose(t, pom)
414 safeClose(t, testClient)
415 })
416 }
417}
418
419func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
420 // Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false

Callers

nothing calls this directly

Calls 11

RunMethod · 0.80
setHandlerMethod · 0.80
NewTestConfigFunction · 0.70
safeCloseFunction · 0.70
CloseMethod · 0.65
ResetOffsetMethod · 0.65
NextOffsetMethod · 0.65
ErrorfMethod · 0.65
AddErrorMethod · 0.45

Tested by

no test coverage detected