(ctx context.Context, req Request, state connPoolState)
| 669 | } |
| 670 | |
| 671 | func (p *connPool) sendRequest(ctx context.Context, req Request, state connPoolState) promise { |
| 672 | brokerID := int32(-1) |
| 673 | |
| 674 | switch m := req.(type) { |
| 675 | case protocol.BrokerMessage: |
| 676 | // Some requests are supposed to be sent to specific brokers (e.g. the |
| 677 | // partition leaders). They implement the BrokerMessage interface to |
| 678 | // delegate the routing decision to each message type. |
| 679 | broker, err := m.Broker(state.layout) |
| 680 | if err != nil { |
| 681 | return reject(err) |
| 682 | } |
| 683 | brokerID = broker.ID |
| 684 | |
| 685 | case protocol.GroupMessage: |
| 686 | // Some requests are supposed to be sent to a group coordinator, |
| 687 | // look up which broker is currently the coordinator for the group |
| 688 | // so we can get a connection to that broker. |
| 689 | // |
| 690 | // TODO: should we cache the coordinator info? |
| 691 | p := p.sendRequest(ctx, &findcoordinator.Request{Key: m.Group()}, state) |
| 692 | r, err := p.await(ctx) |
| 693 | if err != nil { |
| 694 | return reject(err) |
| 695 | } |
| 696 | brokerID = r.(*findcoordinator.Response).NodeID |
| 697 | case protocol.TransactionalMessage: |
| 698 | p := p.sendRequest(ctx, &findcoordinator.Request{ |
| 699 | Key: m.Transaction(), |
| 700 | KeyType: int8(CoordinatorKeyTypeTransaction), |
| 701 | }, state) |
| 702 | r, err := p.await(ctx) |
| 703 | if err != nil { |
| 704 | return reject(err) |
| 705 | } |
| 706 | brokerID = r.(*findcoordinator.Response).NodeID |
| 707 | } |
| 708 | |
| 709 | var c *conn |
| 710 | var err error |
| 711 | if brokerID >= 0 { |
| 712 | c, err = p.grabBrokerConn(ctx, brokerID) |
| 713 | } else { |
| 714 | c, err = p.grabClusterConn(ctx) |
| 715 | } |
| 716 | if err != nil { |
| 717 | return reject(err) |
| 718 | } |
| 719 | |
| 720 | res := make(async, 1) |
| 721 | |
| 722 | c.reqs <- connRequest{ |
| 723 | ctx: ctx, |
| 724 | req: req, |
| 725 | res: res, |
| 726 | } |
| 727 | |
| 728 | return res |
no test coverage detected