(t *testing.T)
| 2625 | } |
| 2626 | |
| 2627 | func TestTxnCanAbort(t *testing.T) { |
| 2628 | broker := NewMockBroker(t, 1) |
| 2629 | defer broker.Close() |
| 2630 | |
| 2631 | config := NewTestConfig() |
| 2632 | config.Producer.Idempotent = true |
| 2633 | config.Producer.Transaction.ID = "test" |
| 2634 | config.Version = V0_11_0_0 |
| 2635 | config.Producer.RequiredAcks = WaitForAll |
| 2636 | config.Producer.Return.Errors = false |
| 2637 | config.Producer.Return.Successes = true |
| 2638 | config.Producer.Retry.Backoff = 0 |
| 2639 | config.Producer.Flush.Messages = 1 |
| 2640 | config.Producer.Retry.Max = 1 |
| 2641 | config.Net.MaxOpenRequests = 1 |
| 2642 | |
| 2643 | var ( |
| 2644 | mu sync.Mutex |
| 2645 | addPartitionsCount int |
| 2646 | produceRequestsCount int |
| 2647 | ) |
| 2648 | |
| 2649 | broker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 2650 | "MetadataRequest": func(req *request) encoderWithHeader { |
| 2651 | resp := new(MetadataResponse) |
| 2652 | resp.Version = 4 |
| 2653 | resp.ControllerID = broker.BrokerID() |
| 2654 | resp.AddBroker(broker.Addr(), broker.BrokerID()) |
| 2655 | resp.AddTopic("test-topic", ErrNoError) |
| 2656 | resp.AddTopic("test-topic-2", ErrNoError) |
| 2657 | resp.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 2658 | resp.AddTopicPartition("test-topic-2", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 2659 | return resp |
| 2660 | }, |
| 2661 | "FindCoordinatorRequest": func(req *request) encoderWithHeader { |
| 2662 | resp := new(FindCoordinatorResponse) |
| 2663 | resp.Version = 1 |
| 2664 | resp.Coordinator = &Broker{id: broker.BrokerID(), addr: broker.Addr()} |
| 2665 | resp.Err = ErrNoError |
| 2666 | return resp |
| 2667 | }, |
| 2668 | "InitProducerIDRequest": func(req *request) encoderWithHeader { |
| 2669 | return &InitProducerIDResponse{ |
| 2670 | Err: ErrNoError, |
| 2671 | ProducerID: 1, |
| 2672 | ProducerEpoch: 0, |
| 2673 | } |
| 2674 | }, |
| 2675 | "AddPartitionsToTxnRequest": func(req *request) encoderWithHeader { |
| 2676 | mu.Lock() |
| 2677 | addPartitionsCount++ |
| 2678 | count := addPartitionsCount |
| 2679 | mu.Unlock() |
| 2680 | |
| 2681 | if count == 1 { |
| 2682 | return &AddPartitionsToTxnResponse{ |
| 2683 | Errors: map[string][]*PartitionError{ |
| 2684 | "test-topic-2": {{Partition: 0, Err: ErrNoError}}, |
nothing calls this directly
no test coverage detected