| 123 | } |
| 124 | |
| 125 | func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration { |
| 126 | timeout := c.Timeout |
| 127 | |
| 128 | if deadline, ok := ctx.Deadline(); ok { |
| 129 | if remain := time.Until(deadline); remain < timeout { |
| 130 | timeout = remain |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | if timeout > 0 { |
| 135 | // Half the timeout because it is communicated to kafka in multiple |
| 136 | // requests (e.g. Fetch, Produce, etc...), this adds buffer to account |
| 137 | // for network latency when waiting for the response from kafka. |
| 138 | return timeout / 2 |
| 139 | } |
| 140 | |
| 141 | return defaultTimeout |
| 142 | } |
| 143 | |
| 144 | func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 { |
| 145 | return milliseconds(c.timeout(ctx, defaultTimeout)) |