MCPcopy
hub / github.com/segmentio/kafka-go / TestClientDeleteOffset

Function TestClientDeleteOffset

offsetdelete_test.go:14–160  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

12)
13
14func TestClientDeleteOffset(t *testing.T) {
15 if !ktesting.KafkaIsAtLeast("2.4.0") {
16 return
17 }
18
19 topic := makeTopic()
20 client, shutdown := newLocalClientWithTopic(topic, 3)
21 defer shutdown()
22 now := time.Now()
23
24 const N = 10 * 3
25 records := make([]Record, 0, N)
26 for i := 0; i < N; i++ {
27 records = append(records, Record{
28 Time: now,
29 Value: NewBytes([]byte("test-message-" + strconv.Itoa(i))),
30 })
31 }
32 ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
33 defer cancel()
34 res, err := client.Produce(ctx, &ProduceRequest{
35 Topic: topic,
36 RequiredAcks: RequireAll,
37 Records: NewRecordReader(records...),
38 })
39 if err != nil {
40 t.Fatal(err)
41 }
42
43 if res.Error != nil {
44 t.Error(res.Error)
45 }
46
47 for index, err := range res.RecordErrors {
48 t.Fatalf("record at index %d produced an error: %v", index, err)
49 }
50 ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)
51 defer cancel()
52 groupID := makeGroupID()
53
54 group, err := NewConsumerGroup(ConsumerGroupConfig{
55 ID: groupID,
56 Topics: []string{topic},
57 Brokers: []string{"localhost:9092"},
58 HeartbeatInterval: 2 * time.Second,
59 RebalanceTimeout: 2 * time.Second,
60 RetentionTime: time.Hour,
61 Logger: log.New(os.Stdout, "cg-test: ", 0),
62 })
63 if err != nil {
64 t.Fatal(err)
65 }
66
67 gen, err := group.Next(ctx)
68 if err != nil {
69 t.Fatal(err)
70 }
71

Callers

nothing calls this directly

Calls 13

NextMethod · 0.95
CloseMethod · 0.95
makeGroupIDFunction · 0.85
NewConsumerGroupFunction · 0.85
ProduceMethod · 0.80
OffsetCommitMethod · 0.80
OffsetFetchMethod · 0.80
OffsetDeleteMethod · 0.80
makeTopicFunction · 0.70
newLocalClientWithTopicFunction · 0.70
NewBytesFunction · 0.70
NewRecordReaderFunction · 0.70

Tested by

no test coverage detected