TestAsyncProducerPartitionUnmuting verifies that partitions are properly unmuted in all error paths: send errors, NoResponse acks, etc. Without proper unmuting, partitions would remain muted and subsequent messages would block indefinitely.
(t *testing.T)
| 3052 | // in all error paths: send errors, NoResponse acks, etc. Without proper unmuting, |
| 3053 | // partitions remain muted and subsequent messages would block indefinitely. |
| 3054 | func TestAsyncProducerPartitionUnmuting(t *testing.T) { |
| 3055 | const topic = "test_topic" |
| 3056 | |
| 3057 | t.Run("NoResponse acks unmute partitions", func(t *testing.T) { |
| 3058 | broker := NewMockBroker(t, 1) |
| 3059 | defer broker.Close() |
| 3060 | |
| 3061 | metadataResponse := NewMockMetadataResponse(t). |
| 3062 | SetBroker(broker.Addr(), broker.BrokerID()). |
| 3063 | SetLeader(topic, 0, broker.BrokerID()) |
| 3064 | broker.SetHandlerByMap(map[string]MockResponse{ |
| 3065 | "MetadataRequest": metadataResponse, |
| 3066 | }) |
| 3067 | |
| 3068 | config := NewTestConfig() |
| 3069 | config.Producer.RequiredAcks = NoResponse |
| 3070 | config.Producer.Return.Successes = true |
| 3071 | config.Producer.Flush.Messages = 1 |
| 3072 | config.Net.MaxOpenRequests = 5 |
| 3073 | |
| 3074 | producer, err := NewAsyncProducer([]string{broker.Addr()}, config) |
| 3075 | if err != nil { |
| 3076 | t.Fatal(err) |
| 3077 | } |
| 3078 | |
| 3079 | for range 3 { |
| 3080 | producer.Input() <- &ProducerMessage{ |
| 3081 | Topic: topic, |
| 3082 | Partition: 0, |
| 3083 | Value: StringEncoder("msg"), |
| 3084 | } |
| 3085 | } |
| 3086 | |
| 3087 | successCount := 0 |
| 3088 | for i := range 3 { |
| 3089 | select { |
| 3090 | case <-producer.Successes(): |
| 3091 | successCount++ |
| 3092 | case err := <-producer.Errors(): |
| 3093 | t.Fatalf("unexpected error: %v", err) |
| 3094 | case <-time.After(5 * time.Second): |
| 3095 | t.Fatalf("timeout waiting for success %d (got %d) - partitions may not be unmuted", i+1, successCount) |
| 3096 | } |
| 3097 | } |
| 3098 | |
| 3099 | if successCount != 3 { |
| 3100 | t.Errorf("expected 3 successes, got %d", successCount) |
| 3101 | } |
| 3102 | |
| 3103 | closeProducer(t, producer) |
| 3104 | }) |
| 3105 | |
| 3106 | t.Run("retry keeps partition muted until queued", func(t *testing.T) { |
| 3107 | broker := NewMockBroker(t, 1) |
| 3108 | defer broker.Close() |
| 3109 | |
| 3110 | metadataResponse := new(MetadataResponse) |
| 3111 | metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) |
nothing calls this directly
no test coverage detected