(t *testing.T)
| 216 | } |
| 217 | |
| 218 | func TestReadOnlyAndAllCommittedMessages(t *testing.T) { |
| 219 | t.Skip("TODO: TestReadOnlyAndAllCommittedMessages is periodically failing inexplicably.") |
| 220 | checkKafkaVersion(t, "0.11.0") |
| 221 | setupFunctionalTest(t) |
| 222 | defer teardownFunctionalTest(t) |
| 223 | |
| 224 | config := NewFunctionalTestConfig() |
| 225 | config.ClientID = t.Name() |
| 226 | config.Net.MaxOpenRequests = 1 |
| 227 | config.Consumer.IsolationLevel = ReadCommitted |
| 228 | config.Producer.Idempotent = true |
| 229 | config.Producer.Return.Successes = true |
| 230 | config.Producer.RequiredAcks = WaitForAll |
| 231 | |
| 232 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 233 | if err != nil { |
| 234 | t.Fatal(err) |
| 235 | } |
| 236 | defer client.Close() |
| 237 | controller, err := client.Controller() |
| 238 | if err != nil { |
| 239 | t.Fatal(err) |
| 240 | } |
| 241 | defer controller.Close() |
| 242 | |
| 243 | transactionalID := strconv.FormatInt(time.Now().UnixNano()/(1<<22), 10) |
| 244 | |
| 245 | var coordinator *Broker |
| 246 | |
| 247 | // find the transaction coordinator |
| 248 | for { |
| 249 | coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{ |
| 250 | Version: 2, |
| 251 | CoordinatorKey: transactionalID, |
| 252 | CoordinatorType: CoordinatorTransaction, |
| 253 | }) |
| 254 | if err != nil { |
| 255 | t.Fatal(err) |
| 256 | } |
| 257 | if coordRes.Err != ErrNoError { |
| 258 | continue |
| 259 | } |
| 260 | if err := coordRes.Coordinator.Open(client.Config()); err != nil { |
| 261 | t.Fatal(err) |
| 262 | } |
| 263 | coordinator = coordRes.Coordinator |
| 264 | break |
| 265 | } |
| 266 | |
| 267 | // produce some uncommitted messages to the topic |
| 268 | pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{ |
| 269 | TransactionalID: &transactionalID, |
| 270 | TransactionTimeout: 10 * time.Second, |
| 271 | }) |
| 272 | if err != nil { |
| 273 | t.Fatal(err) |
| 274 | } |
| 275 | _, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{ |
nothing calls this directly
no test coverage detected