(t *testing.T)
| 60 | } |
| 61 | |
| 62 | func TestSyncProducerTransactional(t *testing.T) { |
| 63 | seedBroker := NewMockBroker(t, 1) |
| 64 | defer seedBroker.Close() |
| 65 | leader := NewMockBroker(t, 2) |
| 66 | defer leader.Close() |
| 67 | |
| 68 | config := NewTestConfig() |
| 69 | config.Version = V0_11_0_0 |
| 70 | config.Producer.RequiredAcks = WaitForAll |
| 71 | config.Producer.Return.Successes = true |
| 72 | config.Producer.Transaction.ID = "test" |
| 73 | config.Producer.Idempotent = true |
| 74 | config.Producer.Retry.Max = 5 |
| 75 | config.Net.MaxOpenRequests = 1 |
| 76 | |
| 77 | metadataResponse := new(MetadataResponse) |
| 78 | metadataResponse.Version = 4 |
| 79 | metadataResponse.ControllerID = leader.BrokerID() |
| 80 | metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) |
| 81 | metadataResponse.AddTopic("my_topic", ErrNoError) |
| 82 | metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError) |
| 83 | seedBroker.Returns(metadataResponse) |
| 84 | |
| 85 | client, err := NewClient([]string{seedBroker.Addr()}, config) |
| 86 | if err != nil { |
| 87 | t.Fatal(err) |
| 88 | } |
| 89 | defer safeClose(t, client) |
| 90 | |
| 91 | findCoordinatorResponse := new(FindCoordinatorResponse) |
| 92 | findCoordinatorResponse.Coordinator = client.Brokers()[0] |
| 93 | findCoordinatorResponse.Version = 1 |
| 94 | leader.Returns(findCoordinatorResponse) |
| 95 | |
| 96 | initProducerIdResponse := new(InitProducerIDResponse) |
| 97 | leader.Returns(initProducerIdResponse) |
| 98 | |
| 99 | addPartitionToTxn := new(AddPartitionsToTxnResponse) |
| 100 | addPartitionToTxn.Errors = map[string][]*PartitionError{ |
| 101 | "my_topic": { |
| 102 | { |
| 103 | Partition: 0, |
| 104 | }, |
| 105 | }, |
| 106 | } |
| 107 | leader.Returns(addPartitionToTxn) |
| 108 | |
| 109 | prodSuccess := new(ProduceResponse) |
| 110 | prodSuccess.Version = 3 |
| 111 | prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError) |
| 112 | for range 10 { |
| 113 | leader.Returns(prodSuccess) |
| 114 | } |
| 115 | |
| 116 | endTxnResponse := &EndTxnResponse{} |
| 117 | leader.Returns(endTxnResponse) |
| 118 | |
| 119 | producer, err := NewSyncProducerFromClient(client) |
nothing calls this directly
no test coverage detected