MCPcopy
hub / github.com/IBM/sarama / TestSyncProducerUseTxn

Function TestSyncProducerUseTxn

mocks/sync_producer_test.go:90–156  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

88}
89
90func TestSyncProducerUseTxn(t *testing.T) {
91 config := NewTestConfig()
92 config.Producer.Transaction.ID = "test"
93 config.Producer.RequiredAcks = sarama.WaitForAll
94 config.Producer.Retry.Backoff = 0
95 config.Producer.Idempotent = true
96 config.Net.MaxOpenRequests = 1
97 config.Version = sarama.V0_11_0_0
98
99 sp := NewSyncProducer(t, config)
100 defer func() {
101 if err := sp.Close(); err != nil {
102 t.Error(err)
103 }
104 }()
105
106 if !sp.IsTransactional() {
107 t.Error("producer must be transactional")
108 }
109
110 sp.ExpectSendMessageAndSucceed()
111
112 msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
113
114 err := sp.BeginTxn()
115 if err != nil {
116 t.Errorf("txn can't be started, got %s", err)
117 }
118 if sp.TxnStatus()&sarama.ProducerTxnFlagInTransaction == 0 {
119 t.Error("transaction must be started")
120 }
121 _, offset, err := sp.SendMessage(msg)
122 if err != nil {
123 t.Errorf("The first message should have been produced successfully, but got %s", err)
124 }
125 if offset != 1 || offset != msg.Offset {
126 t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
127 }
128
129 if err := sp.AddMessageToTxn(&sarama.ConsumerMessage{
130 Topic: "original-topic",
131 Partition: 0,
132 Offset: 123,
133 }, "test-group", nil); err != nil {
134 t.Error(err)
135 }
136
137 if err := sp.AddOffsetsToTxn(map[string][]*sarama.PartitionOffsetMetadata{
138 "original-topic": {
139 {
140 Partition: 1,
141 Offset: 321,
142 },
143 },
144 }, "test-group"); err != nil {
145 t.Error(err)
146 }
147

Callers

nothing calls this directly

Calls 14

CloseMethod · 0.95
IsTransactionalMethod · 0.95
BeginTxnMethod · 0.95
TxnStatusMethod · 0.95
SendMessageMethod · 0.95
AddMessageToTxnMethod · 0.95
AddOffsetsToTxnMethod · 0.95
CommitTxnMethod · 0.95
StringEncoderTypeAlias · 0.92
NewTestConfigFunction · 0.70
NewSyncProducerFunction · 0.70

Tested by

no test coverage detected