(ctx context.Context, cfg KeyValueConfig)
| 529 | } |
| 530 | |
| 531 | func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) { |
| 532 | scfg, err := js.prepareKeyValueConfig(ctx, cfg) |
| 533 | if err != nil { |
| 534 | return nil, err |
| 535 | } |
| 536 | |
| 537 | stream, err := js.CreateStream(ctx, scfg) |
| 538 | if err != nil { |
| 539 | if errors.Is(err, ErrStreamNameAlreadyInUse) { |
| 540 | // errors are joined so that backwards compatibility is retained |
| 541 | // and previous checks for ErrStreamNameAlreadyInUse will still work. |
| 542 | err = errors.Join(fmt.Errorf("%w: %s", ErrBucketExists, cfg.Bucket), err) |
| 543 | |
| 544 | // If we have a failure to add, it could be because we have |
| 545 | // a config change if the KV was created against before a bug fix |
| 546 | // that changed the value of discard policy. |
| 547 | // We will check if the stream exists and if the only difference |
| 548 | // is the discard policy, we will update the stream. |
| 549 | // The same logic applies for KVs created pre 2.9.x and |
| 550 | // the AllowDirect setting. |
| 551 | if stream, _ = js.Stream(ctx, scfg.Name); stream != nil { |
| 552 | cfg := stream.CachedInfo().Config |
| 553 | cfg.Discard = scfg.Discard |
| 554 | cfg.AllowDirect = scfg.AllowDirect |
| 555 | if reflect.DeepEqual(cfg, scfg) { |
| 556 | stream, err = js.UpdateStream(ctx, scfg) |
| 557 | } |
| 558 | } |
| 559 | } |
| 560 | if err != nil { |
| 561 | return nil, err |
| 562 | } |
| 563 | } |
| 564 | pushJS, err := js.legacyJetStream() |
| 565 | if err != nil { |
| 566 | return nil, err |
| 567 | } |
| 568 | |
| 569 | return mapStreamToKVS(js, pushJS, stream), nil |
| 570 | } |
| 571 | |
| 572 | func (js *jetStream) UpdateKeyValue(ctx context.Context, cfg KeyValueConfig) (KeyValue, error) { |
| 573 | scfg, err := js.prepareKeyValueConfig(ctx, cfg) |
nothing calls this directly
no test coverage detected