MCPcopy
hub / github.com/nats-io/nats.go / upsertConsumer

Method upsertConsumer

jsm.go:488–567  ·  view source on GitHub ↗
(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt)

Source from the content-addressed store, hash-verified

486}
487
488func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
489 if err := checkStreamName(stream); err != nil {
490 return nil, err
491 }
492 o, cancel, err := getJSContextOpts(js.opts, opts...)
493 if err != nil {
494 return nil, err
495 }
496 if cancel != nil {
497 defer cancel()
498 }
499
500 req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
501 if err != nil {
502 return nil, err
503 }
504
505 var ccSubj string
506 if consumerName == _EMPTY_ {
507 // if consumer name is empty (neither Durable nor Name is set), use the legacy ephemeral endpoint
508 ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
509 } else if err := checkConsumerName(consumerName); err != nil {
510 return nil, err
511 } else if js.nc.serverMinVersion(2, 9, 0) {
512 if cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate {
513 // if user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
514 ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
515 } else if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
516 // if filter subject is empty or ">", use the endpoint without filter subject
517 ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
518 } else {
519 // safeguard against passing invalid filter subject in request subject
520 if cfg.FilterSubject[0] == '.' || cfg.FilterSubject[len(cfg.FilterSubject)-1] == '.' {
521 return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject)
522 }
523 // if filter subject is not empty, use the endpoint with filter subject
524 ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
525 }
526 } else {
527 if cfg.Durable != "" {
528 // if Durable is set, use the DURABLE.CREATE endpoint
529 ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
530 } else {
531 // if Durable is not set, use the legacy ephemeral endpoint
532 ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
533 }
534 }
535
536 resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
537 if err != nil {
538 if errors.Is(err, ErrNoResponders) {
539 err = ErrJetStreamNotEnabled
540 }
541 return nil, err
542 }
543 var info consumerResponse
544 err = json.Unmarshal(resp.Data, &info)
545 if err != nil {

Callers 4

subscribe (Method) · 0.95
AddConsumer (Method) · 0.95
UpdateConsumer (Method) · 0.95
resetOrderedConsumer (Method) · 0.80

Calls 8

apiRequestWithContext (Method) · 0.95
apiSubj (Method) · 0.95
checkStreamName (Function) · 0.85
getJSContextOpts (Function) · 0.85
checkConsumerName (Function) · 0.85
serverMinVersion (Method) · 0.80
Errorf (Method) · 0.80
Is (Method) · 0.45

Tested by

no test coverage detected