CreateStream creates a new stream with the given config and returns an interface to operate on it. If a stream with the given name already exists, ErrStreamNameAlreadyInUse is returned.
(ctx context.Context, cfg StreamConfig)
| 596 | // interface to operate on it. If a stream with the given name already |
| 597 | // exists, ErrStreamNameAlreadyInUse is returned. |
| 598 | func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream, error) { |
| 599 | if err := validateStreamName(cfg.Name); err != nil { |
| 600 | return nil, err |
| 601 | } |
| 602 | ctx, cancel := js.wrapContextWithoutDeadline(ctx) |
| 603 | if cancel != nil { |
| 604 | defer cancel() |
| 605 | } |
| 606 | |
| 607 | ncfg, err := convertStreamConfigDomains(cfg) |
| 608 | if err != nil { |
| 609 | return nil, err |
| 610 | } |
| 611 | |
| 612 | req, err := json.Marshal(ncfg) |
| 613 | if err != nil { |
| 614 | return nil, err |
| 615 | } |
| 616 | |
| 617 | createSubject := fmt.Sprintf(apiStreamCreateT, cfg.Name) |
| 618 | var resp streamInfoResponse |
| 619 | |
| 620 | if _, err = js.apiRequestJSON(ctx, createSubject, &resp, req); err != nil { |
| 621 | return nil, err |
| 622 | } |
| 623 | if resp.Error != nil { |
| 624 | if resp.Error.ErrorCode == JSErrCodeStreamNameInUse { |
| 625 | return nil, ErrStreamNameAlreadyInUse |
| 626 | } |
| 627 | return nil, resp.Error |
| 628 | } |
| 629 | |
| 630 | // check that input subject transform (if used) is reflected in the returned StreamInfo |
| 631 | if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { |
| 632 | return nil, ErrStreamSubjectTransformNotSupported |
| 633 | } |
| 634 | |
| 635 | if len(cfg.Sources) != 0 { |
| 636 | if len(cfg.Sources) != len(resp.Config.Sources) { |
| 637 | return nil, ErrStreamSourceNotSupported |
| 638 | } |
| 639 | |
| 640 | // the sources list in the response is not ordered |
| 641 | cfgNumTransforms := make([]int, len(cfg.Sources)) |
| 642 | respNumTransforms := make([]int, len(resp.Config.Sources)) |
| 643 | for i, cfgSource := range cfg.Sources { |
| 644 | cfgNumTransforms[i] = len(cfgSource.SubjectTransforms) |
| 645 | respNumTransforms[i] = len(resp.Config.Sources[i].SubjectTransforms) |
| 646 | } |
| 647 | slices.Sort(cfgNumTransforms) |
| 648 | slices.Sort(respNumTransforms) |
| 649 | if !slices.Equal(cfgNumTransforms, respNumTransforms) { |
| 650 | return nil, ErrStreamSubjectTransformNotSupported |
| 651 | } |
| 652 | } |
| 653 | |
| 654 | return &stream{ |
| 655 | js: js, |
no test coverage detected