(t *testing.T)
| 745 | } |
| 746 | |
| 747 | func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { |
| 748 | om, testClient, broker, coordinator := initOffsetManager(t, time.Hour) |
| 749 | defer broker.Close() |
| 750 | defer coordinator.Close() |
| 751 | pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") |
| 752 | |
| 753 | ocResponse := new(OffsetCommitResponse) |
| 754 | ocResponse.AddError("my_topic", 0, ErrNoError) |
| 755 | handler := func(req *request) (res encoderWithHeader) { |
| 756 | if req.body.version() != 2 { |
| 757 | t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) |
| 758 | } |
| 759 | offsetCommitRequest := req.body.(*OffsetCommitRequest) |
| 760 | if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) { |
| 761 | t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime) |
| 762 | } |
| 763 | return ocResponse |
| 764 | } |
| 765 | coordinator.setHandler(handler) |
| 766 | |
| 767 | pom.MarkOffset(100, "modified_meta") |
| 768 | offset, meta := pom.NextOffset() |
| 769 | |
| 770 | if offset != 100 { |
| 771 | t.Errorf("Expected offset 100. Actual: %v", offset) |
| 772 | } |
| 773 | if meta != "modified_meta" { |
| 774 | t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) |
| 775 | } |
| 776 | |
| 777 | safeClose(t, pom) |
| 778 | safeClose(t, om) |
| 779 | safeClose(t, testClient) |
| 780 | } |
| 781 | |
| 782 | func TestPartitionOffsetManagerCommitErr(t *testing.T) { |
| 783 | om, testClient, broker, coordinator := initOffsetManager(t, 0) |
nothing calls this directly
no test coverage detected