(ctx context.Context, addr net.Addr, msg protocol.Message)
| 100 | } |
| 101 | |
| 102 | func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) { |
| 103 | if c.Timeout > 0 { |
| 104 | var cancel context.CancelFunc |
| 105 | ctx, cancel = context.WithTimeout(ctx, c.Timeout) |
| 106 | defer cancel() |
| 107 | } |
| 108 | |
| 109 | if addr == nil { |
| 110 | if addr = c.Addr; addr == nil { |
| 111 | return nil, errors.New("no address was given for the kafka cluster in the request or on the client") |
| 112 | } |
| 113 | } |
| 114 | |
| 115 | return c.transport().RoundTrip(ctx, addr, msg) |
| 116 | } |
| 117 | |
| 118 | func (c *Client) transport() RoundTripper { |
| 119 | if c.Transport != nil { |
no test coverage detected