MCPcopy
hub / github.com/IBM/sarama / sendInternal

Method sendInternal

broker.go:1124–1171  ·  view source on GitHub ↗

b.lock must be held by caller

(rb protocolBody, promise *responsePromise)

Source from the content-addressed store, hash-verified

1122
1123// b.lock must be held by caller
1124func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error {
1125 // try restricting API version to ranges advertised by the broker
1126 if err := restrictApiVersion(rb, b.brokerAPIVersions); err != nil {
1127 return err
1128 }
1129
1130 // response versions must always match their corresponding request's
1131 if promise != nil && promise.response != nil {
1132 promise.response.setVersion(rb.version())
1133 }
1134
1135 if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
1136 return ErrUnsupportedVersion
1137 }
1138
1139 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
1140 buf, err := encode(req, b.metricRegistry)
1141 if err != nil {
1142 return err
1143 }
1144
1145 // check and wait if throttled
1146 b.waitIfThrottled()
1147
1148 requestTime := time.Now()
1149 // Will be decremented in responseReceiver (except error or request with NoResponse)
1150 b.addRequestInFlightMetrics(1)
1151 bytes, err := b.write(buf)
1152 b.updateOutgoingCommunicationMetrics(bytes)
1153 b.updateProtocolMetrics(rb)
1154 if err != nil {
1155 b.addRequestInFlightMetrics(-1)
1156 return err
1157 }
1158 b.correlationID++
1159
1160 if promise == nil {
1161 // Record request latency without the response
1162 b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
1163 return nil
1164 }
1165
1166 promise.requestTime = requestTime
1167 promise.correlationID = req.correlationID
1168 b.responses <- promise
1169
1170 return nil
1171}
1172
1173func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
1174 b.lock.Lock()

Callers 2

sendWithPromiseMethod · 0.95
authenticateViaSASLv1Method · 0.95

Calls 12

waitIfThrottledMethod · 0.95
writeMethod · 0.95
updateProtocolMetricsMethod · 0.95
restrictApiVersionFunction · 0.85
encodeFunction · 0.85
IsAtLeastMethod · 0.80
setVersionMethod · 0.65
versionMethod · 0.65
requiredVersionMethod · 0.65

Tested by

no test coverage detected