(t *testing.T)
| 14 | ) |
| 15 | |
| 16 | func TestClientDescribeGroups(t *testing.T) { |
| 17 | if os.Getenv("KAFKA_VERSION") == "2.3.1" { |
| 18 | // There's a bug in 2.3.1 that causes the MemberMetadata to be in the wrong format and thus |
| 19 | // leads to an error when decoding the DescribeGroupsResponse. |
| 20 | // |
| 21 | // See https://issues.apache.org/jira/browse/KAFKA-9150 for details. |
| 22 | t.Skip("Skipping because kafka version is 2.3.1") |
| 23 | } |
| 24 | |
| 25 | client, shutdown := newLocalClient() |
| 26 | defer shutdown() |
| 27 | |
| 28 | topic := makeTopic() |
| 29 | gid := fmt.Sprintf("%s-test-group", topic) |
| 30 | |
| 31 | createTopic(t, topic, 2) |
| 32 | defer deleteTopic(t, topic) |
| 33 | |
| 34 | w := newTestWriter(WriterConfig{ |
| 35 | Topic: topic, |
| 36 | }) |
| 37 | |
| 38 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 39 | defer cancel() |
| 40 | |
| 41 | err := w.WriteMessages( |
| 42 | ctx, |
| 43 | Message{ |
| 44 | Key: []byte("key"), |
| 45 | Value: []byte("value"), |
| 46 | }, |
| 47 | ) |
| 48 | |
| 49 | if err != nil { |
| 50 | t.Fatal(err) |
| 51 | } |
| 52 | |
| 53 | r := NewReader(ReaderConfig{ |
| 54 | Brokers: []string{"localhost:9092"}, |
| 55 | Topic: topic, |
| 56 | GroupID: gid, |
| 57 | MinBytes: 10, |
| 58 | MaxBytes: 1000, |
| 59 | }) |
| 60 | _, err = r.ReadMessage(ctx) |
| 61 | if err != nil { |
| 62 | t.Fatal(err) |
| 63 | } |
| 64 | |
| 65 | resp, err := client.DescribeGroups( |
| 66 | ctx, |
| 67 | &DescribeGroupsRequest{ |
| 68 | GroupIDs: []string{gid}, |
| 69 | }, |
| 70 | ) |
| 71 | if err != nil { |
| 72 | t.Fatal(err) |
| 73 | } |
nothing calls this directly
no test coverage detected