MCPcopy
hub / github.com/nats-io/nats.go / newAsyncReply

Method newAsyncReply

jetstream/publish.go:427–468  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

425)
426
427func (js *jetStream) newAsyncReply() (string, error) {
428 js.publisher.Lock()
429 if js.publisher.replySub == nil {
430 // Create our wildcard reply subject.
431 sha := sha256.New()
432 sha.Write([]byte(nuid.Next()))
433 b := sha.Sum(nil)
434 for i := 0; i < aReplyTokensize; i++ {
435 b[i] = rdigits[int(b[i]%base)]
436 }
437 js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.opts.replyPrefix, b[:aReplyTokensize])
438 sub, err := js.conn.Subscribe(fmt.Sprintf("%s*", js.publisher.replyPrefix), js.handleAsyncReply)
439 if err != nil {
440 js.publisher.Unlock()
441 return "", err
442 }
443 js.publisher.replySub = sub
444 js.publisher.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
445 }
446 if js.publisher.connStatusCh == nil {
447 js.publisher.connStatusCh = js.conn.StatusChanged(nats.RECONNECTING, nats.CLOSED)
448 go js.resetPendingAcksOnReconnect()
449 }
450 var sb strings.Builder
451 sb.WriteString(js.publisher.replyPrefix)
452 for {
453 rn := js.publisher.rr.Int63()
454 var b [aReplyTokensize]byte
455 for i, l := 0, rn; i < len(b); i++ {
456 b[i] = rdigits[l%base]
457 l /= base
458 }
459 if _, ok := js.publisher.acks[string(b[:])]; ok {
460 continue
461 }
462 sb.Write(b[:])
463 break
464 }
465
466 js.publisher.Unlock()
467 return sb.String(), nil
468}
469
470// Handle an async reply from PublishAsync.
471func (js *jetStream) handleAsyncReply(m *nats.Msg) {

Callers 1

PublishMsgAsyncMethod · 0.95

Calls 6

NextMethod · 0.65
SubscribeMethod · 0.65
WriteMethod · 0.45
StatusChangedMethod · 0.45
StringMethod · 0.45

Tested by

no test coverage detected