(t *testing.T)
| 579 | } |
| 580 | |
| 581 | func TestCloseLeavesGroup(t *testing.T) { |
| 582 | if os.Getenv("KAFKA_VERSION") == "2.3.1" { |
| 583 | // There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus |
| 584 | // leads to an error when decoding the DescribeGroupsResponse. |
| 585 | // |
| 586 | // See https://issues.apache.org/jira/browse/KAFKA-9150 for details. |
| 587 | t.Skip("Skipping because kafka version is 2.3.1") |
| 588 | } |
| 589 | |
| 590 | ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 591 | defer cancel() |
| 592 | |
| 593 | topic := makeTopic() |
| 594 | createTopic(t, topic, 1) |
| 595 | defer deleteTopic(t, topic) |
| 596 | |
| 597 | groupID := makeGroupID() |
| 598 | r := NewReader(ReaderConfig{ |
| 599 | Brokers: []string{"localhost:9092"}, |
| 600 | Topic: topic, |
| 601 | GroupID: groupID, |
| 602 | MinBytes: 1, |
| 603 | MaxBytes: 10e6, |
| 604 | MaxWait: 100 * time.Millisecond, |
| 605 | RebalanceTimeout: time.Second, |
| 606 | }) |
| 607 | prepareReader(t, ctx, r, Message{Value: []byte("test")}) |
| 608 | |
| 609 | conn, err := Dial("tcp", r.config.Brokers[0]) |
| 610 | if err != nil { |
| 611 | t.Fatalf("error dialing: %v", err) |
| 612 | } |
| 613 | defer conn.Close() |
| 614 | |
| 615 | client, shutdown := newLocalClient() |
| 616 | defer shutdown() |
| 617 | |
| 618 | descGroups := func() DescribeGroupsResponse { |
| 619 | resp, err := client.DescribeGroups( |
| 620 | ctx, |
| 621 | &DescribeGroupsRequest{ |
| 622 | GroupIDs: []string{groupID}, |
| 623 | }, |
| 624 | ) |
| 625 | if err != nil { |
| 626 | t.Fatalf("error from describeGroups %v", err) |
| 627 | } |
| 628 | return *resp |
| 629 | } |
| 630 | |
| 631 | _, err = r.ReadMessage(ctx) |
| 632 | if err != nil { |
| 633 | t.Fatalf("our reader never joind its group or couldn't read a message: %v", err) |
| 634 | } |
| 635 | resp := descGroups() |
| 636 | if len(resp.Groups) != 1 { |
| 637 | t.Fatalf("expected 1 group. got: %d", len(resp.Groups)) |
| 638 | } |
nothing calls this directly
no test coverage detected
searching dependent graphs…