(t *testing.T)
| 7 | ) |
| 8 | |
| 9 | func TestFuncOffsetManager(t *testing.T) { |
| 10 | t.Parallel() |
| 11 | checkKafkaVersion(t, "0.8.2") |
| 12 | setupFunctionalTest(t) |
| 13 | defer teardownFunctionalTest(t) |
| 14 | |
| 15 | client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig()) |
| 16 | if err != nil { |
| 17 | t.Fatal(err) |
| 18 | } |
| 19 | |
| 20 | offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client) |
| 21 | if err != nil { |
| 22 | t.Fatal(err) |
| 23 | } |
| 24 | |
| 25 | pom1, err := offsetManager.ManagePartition("test.1", 0) |
| 26 | if err != nil { |
| 27 | t.Fatal(err) |
| 28 | } |
| 29 | |
| 30 | pom1.MarkOffset(10, "test metadata") |
| 31 | safeClose(t, pom1) |
| 32 | |
| 33 | // Avoid flaky test: submit offset & let om cleanup removed poms |
| 34 | offsetManager.Commit() |
| 35 | |
| 36 | pom2, err := offsetManager.ManagePartition("test.1", 0) |
| 37 | if err != nil { |
| 38 | t.Fatal(err) |
| 39 | } |
| 40 | |
| 41 | offset, metadata := pom2.NextOffset() |
| 42 | |
| 43 | if offset != 10 { |
| 44 | t.Errorf("Expected the next offset to be 10, found %d.", offset) |
| 45 | } |
| 46 | if metadata != "test metadata" { |
| 47 | t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata) |
| 48 | } |
| 49 | |
| 50 | safeClose(t, pom2) |
| 51 | safeClose(t, offsetManager) |
| 52 | safeClose(t, client) |
| 53 | } |
nothing calls this directly
no test coverage detected