MCPcopy
hub / github.com/IBM/sarama / TestAsyncProducerPartitionUnmuting

Function TestAsyncProducerPartitionUnmuting

async_producer_test.go:3054–3194  ·  view source on GitHub ↗

TestAsyncProducerPartitionUnmuting verifies that partitions are properly unmuted in all error paths: send errors, NoResponse acks, etc. Without proper unmuting, partitions would remain muted and subsequent messages would block indefinitely.

(t *testing.T)

Source from the content-addressed store, hash-verified

3052// in all error paths: send errors, NoResponse acks, etc. Without proper unmuting,
3053// partitions remain muted and subsequent messages would block indefinitely.
3054func TestAsyncProducerPartitionUnmuting(t *testing.T) {
3055 const topic = "test_topic"
3056
3057 t.Run("NoResponse acks unmute partitions", func(t *testing.T) {
3058 broker := NewMockBroker(t, 1)
3059 defer broker.Close()
3060
3061 metadataResponse := NewMockMetadataResponse(t).
3062 SetBroker(broker.Addr(), broker.BrokerID()).
3063 SetLeader(topic, 0, broker.BrokerID())
3064 broker.SetHandlerByMap(map[string]MockResponse{
3065 "MetadataRequest": metadataResponse,
3066 })
3067
3068 config := NewTestConfig()
3069 config.Producer.RequiredAcks = NoResponse
3070 config.Producer.Return.Successes = true
3071 config.Producer.Flush.Messages = 1
3072 config.Net.MaxOpenRequests = 5
3073
3074 producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
3075 if err != nil {
3076 t.Fatal(err)
3077 }
3078
3079 for range 3 {
3080 producer.Input() <- &ProducerMessage{
3081 Topic: topic,
3082 Partition: 0,
3083 Value: StringEncoder("msg"),
3084 }
3085 }
3086
3087 successCount := 0
3088 for i := range 3 {
3089 select {
3090 case <-producer.Successes():
3091 successCount++
3092 case err := <-producer.Errors():
3093 t.Fatalf("unexpected error: %v", err)
3094 case <-time.After(5 * time.Second):
3095 t.Fatalf("timeout waiting for success %d (got %d) - partitions may not be unmuted", i+1, successCount)
3096 }
3097 }
3098
3099 if successCount != 3 {
3100 t.Errorf("expected 3 successes, got %d", successCount)
3101 }
3102
3103 closeProducer(t, producer)
3104 })
3105
3106 t.Run("retry keeps partition muted until queued", func(t *testing.T) {
3107 broker := NewMockBroker(t, 1)
3108 defer broker.Close()
3109
3110 metadataResponse := new(MetadataResponse)
3111 metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())

Callers

nothing calls this directly

Calls 15

Close Method · 0.95
Addr Method · 0.95
BrokerID Method · 0.95
SetHandlerByMap Method · 0.95
Input Method · 0.95
Successes Method · 0.95
Errors Method · 0.95
setHandler Method · 0.95
NewMockBroker Function · 0.85
NewMockMetadataResponse Function · 0.85
StringEncoder TypeAlias · 0.85
closeProducer Function · 0.85

Tested by

no test coverage detected