(t *testing.T)
| 684 | } |
| 685 | |
| 686 | func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) { |
| 687 | om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) |
| 688 | defer broker.Close() |
| 689 | defer coordinator.Close() |
| 690 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") |
| 691 | |
| 692 | ocResponse := new(OffsetCommitResponse) |
| 693 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 694 | handler := func(req *request) (res encoderWithHeader) { |
| 695 | if req.body.version() != 2 { |
| 696 | t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) |
| 697 | } |
| 698 | offsetCommitRequest := req.body.(*OffsetCommitRequest) |
| 699 | if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) { |
| 700 | t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime) |
| 701 | } |
| 702 | return ocResponse |
| 703 | } |
| 704 | coordinator.setHandler(handler) |
| 705 | |
| 706 | expected := int64(1) |
| 707 | pom.ResetOffset(expected, "modified_meta") |
| 708 | actual, meta := pom.NextOffset() |
| 709 | |
| 710 | if actual != expected { |
| 711 | t.Errorf("Expected offset %v. Actual: %v", expected, actual) |
| 712 | } |
| 713 | if meta != "modified_meta" { |
| 714 | t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) |
| 715 | } |
| 716 | |
| 717 | safeClose(t, pom) |
| 718 | safeClose(t, om) |
| 719 | safeClose(t, testClient) |
| 720 | } |
| 721 | |
| 722 | func TestPartitionOffsetManagerMarkOffset(t *testing.T) { |
| 723 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
nothing calls this directly
no test coverage detected