MCPcopy
hub / github.com/segmentio/kafka-go / roundTrip

Method roundTrip

transport.go:339–443  ·  view source on GitHub ↗
(ctx context.Context, req Request)

Source from the content-addressed store, hash-verified

337}
338
339func (p *connPool) roundTrip(ctx context.Context, req Request) (Response, error) {
340 // This first select should never block after the first metadata response
341 // that would mark the pool as `ready`.
342 select {
343 case <-p.ready:
344 case <-ctx.Done():
345 return nil, ctx.Err()
346 }
347
348 state := p.grabState()
349 var response promise
350
351 switch m := req.(type) {
352 case *meta.Request:
353 // We serve metadata requests directly from the transport cache unless
354 // we would like to auto create a topic that isn't in our cache.
355 //
356 // This reduces the number of round trips to kafka brokers while keeping
357 // the logic simple when applying partitioning strategies.
358 if state.err != nil {
359 return nil, state.err
360 }
361
362 cachedMeta := filterMetadataResponse(m, state.metadata)
363 // requestNeeded indicates if we need to send this metadata request to the server.
364 // It's true when we want to auto-create topics and we don't have the topic in our
365 // cache.
366 var requestNeeded bool
367 if m.AllowAutoTopicCreation {
368 for _, topic := range cachedMeta.Topics {
369 if topic.ErrorCode == int16(UnknownTopicOrPartition) {
370 requestNeeded = true
371 break
372 }
373 }
374 }
375
376 if !requestNeeded {
377 return cachedMeta, nil
378 }
379
380 case protocol.Splitter:
381 // Messages that implement the Splitter interface trigger the creation of
382 // multiple requests that are all merged back into a single result by
383 // a merger.
384 messages, merger, err := m.Split(state.layout)
385 if err != nil {
386 return nil, err
387 }
388 promises := make([]promise, len(messages))
389 for i, m := range messages {
390 promises[i] = p.sendRequest(ctx, m, state)
391 }
392 response = join(promises, messages, merger)
393 }
394
395 if response == nil {
396 response = p.sendRequest(ctx, req, state)

Callers 2

TestIssue672Function · 0.95
TestIssue806Function · 0.95

Calls 9

grabStateMethod · 0.95
sendRequestMethod · 0.95
awaitMethod · 0.95
refreshMetadataMethod · 0.95
filterMetadataResponseFunction · 0.85
joinFunction · 0.85
DoneMethod · 0.80
SplitMethod · 0.65
ErrMethod · 0.45

Tested by 2

TestIssue672Function · 0.76
TestIssue806Function · 0.76