MCPcopy
hub / github.com/nats-io/nats.go / newAsyncReply

Method newAsyncReply

js.go:697–737  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

695const aReplyTokensize = 6
696
697func (js *js) newAsyncReply() string {
698 js.mu.Lock()
699 if js.rsub == nil {
700 // Create our wildcard reply subject.
701 sha := sha256.New()
702 sha.Write([]byte(nuid.Next()))
703 b := sha.Sum(nil)
704 for i := 0; i < aReplyTokensize; i++ {
705 b[i] = rdigits[int(b[i]%base)]
706 }
707 js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
708 sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
709 if err != nil {
710 js.mu.Unlock()
711 return _EMPTY_
712 }
713 js.rsub = sub
714 js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
715 }
716 if js.connStatusCh == nil {
717 js.connStatusCh = js.nc.StatusChanged(RECONNECTING, CLOSED)
718 go js.resetPendingAcksOnReconnect()
719 }
720 var sb strings.Builder
721 sb.WriteString(js.rpre)
722 for {
723 rn := js.rr.Int63()
724 var b [aReplyTokensize]byte
725 for i, l := 0, rn; i < len(b); i++ {
726 b[i] = rdigits[l%base]
727 l /= base
728 }
729 if _, ok := js.pafs[string(b[:])]; ok {
730 continue
731 }
732 sb.Write(b[:])
733 break
734 }
735 js.mu.Unlock()
736 return sb.String()
737}
738
739func (js *js) resetPendingAcksOnReconnect() {
740 js.mu.Lock()

Callers 1

PublishMsgAsyncMethod · 0.95

Calls 6

NextMethod · 0.65
SubscribeMethod · 0.65
WriteMethod · 0.45
StatusChangedMethod · 0.45
StringMethod · 0.45

Tested by

no test coverage detected