MCPcopy
hub / github.com/nats-io/nats.go / handleAsyncReply

Method handleAsyncReply

jetstream/publish.go:471–581  ·  view source on GitHub ↗

Handle an async reply from PublishAsync.

(m *nats.Msg)

Source from the content-addressed store, hash-verified

469
470// Handle an async reply from PublishAsync.
471func (js *jetStream) handleAsyncReply(m *nats.Msg) {
472 if len(m.Subject) <= js.opts.replyPrefixLen {
473 return
474 }
475 id := m.Subject[js.opts.replyPrefixLen:]
476
477 js.publisher.Lock()
478
479 paf := js.getPAF(id)
480 if paf == nil {
481 js.publisher.Unlock()
482 return
483 }
484
485 closeStc := func() {
486 // Check on anyone stalled and waiting.
487 if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.maxpa {
488 close(js.publisher.stallCh)
489 js.publisher.stallCh = nil
490 }
491 }
492
493 closeDchFn := func() func() {
494 var dch chan struct{}
495 // Check on anyone one waiting on done status.
496 if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 {
497 dch = js.publisher.doneCh
498 js.publisher.doneCh = nil
499 }
500 // Return function to close done channel which
501 // should be deferred so that error is processed and
502 // can be checked.
503 return func() {
504 if dch != nil {
505 close(dch)
506 }
507 }
508 }
509
510 doErr := func(err error) {
511 paf.err = err
512 if paf.errCh != nil {
513 paf.errCh <- paf.err
514 }
515 cb := js.publisher.asyncPublisherOpts.aecb
516 js.publisher.Unlock()
517 if cb != nil {
518 cb(js, paf.msg, err)
519 }
520 }
521
522 if paf.timeout != nil {
523 paf.timeout.Stop()
524 }
525
526 // Process no responders etc.
527 if len(m.Data) == 0 && m.Header.Get(statusHdr) == statusNoResponders {
528 if paf.retries < paf.maxRetries {

Callers

nothing calls this directly

Calls 4

getPAFMethod · 0.95
PublishMsgAsyncMethod · 0.95
StopMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected