(t *testing.T, config ReaderConfig)
| 1642 | } |
| 1643 | |
| 1644 | func getOffsets(t *testing.T, config ReaderConfig) map[int]int64 { |
| 1645 | // minimal config required to lookup coordinator |
| 1646 | cg := ConsumerGroup{ |
| 1647 | config: ConsumerGroupConfig{ |
| 1648 | ID: config.GroupID, |
| 1649 | Brokers: config.Brokers, |
| 1650 | Dialer: config.Dialer, |
| 1651 | }, |
| 1652 | } |
| 1653 | |
| 1654 | conn, err := cg.coordinator() |
| 1655 | if err != nil { |
| 1656 | t.Errorf("unable to connect to coordinator: %v", err) |
| 1657 | } |
| 1658 | defer conn.Close() |
| 1659 | |
| 1660 | offsets, err := conn.offsetFetch(offsetFetchRequestV1{ |
| 1661 | GroupID: config.GroupID, |
| 1662 | Topics: []offsetFetchRequestV1Topic{{ |
| 1663 | Topic: config.Topic, |
| 1664 | Partitions: []int32{0}, |
| 1665 | }}, |
| 1666 | }) |
| 1667 | if err != nil { |
| 1668 | t.Errorf("bad fetchOffsets: %v", err) |
| 1669 | } |
| 1670 | |
| 1671 | m := map[int]int64{} |
| 1672 | |
| 1673 | for _, r := range offsets.Responses { |
| 1674 | if r.Topic == config.Topic { |
| 1675 | for _, p := range r.PartitionResponses { |
| 1676 | m[int(p.Partition)] = p.Offset |
| 1677 | } |
| 1678 | } |
| 1679 | } |
| 1680 | |
| 1681 | return m |
| 1682 | } |
| 1683 | |
| 1684 | const ( |
| 1685 | connTO = 1 * time.Second |
no test coverage detected