(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt)
| 486 | } |
| 487 | |
| 488 | func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { |
| 489 | if err := checkStreamName(stream); err != nil { |
| 490 | return nil, err |
| 491 | } |
| 492 | o, cancel, err := getJSContextOpts(js.opts, opts...) |
| 493 | if err != nil { |
| 494 | return nil, err |
| 495 | } |
| 496 | if cancel != nil { |
| 497 | defer cancel() |
| 498 | } |
| 499 | |
| 500 | req, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg}) |
| 501 | if err != nil { |
| 502 | return nil, err |
| 503 | } |
| 504 | |
| 505 | var ccSubj string |
| 506 | if consumerName == _EMPTY_ { |
| 507 | // if consumer name is empty (neither Durable nor Name is set), use the legacy ephemeral endpoint |
| 508 | ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream) |
| 509 | } else if err := checkConsumerName(consumerName); err != nil { |
| 510 | return nil, err |
| 511 | } else if js.nc.serverMinVersion(2, 9, 0) { |
| 512 | if cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate { |
| 513 | // if user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint |
| 514 | ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) |
| 515 | } else if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" { |
| 516 | // if filter subject is empty or ">", use the endpoint without filter subject |
| 517 | ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName) |
| 518 | } else { |
| 519 | // safeguard against passing invalid filter subject in request subject |
| 520 | if cfg.FilterSubject[0] == '.' || cfg.FilterSubject[len(cfg.FilterSubject)-1] == '.' { |
| 521 | return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject) |
| 522 | } |
| 523 | // if filter subject is not empty, use the endpoint with filter subject |
| 524 | ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject) |
| 525 | } |
| 526 | } else { |
| 527 | if cfg.Durable != "" { |
| 528 | // if Durable is set, use the DURABLE.CREATE endpoint |
| 529 | ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName) |
| 530 | } else { |
| 531 | // if Durable is not set, use the legacy ephemeral endpoint |
| 532 | ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream) |
| 533 | } |
| 534 | } |
| 535 | |
| 536 | resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) |
| 537 | if err != nil { |
| 538 | if errors.Is(err, ErrNoResponders) { |
| 539 | err = ErrJetStreamNotEnabled |
| 540 | } |
| 541 | return nil, err |
| 542 | } |
| 543 | var info consumerResponse |
| 544 | err = json.Unmarshal(resp.Data, &info) |
| 545 | if err != nil { |
no test coverage detected