MCPcopy
hub / github.com/IBM/sarama / TestSyncProducerTransactional

Function TestSyncProducerTransactional

sync_producer_test.go:62–163  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

60}
61
62func TestSyncProducerTransactional(t *testing.T) {
63 seedBroker := NewMockBroker(t, 1)
64 defer seedBroker.Close()
65 leader := NewMockBroker(t, 2)
66 defer leader.Close()
67
68 config := NewTestConfig()
69 config.Version = V0_11_0_0
70 config.Producer.RequiredAcks = WaitForAll
71 config.Producer.Return.Successes = true
72 config.Producer.Transaction.ID = "test"
73 config.Producer.Idempotent = true
74 config.Producer.Retry.Max = 5
75 config.Net.MaxOpenRequests = 1
76
77 metadataResponse := new(MetadataResponse)
78 metadataResponse.Version = 4
79 metadataResponse.ControllerID = leader.BrokerID()
80 metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
81 metadataResponse.AddTopic("my_topic", ErrNoError)
82 metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
83 seedBroker.Returns(metadataResponse)
84
85 client, err := NewClient([]string{seedBroker.Addr()}, config)
86 if err != nil {
87 t.Fatal(err)
88 }
89 defer safeClose(t, client)
90
91 findCoordinatorResponse := new(FindCoordinatorResponse)
92 findCoordinatorResponse.Coordinator = client.Brokers()[0]
93 findCoordinatorResponse.Version = 1
94 leader.Returns(findCoordinatorResponse)
95
96 initProducerIdResponse := new(InitProducerIDResponse)
97 leader.Returns(initProducerIdResponse)
98
99 addPartitionToTxn := new(AddPartitionsToTxnResponse)
100 addPartitionToTxn.Errors = map[string][]*PartitionError{
101 "my_topic": {
102 {
103 Partition: 0,
104 },
105 },
106 }
107 leader.Returns(addPartitionToTxn)
108
109 prodSuccess := new(ProduceResponse)
110 prodSuccess.Version = 3
111 prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
112 for range 10 {
113 leader.Returns(prodSuccess)
114 }
115
116 endTxnResponse := &EndTxnResponse{}
117 leader.Returns(endTxnResponse)
118
119 producer, err := NewSyncProducerFromClient(client)

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
BrokerIDMethod · 0.95
AddrMethod · 0.95
ReturnsMethod · 0.95
BrokersMethod · 0.95
NewMockBrokerFunction · 0.85
StringEncoderTypeAlias · 0.85
AddBrokerMethod · 0.80
AddTopicMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70

Tested by

no test coverage detected