MCPcopy
hub / github.com/IBM/sarama / TestPartitionOffsetManagerResetOffsetWithRetention

Function TestPartitionOffsetManagerResetOffsetWithRetention

offset_manager_test.go:686–720  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

684}
685
686func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
687 om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
688 defer broker.Close()
689 defer coordinator.Close()
690 pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
691
692 ocResponse := new(OffsetCommitResponse)
693 ocResponse.AddError("my_topic", 0, ErrNoError)
694 handler := func(req *request) (res encoderWithHeader) {
695 if req.body.version() != 2 {
696 t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
697 }
698 offsetCommitRequest := req.body.(*OffsetCommitRequest)
699 if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
700 t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
701 }
702 return ocResponse
703 }
704 coordinator.setHandler(handler)
705
706 expected := int64(1)
707 pom.ResetOffset(expected, "modified_meta")
708 actual, meta := pom.NextOffset()
709
710 if actual != expected {
711 t.Errorf("Expected offset %v. Actual: %v", expected, actual)
712 }
713 if meta != "modified_meta" {
714 t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
715 }
716
717 safeClose(t, pom)
718 safeClose(t, om)
719 safeClose(t, testClient)
720}
721
722func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
723 om, testClient, broker, coordinator := initOffsetManager(t, 0)

Callers

nothing calls this directly

Calls 10

initOffsetManagerFunction · 0.85
setHandlerMethod · 0.80
safeCloseFunction · 0.70
CloseMethod · 0.65
versionMethod · 0.65
ErrorfMethod · 0.65
ResetOffsetMethod · 0.65
NextOffsetMethod · 0.65
AddErrorMethod · 0.45

Tested by

no test coverage detected