Test of recovery from abort
(t *testing.T)
| 842 | |
| 843 | // TestAbortPartitionOffsetManager tests recovery from abort: the original coordinator is closed, a replacement coordinator is advertised, an offset is marked, and everything is then closed. |
| 844 | func TestAbortPartitionOffsetManager(t *testing.T) { |
| 845 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
| 846 | defer broker.Close() |
| 847 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta") |
| 848 | |
| 849 | // Closing the coordinator triggers an error in the CommitOffset request, |
| 850 | // which leads to the abort call. |
| 851 | coordinator.Close() |
| 852 | |
| 853 | // Response to the refresh coordinator request: it names newCoordinator (127.0.0.1 and its port) as the coordinator. |
| 854 | newCoordinator := NewMockBroker(t, 3) |
| 855 | defer newCoordinator.Close() |
| 856 | broker.Returns(&ConsumerMetadataResponse{ |
| 857 | CoordinatorID: newCoordinator.BrokerID(), |
| 858 | CoordinatorHost: "127.0.0.1", |
| 859 | CoordinatorPort: newCoordinator.Port(), |
| 860 | }) |
| 861 | |
| 862 | ocResponse := new(OffsetCommitResponse) |
| 863 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 864 | newCoordinator.Returns(ocResponse) |
| 865 | |
| 866 | pom.MarkOffset(100, "modified_meta") |
| 867 | |
| 868 | safeClose(t, pom) |
| 869 | safeClose(t, om) |
| 870 | safeClose(t, testClient) |
| 871 | } |
| 872 | |
| 873 | // Validate that the constructRequest() method correctly maps Sarama's default for |
| 874 | // Config.Consumer.Offsets.Retention to the equivalent Kafka value. |
nothing calls this directly
no test coverage detected