MCPcopy
hub / github.com/segmentio/kafka-go / coordinator

Method coordinator

consumergroup.go:898–921  ·  view source on GitHub ↗

coordinator establishes a connection to the coordinator for this consumer group.

()

Source from the content-addressed store, hash-verified

896// coordinator establishes a connection to the coordinator for this consumer
897// group.
898func (cg *ConsumerGroup) coordinator() (coordinator, error) {
899 // NOTE : could try to cache the coordinator to avoid the double connect
900 // here. since consumer group balances happen infrequently and are
901 // an expensive operation, we're not currently optimizing that case
902 // in order to keep the code simpler.
903 conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...)
904 if err != nil {
905 return nil, err
906 }
907 defer conn.Close()
908
909 out, err := conn.findCoordinator(findCoordinatorRequestV0{
910 CoordinatorKey: cg.config.ID,
911 })
912 if err == nil && out.ErrorCode != 0 {
913 err = Error(out.ErrorCode)
914 }
915 if err != nil {
916 return nil, err
917 }
918
919 address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port)))
920 return cg.config.connect(cg.config.Dialer, address)
921}
922
923// joinGroup attempts to join the reader to the consumer group.
924// Returns GroupMemberAssignments is this Reader was selected as

Callers 3

getOffsetsFunction · 0.95
nextGenerationMethod · 0.95
leaveGroupMethod · 0.95

Calls 4

ErrorTypeAlias · 0.70
findCoordinatorMethod · 0.65
connectMethod · 0.45
CloseMethod · 0.45

Tested by 1

getOffsetsFunction · 0.76