(t *testing.T)
| 658 | } |
| 659 | |
| 660 | func TestPartitionOffsetManagerResetOffset(t *testing.T) { |
| 661 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
| 662 | defer broker.Close() |
| 663 | defer coordinator.Close() |
| 664 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") |
| 665 | |
| 666 | ocResponse := new(OffsetCommitResponse) |
| 667 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 668 | coordinator.Returns(ocResponse) |
| 669 | |
| 670 | expected := int64(1) |
| 671 | pom.ResetOffset(expected, "modified_meta") |
| 672 | actual, meta := pom.NextOffset() |
| 673 | |
| 674 | if actual != expected { |
| 675 | t.Errorf("Expected offset %v. Actual: %v", expected, actual) |
| 676 | } |
| 677 | if meta != "modified_meta" { |
| 678 | t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) |
| 679 | } |
| 680 | |
| 681 | safeClose(t, pom) |
| 682 | safeClose(t, om) |
| 683 | safeClose(t, testClient) |
| 684 | } |
| 685 | |
| 686 | func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { |
| 687 | om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) |
nothing calls this directly
no test coverage detected