()
| 425 | ) |
| 426 | |
| 427 | func (js *jetStream) newAsyncReply() (string, error) { |
| 428 | js.publisher.Lock() |
| 429 | if js.publisher.replySub == nil { |
| 430 | // Create our wildcard reply subject. |
| 431 | sha := sha256.New() |
| 432 | sha.Write([]byte(nuid.Next())) |
| 433 | b := sha.Sum(nil) |
| 434 | for i := 0; i < aReplyTokensize; i++ { |
| 435 | b[i] = rdigits[int(b[i]%base)] |
| 436 | } |
| 437 | js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.opts.replyPrefix, b[:aReplyTokensize]) |
| 438 | sub, err := js.conn.Subscribe(fmt.Sprintf("%s*", js.publisher.replyPrefix), js.handleAsyncReply) |
| 439 | if err != nil { |
| 440 | js.publisher.Unlock() |
| 441 | return "", err |
| 442 | } |
| 443 | js.publisher.replySub = sub |
| 444 | js.publisher.rr = rand.New(rand.NewSource(time.Now().UnixNano())) |
| 445 | } |
| 446 | if js.publisher.connStatusCh == nil { |
| 447 | js.publisher.connStatusCh = js.conn.StatusChanged(nats.RECONNECTING, nats.CLOSED) |
| 448 | go js.resetPendingAcksOnReconnect() |
| 449 | } |
| 450 | var sb strings.Builder |
| 451 | sb.WriteString(js.publisher.replyPrefix) |
| 452 | for { |
| 453 | rn := js.publisher.rr.Int63() |
| 454 | var b [aReplyTokensize]byte |
| 455 | for i, l := 0, rn; i < len(b); i++ { |
| 456 | b[i] = rdigits[l%base] |
| 457 | l /= base |
| 458 | } |
| 459 | if _, ok := js.publisher.acks[string(b[:])]; ok { |
| 460 | continue |
| 461 | } |
| 462 | sb.Write(b[:]) |
| 463 | break |
| 464 | } |
| 465 | |
| 466 | js.publisher.Unlock() |
| 467 | return sb.String(), nil |
| 468 | } |
| 469 | |
| 470 | // Handle an async reply from PublishAsync. |
| 471 | func (js *jetStream) handleAsyncReply(m *nats.Msg) { |
no test coverage detected