(t *testing.T)
| 88 | } |
| 89 | |
| 90 | func TestSyncProducerUseTxn(t *testing.T) { |
| 91 | config := NewTestConfig() |
| 92 | config.Producer.Transaction.ID = "test" |
| 93 | config.Producer.RequiredAcks = sarama.WaitForAll |
| 94 | config.Producer.Retry.Backoff = 0 |
| 95 | config.Producer.Idempotent = true |
| 96 | config.Net.MaxOpenRequests = 1 |
| 97 | config.Version = sarama.V0_11_0_0 |
| 98 | |
| 99 | sp := NewSyncProducer(t, config) |
| 100 | defer func() { |
| 101 | if err := sp.Close(); err != nil { |
| 102 | t.Error(err) |
| 103 | } |
| 104 | }() |
| 105 | |
| 106 | if !sp.IsTransactional() { |
| 107 | t.Error("producer must be transactional") |
| 108 | } |
| 109 | |
| 110 | sp.ExpectSendMessageAndSucceed() |
| 111 | |
| 112 | msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} |
| 113 | |
| 114 | err := sp.BeginTxn() |
| 115 | if err != nil { |
| 116 | t.Errorf("txn can't be started, got %s", err) |
| 117 | } |
| 118 | if sp.TxnStatus()&sarama.ProducerTxnFlagInTransaction == 0 { |
| 119 | t.Error("transaction must be started") |
| 120 | } |
| 121 | _, offset, err := sp.SendMessage(msg) |
| 122 | if err != nil { |
| 123 | t.Errorf("The first message should have been produced successfully, but got %s", err) |
| 124 | } |
| 125 | if offset != 1 || offset != msg.Offset { |
| 126 | t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset) |
| 127 | } |
| 128 | |
| 129 | if err := sp.AddMessageToTxn(&sarama.ConsumerMessage{ |
| 130 | Topic: "original-topic", |
| 131 | Partition: 0, |
| 132 | Offset: 123, |
| 133 | }, "test-group", nil); err != nil { |
| 134 | t.Error(err) |
| 135 | } |
| 136 | |
| 137 | if err := sp.AddOffsetsToTxn(map[string][]*sarama.PartitionOffsetMetadata{ |
| 138 | "original-topic": { |
| 139 | { |
| 140 | Partition: 1, |
| 141 | Offset: 321, |
| 142 | }, |
| 143 | }, |
| 144 | }, "test-group"); err != nil { |
| 145 | t.Error(err) |
| 146 | } |
| 147 |
nothing calls this directly
no test coverage detected