MCPcopy
hub / github.com/segmentio/kafka-go / sendRequest

Method sendRequest

transport.go:671–729  ·  view source on GitHub ↗
(ctx context.Context, req Request, state connPoolState)

Source from the content-addressed store, hash-verified

669}
670
671func (p *connPool) sendRequest(ctx context.Context, req Request, state connPoolState) promise {
672 brokerID := int32(-1)
673
674 switch m := req.(type) {
675 case protocol.BrokerMessage:
676 // Some requests are supposed to be sent to specific brokers (e.g. the
677 // partition leaders). They implement the BrokerMessage interface to
678 // delegate the routing decision to each message type.
679 broker, err := m.Broker(state.layout)
680 if err != nil {
681 return reject(err)
682 }
683 brokerID = broker.ID
684
685 case protocol.GroupMessage:
686 // Some requests are supposed to be sent to a group coordinator,
687 // look up which broker is currently the coordinator for the group
688 // so we can get a connection to that broker.
689 //
690 // TODO: should we cache the coordinator info?
691 p := p.sendRequest(ctx, &findcoordinator.Request{Key: m.Group()}, state)
692 r, err := p.await(ctx)
693 if err != nil {
694 return reject(err)
695 }
696 brokerID = r.(*findcoordinator.Response).NodeID
697 case protocol.TransactionalMessage:
698 p := p.sendRequest(ctx, &findcoordinator.Request{
699 Key: m.Transaction(),
700 KeyType: int8(CoordinatorKeyTypeTransaction),
701 }, state)
702 r, err := p.await(ctx)
703 if err != nil {
704 return reject(err)
705 }
706 brokerID = r.(*findcoordinator.Response).NodeID
707 }
708
709 var c *conn
710 var err error
711 if brokerID >= 0 {
712 c, err = p.grabBrokerConn(ctx, brokerID)
713 } else {
714 c, err = p.grabClusterConn(ctx)
715 }
716 if err != nil {
717 return reject(err)
718 }
719
720 res := make(async, 1)
721
722 c.reqs <- connRequest{
723 ctx: ctx,
724 req: req,
725 res: res,
726 }
727
728 return res

Callers 1

roundTrip · Method · 0.95

Calls 7

grabBrokerConn · Method · 0.95
grabClusterConn · Method · 0.95
reject · Function · 0.85
Broker · Method · 0.65
Group · Method · 0.65
await · Method · 0.65
Transaction · Method · 0.65

Tested by

no test coverage detected