MCPcopy
hub / github.com/IBM/sarama / TestTxnOffsetCommitGroupMetadata

Function TestTxnOffsetCommitGroupMetadata

transaction_manager_test.go:467–517  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

465}
466
467func TestTxnOffsetCommitGroupMetadata(t *testing.T) {
468 broker := NewMockBroker(t, 1)
469 defer broker.Close()
470
471 config := NewTestConfig()
472 config.Producer.Idempotent = true
473 config.Producer.Transaction.ID = "test"
474 config.Version = V2_5_0_0
475 config.Producer.RequiredAcks = WaitForAll
476 config.Net.MaxOpenRequests = 1
477
478 client, txmng := newMockTxnManager(t, broker, config)
479 defer client.Close()
480 txmng.status = ProducerTxnFlagInTransaction
481
482 broker.Returns(&AddOffsetsToTxnResponse{Err: ErrNoError})
483 broker.Returns(&FindCoordinatorResponse{Coordinator: client.Brokers()[0], Err: ErrNoError})
484 broker.Returns(&TxnOffsetCommitResponse{
485 Topics: map[string][]*PartitionError{
486 "test-topic": {{Partition: 0, Err: ErrNoError}},
487 },
488 })
489
490 instanceID := "instance-1"
491 groupMetadata := &ConsumerGroupMetadata{
492 GroupID: "test-group",
493 GenerationID: 42,
494 MemberID: "member-1",
495 GroupInstanceID: &instanceID,
496 }
497 offsets := topicPartitionOffsets{
498 topicPartition{topic: "test-topic", partition: 0}: {Partition: 0, Offset: 0},
499 }
500
501 _, err := txmng.publishOffsetsToTxn(offsets, groupMetadata)
502 require.NoError(t, err)
503
504 var committed *TxnOffsetCommitRequest
505 for _, rr := range broker.History() {
506 if req, ok := rr.Request.(*TxnOffsetCommitRequest); ok {
507 committed = req
508 }
509 }
510 require.NotNil(t, committed, "expected a TxnOffsetCommitRequest to reach the broker")
511 assert.EqualValues(t, 3, committed.Version)
512 assert.Equal(t, "test-group", committed.GroupID)
513 assert.EqualValues(t, 42, committed.GenerationID)
514 assert.Equal(t, "member-1", committed.MemberID)
515 require.NotNil(t, committed.GroupInstanceID)
516 assert.Equal(t, instanceID, *committed.GroupInstanceID)
517}
518
519func TestTxnOffsetsCommit(t *testing.T) {
520 type testCase struct {

Callers

nothing calls this directly

Calls 9

CloseMethod · 0.95
ReturnsMethod · 0.95
HistoryMethod · 0.95
NewMockBrokerFunction · 0.85
newMockTxnManagerFunction · 0.85
publishOffsetsToTxnMethod · 0.80
NewTestConfigFunction · 0.70
CloseMethod · 0.65
BrokersMethod · 0.65

Tested by

no test coverage detected