MCPcopy
hub / github.com/segmentio/kafka-go / run

Method run

transport.go:1254–1271  ·  view source on GitHub ↗
(pc *protocol.Conn, reqs <-chan connRequest)

Source from the content-addressed store, hash-verified

1252}
1253
1254func (c *conn) run(pc *protocol.Conn, reqs <-chan connRequest) {
1255 defer pc.Close()
1256
1257 for cr := range reqs {
1258 r, err := c.roundTrip(cr.ctx, pc, cr.req)
1259 if err != nil {
1260 cr.res.reject(err)
1261 if !errors.Is(err, protocol.ErrNoRecord) {
1262 break
1263 }
1264 } else {
1265 cr.res.resolve(r)
1266 }
1267 if !c.group.releaseConn(c) {
1268 break
1269 }
1270 }
1271}
1272
1273func (c *conn) roundTrip(ctx context.Context, pc *protocol.Conn, req Request) (Response, error) {
1274 pprof.SetGoroutineLabels(ctx)

Callers 1

connectMethod · 0.95

Calls 5

roundTripMethod · 0.95
rejectMethod · 0.80
resolveMethod · 0.80
releaseConnMethod · 0.80
CloseMethod · 0.45

Tested by

no test coverage detected