(t *testing.T)
| 617 | } |
| 618 | |
| 619 | func TestPartitionOffsetManagerInitialOffset(t *testing.T) { |
| 620 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
| 621 | defer broker.Close() |
| 622 | defer coordinator.Close() |
| 623 | testClient.Config().Consumer.Offsets.Initial = OffsetOldest |
| 624 | |
| 625 | // Kafka returns -1 if no offset has been stored for this partition yet. |
| 626 | pom := initPartitionOffsetManager(t, om, coordinator, -1, "") |
| 627 | |
| 628 | offset, meta := pom.NextOffset() |
| 629 | if offset != OffsetOldest { |
| 630 | t.Errorf("Expected offset 5. Actual: %v", offset) |
| 631 | } |
| 632 | if meta != "" { |
| 633 | t.Errorf("Expected metadata to be empty. Actual: %q", meta) |
| 634 | } |
| 635 | |
| 636 | safeClose(t, pom) |
| 637 | safeClose(t, om) |
| 638 | safeClose(t, testClient) |
| 639 | } |
| 640 | |
| 641 | func TestPartitionOffsetManagerNextOffset(t *testing.T) { |
| 642 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
nothing calls this directly
no test coverage detected