MCPcopy
hub / github.com/IBM/sarama / TestFuncOffsetManager

Function TestFuncOffsetManager

functional_offset_manager_test.go:9–53  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

7)
8
9func TestFuncOffsetManager(t *testing.T) {
10 t.Parallel()
11 checkKafkaVersion(t, "0.8.2")
12 setupFunctionalTest(t)
13 defer teardownFunctionalTest(t)
14
15 client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, NewFunctionalTestConfig())
16 if err != nil {
17 t.Fatal(err)
18 }
19
20 offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
21 if err != nil {
22 t.Fatal(err)
23 }
24
25 pom1, err := offsetManager.ManagePartition("test.1", 0)
26 if err != nil {
27 t.Fatal(err)
28 }
29
30 pom1.MarkOffset(10, "test metadata")
31 safeClose(t, pom1)
32
33 // Avoid flaky test: submit offset & let om cleanup removed poms
34 offsetManager.Commit()
35
36 pom2, err := offsetManager.ManagePartition("test.1", 0)
37 if err != nil {
38 t.Fatal(err)
39 }
40
41 offset, metadata := pom2.NextOffset()
42
43 if offset != 10 {
44 t.Errorf("Expected the next offset to be 10, found %d.", offset)
45 }
46 if metadata != "test metadata" {
47 t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
48 }
49
50 safeClose(t, pom2)
51 safeClose(t, offsetManager)
52 safeClose(t, client)
53}

Callers

nothing calls this directly

Calls 13

checkKafkaVersionFunction · 0.85
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
FatalMethod · 0.80
NewClientFunction · 0.70
safeCloseFunction · 0.70
ManagePartitionMethod · 0.65
MarkOffsetMethod · 0.65
CommitMethod · 0.65
NextOffsetMethod · 0.65

Tested by

no test coverage detected