MCPcopy
hub / github.com/segmentio/kafka-go / TestClientOffsetCommit

Function TestClientOffsetCommit

offsetcommit_test.go:50–159  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

48}
49
50func TestClientOffsetCommit(t *testing.T) {
51 topic := makeTopic()
52 client, shutdown := newLocalClientWithTopic(topic, 3)
53 defer shutdown()
54 now := time.Now()
55
56 const N = 10 * 3
57 records := make([]Record, 0, N)
58 for i := 0; i < N; i++ {
59 records = append(records, Record{
60 Time: now,
61 Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
62 })
63 }
64 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
65 defer cancel()
66 res, err := client.Produce(ctx, &ProduceRequest{
67 Topic: topic,
68 RequiredAcks: RequireAll,
69 Records: NewRecordReader(records...),
70 })
71 if err != nil {
72 t.Fatal(err)
73 }
74
75 if res.Error != nil {
76 t.Error(res.Error)
77 }
78
79 for index, err := range res.RecordErrors {
80 t.Fatalf("record at index %d produced an error: %v", index, err)
81 }
82 ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
83 defer cancel()
84 groupID := makeGroupID()
85
86 group, err := NewConsumerGroup(ConsumerGroupConfig{
87 ID: groupID,
88 Topics: []string{topic},
89 Brokers: []string{"localhost:9092"},
90 HeartbeatInterval: 2 * time.Second,
91 RebalanceTimeout: 2 * time.Second,
92 RetentionTime: time.Hour,
93 Logger: log.New(os.Stdout, "cg-test: ", 0),
94 })
95 if err != nil {
96 t.Fatal(err)
97 }
98 defer group.Close()
99
100 gen, err := group.Next(ctx)
101 if err != nil {
102 t.Fatal(err)
103 }
104
105 ocr, err := client.OffsetCommit(ctx, &OffsetCommitRequest{
106 Addr: nil,
107 GroupID: groupID,

Callers

nothing calls this directly

Calls 12

Close (Method) · 0.95
Next (Method) · 0.95
makeGroupID (Function) · 0.85
NewConsumerGroup (Function) · 0.85
Produce (Method) · 0.80
OffsetCommit (Method) · 0.80
OffsetFetch (Method) · 0.80
makeTopic (Function) · 0.70
newLocalClientWithTopic (Function) · 0.70
NewBytes (Function) · 0.70
NewRecordReader (Function) · 0.70
Error (Method) · 0.45

Tested by

no test coverage detected