MCPcopy
hub / github.com/IBM/sarama / TestPartitionOffsetManagerMarkOffsetWithRetention

Function TestPartitionOffsetManagerMarkOffsetWithRetention

offset_manager_test.go:747–780  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

745}
746
747func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
748 om, testClient, broker, coordinator := initOffsetManager(t, time.Hour)
749 defer broker.Close()
750 defer coordinator.Close()
751 pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
752
753 ocResponse := new(OffsetCommitResponse)
754 ocResponse.AddError("my_topic", 0, ErrNoError)
755 handler := func(req *request) (res encoderWithHeader) {
756 if req.body.version() != 2 {
757 t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
758 }
759 offsetCommitRequest := req.body.(*OffsetCommitRequest)
760 if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
761 t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
762 }
763 return ocResponse
764 }
765 coordinator.setHandler(handler)
766
767 pom.MarkOffset(100, "modified_meta")
768 offset, meta := pom.NextOffset()
769
770 if offset != 100 {
771 t.Errorf("Expected offset 100. Actual: %v", offset)
772 }
773 if meta != "modified_meta" {
774 t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
775 }
776
777 safeClose(t, pom)
778 safeClose(t, om)
779 safeClose(t, testClient)
780}
781
782func TestPartitionOffsetManagerCommitErr(t *testing.T) {
783 om, testClient, broker, coordinator := initOffsetManager(t, 0)

Callers

nothing calls this directly

Calls 10

initOffsetManagerFunction · 0.85
setHandlerMethod · 0.80
safeCloseFunction · 0.70
CloseMethod · 0.65
versionMethod · 0.65
ErrorfMethod · 0.65
MarkOffsetMethod · 0.65
NextOffsetMethod · 0.65
AddErrorMethod · 0.45

Tested by

no test coverage detected