MCPcopy
hub / github.com/segmentio/kafka-go / TestCloseLeavesGroup

Function TestCloseLeavesGroup

reader_test.go:581–654  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

579}
580
581func TestCloseLeavesGroup(t *testing.T) {
582 if os.Getenv("KAFKA_VERSION") == "2.3.1" {
583 // There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus
584 // leads to an error when decoding the DescribeGroupsResponse.
585 //
586 // See https://issues.apache.org/jira/browse/KAFKA-9150 for details.
587 t.Skip("Skipping because kafka version is 2.3.1")
588 }
589
590 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
591 defer cancel()
592
593 topic := makeTopic()
594 createTopic(t, topic, 1)
595 defer deleteTopic(t, topic)
596
597 groupID := makeGroupID()
598 r := NewReader(ReaderConfig{
599 Brokers: []string{"localhost:9092"},
600 Topic: topic,
601 GroupID: groupID,
602 MinBytes: 1,
603 MaxBytes: 10e6,
604 MaxWait: 100 * time.Millisecond,
605 RebalanceTimeout: time.Second,
606 })
607 prepareReader(t, ctx, r, Message{Value: []byte("test")})
608
609 conn, err := Dial("tcp", r.config.Brokers[0])
610 if err != nil {
611 t.Fatalf("error dialing: %v", err)
612 }
613 defer conn.Close()
614
615 client, shutdown := newLocalClient()
616 defer shutdown()
617
618 descGroups := func() DescribeGroupsResponse {
619 resp, err := client.DescribeGroups(
620 ctx,
621 &DescribeGroupsRequest{
622 GroupIDs: []string{groupID},
623 },
624 )
625 if err != nil {
626 t.Fatalf("error from describeGroups %v", err)
627 }
628 return *resp
629 }
630
631 _, err = r.ReadMessage(ctx)
632 if err != nil {
633 t.Fatalf("our reader never joind its group or couldn't read a message: %v", err)
634 }
635 resp := descGroups()
636 if len(resp.Groups) != 1 {
637 t.Fatalf("expected 1 group. got: %d", len(resp.Groups))
638 }

Callers

nothing calls this directly

Calls 13

ReadMessage (Method) · 0.95
Close (Method) · 0.95
createTopic (Function) · 0.85
deleteTopic (Function) · 0.85
makeGroupID (Function) · 0.85
NewReader (Function) · 0.85
prepareReader (Function) · 0.85
Dial (Function) · 0.85
DescribeGroups (Method) · 0.80
makeTopic (Function) · 0.70
newLocalClient (Function) · 0.70
Close (Method) · 0.45

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…