AddConsumer adds a consumer to a stream. If the consumer already exists, and the configuration is the same, it will return the existing consumer. If the consumer already exists, and the configuration is different, it will return ErrConsumerNameAlreadyInUse.
(stream string, cfg *ConsumerConfig, opts ...JSOpt)
| 445 | // If the consumer already exists, and the configuration is different, it |
| 446 | // will return ErrConsumerNameAlreadyInUse. |
| 447 | func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { |
| 448 | if cfg == nil { |
| 449 | cfg = &ConsumerConfig{} |
| 450 | } |
| 451 | consumerName := cfg.Name |
| 452 | if consumerName == _EMPTY_ { |
| 453 | consumerName = cfg.Durable |
| 454 | } |
| 455 | if consumerName != _EMPTY_ { |
| 456 | consInfo, err := js.ConsumerInfo(stream, consumerName, opts...) |
| 457 | if err != nil && !errors.Is(err, ErrConsumerNotFound) && !errors.Is(err, ErrStreamNotFound) { |
| 458 | return nil, err |
| 459 | } |
| 460 | |
| 461 | if consInfo != nil { |
| 462 | sameConfig := checkConfig(&consInfo.Config, cfg) |
| 463 | if sameConfig != nil { |
| 464 | return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, consumerName, stream) |
| 465 | } else { |
| 466 | return consInfo, nil |
| 467 | } |
| 468 | } |
| 469 | } |
| 470 | |
| 471 | return js.upsertConsumer(stream, consumerName, cfg, opts...) |
| 472 | } |
| 473 | |
| 474 | func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) { |
| 475 | if cfg == nil { |
nothing calls this directly
no test coverage detected