(t *testing.T)
| 465 | } |
| 466 | |
| 467 | func TestTxnOffsetCommitGroupMetadata(t *testing.T) { |
| 468 | broker := NewMockBroker(t, 1) |
| 469 | defer broker.Close() |
| 470 | |
| 471 | config := NewTestConfig() |
| 472 | config.Producer.Idempotent = true |
| 473 | config.Producer.Transaction.ID = "test" |
| 474 | config.Version = V2_5_0_0 |
| 475 | config.Producer.RequiredAcks = WaitForAll |
| 476 | config.Net.MaxOpenRequests = 1 |
| 477 | |
| 478 | client, txmng := newMockTxnManager(t, broker, config) |
| 479 | defer client.Close() |
| 480 | txmng.status = ProducerTxnFlagInTransaction |
| 481 | |
| 482 | broker.Returns(&AddOffsetsToTxnResponse{Err: ErrNoError}) |
| 483 | broker.Returns(&FindCoordinatorResponse{Coordinator: client.Brokers()[0], Err: ErrNoError}) |
| 484 | broker.Returns(&TxnOffsetCommitResponse{ |
| 485 | Topics: map[string][]*PartitionError{ |
| 486 | "test-topic": {{Partition: 0, Err: ErrNoError}}, |
| 487 | }, |
| 488 | }) |
| 489 | |
| 490 | instanceID := "instance-1" |
| 491 | groupMetadata := &ConsumerGroupMetadata{ |
| 492 | GroupID: "test-group", |
| 493 | GenerationID: 42, |
| 494 | MemberID: "member-1", |
| 495 | GroupInstanceID: &instanceID, |
| 496 | } |
| 497 | offsets := topicPartitionOffsets{ |
| 498 | topicPartition{topic: "test-topic", partition: 0}: {Partition: 0, Offset: 0}, |
| 499 | } |
| 500 | |
| 501 | _, err := txmng.publishOffsetsToTxn(offsets, groupMetadata) |
| 502 | require.NoError(t, err) |
| 503 | |
| 504 | var committed *TxnOffsetCommitRequest |
| 505 | for _, rr := range broker.History() { |
| 506 | if req, ok := rr.Request.(*TxnOffsetCommitRequest); ok { |
| 507 | committed = req |
| 508 | } |
| 509 | } |
| 510 | require.NotNil(t, committed, "expected a TxnOffsetCommitRequest to reach the broker") |
| 511 | assert.EqualValues(t, 3, committed.Version) |
| 512 | assert.Equal(t, "test-group", committed.GroupID) |
| 513 | assert.EqualValues(t, 42, committed.GenerationID) |
| 514 | assert.Equal(t, "member-1", committed.MemberID) |
| 515 | require.NotNil(t, committed.GroupInstanceID) |
| 516 | assert.Equal(t, instanceID, *committed.GroupInstanceID) |
| 517 | } |
| 518 | |
| 519 | func TestTxnOffsetsCommit(t *testing.T) { |
| 520 | type testCase struct { |
nothing calls this directly
no test coverage detected