(req protocolBody, res protocolBody)
| 1171 | } |
| 1172 | |
| 1173 | func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error { |
| 1174 | b.lock.Lock() |
| 1175 | defer b.lock.Unlock() |
| 1176 | |
| 1177 | promise, err := b.send(req, res) |
| 1178 | if err != nil { |
| 1179 | b.maybeCloseLocked(err) |
| 1180 | return err |
| 1181 | } |
| 1182 | |
| 1183 | if promise == nil { |
| 1184 | return nil |
| 1185 | } |
| 1186 | |
| 1187 | err = handleResponsePromise(req, res, promise, b.metricRegistry) |
| 1188 | if err != nil { |
| 1189 | b.maybeCloseLocked(err) |
| 1190 | return err |
| 1191 | } |
| 1192 | if res != nil { |
| 1193 | b.handleThrottledResponse(res) |
| 1194 | } |
| 1195 | return nil |
| 1196 | } |
| 1197 | |
| 1198 | // negotiateApiVersion clamps pb's version to the broker's advertised maximum |
| 1199 | // for pb's API (treating pb's current version as the client max). When the |
no test coverage detected