MCPcopy
hub / github.com/nats-io/nats.go / CreateStream

Method CreateStream

jetstream/jetstream.go:598–659  ·  view source on GitHub ↗

CreateStream creates a new stream with the given config and returns an interface to operate on it. If stream with given name already exists, ErrStreamNameAlreadyInUse is returned.

(ctx context.Context, cfg StreamConfig)

Source from the content-addressed store, hash-verified

596// interface to operate on it. If stream with given name already exists,
597// ErrStreamNameAlreadyInUse is returned.
598func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
599 if err := validateStreamName(cfg.Name); err != nil {
600 return nil, err
601 }
602 ctx, cancel := js.wrapContextWithoutDeadline(ctx)
603 if cancel != nil {
604 defer cancel()
605 }
606
607 ncfg, err := convertStreamConfigDomains(cfg)
608 if err != nil {
609 return nil, err
610 }
611
612 req, err := json.Marshal(ncfg)
613 if err != nil {
614 return nil, err
615 }
616
617 createSubject := fmt.Sprintf(apiStreamCreateT, cfg.Name)
618 var resp streamInfoResponse
619
620 if _, err = js.apiRequestJSON(ctx, createSubject, &resp, req); err != nil {
621 return nil, err
622 }
623 if resp.Error != nil {
624 if resp.Error.ErrorCode == JSErrCodeStreamNameInUse {
625 return nil, ErrStreamNameAlreadyInUse
626 }
627 return nil, resp.Error
628 }
629
630 // check that input subject transform (if used) is reflected in the returned StreamInfo
631 if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
632 return nil, ErrStreamSubjectTransformNotSupported
633 }
634
635 if len(cfg.Sources) != 0 {
636 if len(cfg.Sources) != len(resp.Config.Sources) {
637 return nil, ErrStreamSourceNotSupported
638 }
639
640 // the sources list in the response is not ordered
641 cfgNumTransforms := make([]int, len(cfg.Sources))
642 respNumTransforms := make([]int, len(resp.Config.Sources))
643 for i, cfgSource := range cfg.Sources {
644 cfgNumTransforms[i] = len(cfgSource.SubjectTransforms)
645 respNumTransforms[i] = len(resp.Config.Sources[i].SubjectTransforms)
646 }
647 slices.Sort(cfgNumTransforms)
648 slices.Sort(respNumTransforms)
649 if !slices.Equal(cfgNumTransforms, respNumTransforms) {
650 return nil, ErrStreamSubjectTransformNotSupported
651 }
652 }
653
654 return &stream{
655 js: js,

Callers 3

CreateKeyValueMethod · 0.95
CreateOrUpdateStreamMethod · 0.95
CreateObjectStoreMethod · 0.95

Calls 5

apiRequestJSONMethod · 0.95
validateStreamNameFunction · 0.85
EqualMethod · 0.80

Tested by

no test coverage detected