coordinator establishes a connection to the coordinator for this consumer group.
()
| 896 | // coordinator establishes a connection to the coordinator for this consumer |
| 897 | // group. |
| 898 | func (cg *ConsumerGroup) coordinator() (coordinator, error) { |
| 899 | // NOTE : could try to cache the coordinator to avoid the double connect |
| 900 | // here. since consumer group balances happen infrequently and are |
| 901 | // an expensive operation, we're not currently optimizing that case |
| 902 | // in order to keep the code simpler. |
| 903 | conn, err := cg.config.connect(cg.config.Dialer, cg.config.Brokers...) |
| 904 | if err != nil { |
| 905 | return nil, err |
| 906 | } |
| 907 | defer conn.Close() |
| 908 | |
| 909 | out, err := conn.findCoordinator(findCoordinatorRequestV0{ |
| 910 | CoordinatorKey: cg.config.ID, |
| 911 | }) |
| 912 | if err == nil && out.ErrorCode != 0 { |
| 913 | err = Error(out.ErrorCode) |
| 914 | } |
| 915 | if err != nil { |
| 916 | return nil, err |
| 917 | } |
| 918 | |
| 919 | address := net.JoinHostPort(out.Coordinator.Host, strconv.Itoa(int(out.Coordinator.Port))) |
| 920 | return cg.config.connect(cg.config.Dialer, address) |
| 921 | } |
| 922 | |
| 923 | // joinGroup attempts to join the reader to the consumer group. |
| 924 | // Returns GroupMemberAssignments if this Reader was selected as |