MCPcopy
hub / github.com/segmentio/kafka-go / getOffsets

Function getOffsets

reader_test.go:1644–1682  ·  view source on GitHub ↗
(t *testing.T, config ReaderConfig)

Source from the content-addressed store, hash-verified

1642}
1643
1644func getOffsets(t *testing.T, config ReaderConfig) map[int]int64 {
1645 // minimal config required to lookup coordinator
1646 cg := ConsumerGroup{
1647 config: ConsumerGroupConfig{
1648 ID: config.GroupID,
1649 Brokers: config.Brokers,
1650 Dialer: config.Dialer,
1651 },
1652 }
1653
1654 conn, err := cg.coordinator()
1655 if err != nil {
1656 t.Errorf("unable to connect to coordinator: %v", err)
1657 }
1658 defer conn.Close()
1659
1660 offsets, err := conn.offsetFetch(offsetFetchRequestV1{
1661 GroupID: config.GroupID,
1662 Topics: []offsetFetchRequestV1Topic{{
1663 Topic: config.Topic,
1664 Partitions: []int32{0},
1665 }},
1666 })
1667 if err != nil {
1668 t.Errorf("bad fetchOffsets: %v", err)
1669 }
1670
1671 m := map[int]int64{}
1672
1673 for _, r := range offsets.Responses {
1674 if r.Topic == config.Topic {
1675 for _, p := range r.PartitionResponses {
1676 m[int(p.Partition)] = p.Offset
1677 }
1678 }
1679 }
1680
1681 return m
1682}
1683
1684const (
1685 connTO = 1 * time.Second

Calls 3

coordinatorMethod · 0.95
offsetFetchMethod · 0.65
CloseMethod · 0.45

Tested by

no test coverage detected