MCPcopy
hub / github.com/IBM/sarama / TestAbortPartitionOffsetManager

Function TestAbortPartitionOffsetManager

offset_manager_test.go:844–871  ·  view source on GitHub ↗

Test of recovery from abort

(t *testing.T)

Source from the content-addressed store, hash-verified

842
843// Test of recovery from abort
844func TestAbortPartitionOffsetManager(t *testing.T) {
845 om, testClient, broker, coordinator := initOffsetManager(t, 0)
846 defer broker.Close()
847 pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
848
849 // this triggers an error in the CommitOffset request,
850 // which leads to the abort call
851 coordinator.Close()
852
853 // Response to refresh coordinator request
854 newCoordinator := NewMockBroker(t, 3)
855 defer newCoordinator.Close()
856 broker.Returns(&ConsumerMetadataResponse{
857 CoordinatorID: newCoordinator.BrokerID(),
858 CoordinatorHost: "127.0.0.1",
859 CoordinatorPort: newCoordinator.Port(),
860 })
861
862 ocResponse := new(OffsetCommitResponse)
863 ocResponse.AddError("my_topic", 0, ErrNoError)
864 newCoordinator.Returns(ocResponse)
865
866 pom.MarkOffset(100, "modified_meta")
867
868 safeClose(t, pom)
869 safeClose(t, om)
870 safeClose(t, testClient)
871}
872
873// Validate that the constructRequest() method correctly maps Sarama's default for
874// Config.Consumer.Offsets.Retention to the equivalent Kafka value.

Callers

nothing calls this directly

Calls 11

Close · Method · 0.95
Returns · Method · 0.95
BrokerID · Method · 0.95
Port · Method · 0.95
initOffsetManager · Function · 0.85
NewMockBroker · Function · 0.85
safeClose · Function · 0.70
Close · Method · 0.65
MarkOffset · Method · 0.65
AddError · Method · 0.45

Tested by

no test coverage detected