b.lock must be held by caller
(rb protocolBody, promise *responsePromise)
| 1122 | |
| 1123 | // b.lock must be held by caller |
| 1124 | func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error { |
| 1125 | // try restricting API version to ranges advertised by the broker |
| 1126 | if err := restrictApiVersion(rb, b.brokerAPIVersions); err != nil { |
| 1127 | return err |
| 1128 | } |
| 1129 | |
| 1130 | // response versions must always match their corresponding request's |
| 1131 | if promise != nil && promise.response != nil { |
| 1132 | promise.response.setVersion(rb.version()) |
| 1133 | } |
| 1134 | |
| 1135 | if !b.conf.Version.IsAtLeast(rb.requiredVersion()) { |
| 1136 | return ErrUnsupportedVersion |
| 1137 | } |
| 1138 | |
| 1139 | req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} |
| 1140 | buf, err := encode(req, b.metricRegistry) |
| 1141 | if err != nil { |
| 1142 | return err |
| 1143 | } |
| 1144 | |
| 1145 | // check and wait if throttled |
| 1146 | b.waitIfThrottled() |
| 1147 | |
| 1148 | requestTime := time.Now() |
| 1149 | // Will be decremented in responseReceiver (except error or request with NoResponse) |
| 1150 | b.addRequestInFlightMetrics(1) |
| 1151 | bytes, err := b.write(buf) |
| 1152 | b.updateOutgoingCommunicationMetrics(bytes) |
| 1153 | b.updateProtocolMetrics(rb) |
| 1154 | if err != nil { |
| 1155 | b.addRequestInFlightMetrics(-1) |
| 1156 | return err |
| 1157 | } |
| 1158 | b.correlationID++ |
| 1159 | |
| 1160 | if promise == nil { |
| 1161 | // Record request latency without the response |
| 1162 | b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime)) |
| 1163 | return nil |
| 1164 | } |
| 1165 | |
| 1166 | promise.requestTime = requestTime |
| 1167 | promise.correlationID = req.correlationID |
| 1168 | b.responses <- promise |
| 1169 | |
| 1170 | return nil |
| 1171 | } |
| 1172 | |
| 1173 | func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error { |
| 1174 | b.lock.Lock() |
no test coverage detected