newMockTxnManager wires a transaction manager to broker using the standard metadata, find-coordinator and init-producer-id handshake. The caller owns closing the returned client.
(t *testing.T, broker *MockBroker, config *Config)
| 444 | // metadata, find-coordinator and init-producer-id handshake. The caller owns |
| 445 | // closing the returned client. |
| 446 | func newMockTxnManager(t *testing.T, broker *MockBroker, config *Config) (Client, *transactionManager) { |
| 447 | // the mock broker matches each response version to the request version, so |
| 448 | // MetadataResponse.Version is left at its zero value here |
| 449 | metadataLeader := new(MetadataResponse) |
| 450 | metadataLeader.ControllerID = broker.brokerID |
| 451 | metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) |
| 452 | metadataLeader.AddTopic("test-topic", ErrNoError) |
| 453 | metadataLeader.AddTopicPartition("test-topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError) |
| 454 | broker.Returns(metadataLeader) |
| 455 | |
| 456 | client, err := NewClient([]string{broker.Addr()}, config) |
| 457 | require.NoError(t, err) |
| 458 | |
| 459 | broker.Returns(&FindCoordinatorResponse{Coordinator: client.Brokers()[0], Err: ErrNoError}) |
| 460 | broker.Returns(&InitProducerIDResponse{Err: ErrNoError, ProducerID: 1, ProducerEpoch: 0}) |
| 461 | |
| 462 | txmng, err := newTransactionManager(config, client) |
| 463 | require.NoError(t, err) |
| 464 | return client, txmng |
| 465 | } |
| 466 | |
| 467 | func TestTxnOffsetCommitGroupMetadata(t *testing.T) { |
| 468 | broker := NewMockBroker(t, 1) |
no test coverage detected