CreateKeyValue will create a KeyValue store with the following configuration.
(cfg *KeyValueConfig)
| 397 | |
| 398 | // CreateKeyValue will create a KeyValue store with the following configuration. |
| 399 | func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { |
| 400 | if !js.nc.serverMinVersion(2, 6, 2) { |
| 401 | return nil, errors.New("nats: key-value requires at least server version 2.6.2") |
| 402 | } |
| 403 | if cfg == nil { |
| 404 | return nil, ErrKeyValueConfigRequired |
| 405 | } |
| 406 | if !bucketValid(cfg.Bucket) { |
| 407 | return nil, ErrInvalidBucketName |
| 408 | } |
| 409 | if _, err := js.AccountInfo(); err != nil { |
| 410 | return nil, err |
| 411 | } |
| 412 | |
| 413 | // Default to 1 for history. Max is 64 for now. |
| 414 | history := int64(1) |
| 415 | if cfg.History > 0 { |
| 416 | if cfg.History > KeyValueMaxHistory { |
| 417 | return nil, ErrHistoryToLarge |
| 418 | } |
| 419 | history = int64(cfg.History) |
| 420 | } |
| 421 | |
| 422 | replicas := cfg.Replicas |
| 423 | if replicas == 0 { |
| 424 | replicas = 1 |
| 425 | } |
| 426 | |
| 427 | // Explicitly set some values so that we can compare configurations |
| 428 | // if we get an "already in use" error and need to check whether they are the same. |
| 429 | maxBytes := cfg.MaxBytes |
| 430 | if maxBytes == 0 { |
| 431 | maxBytes = -1 |
| 432 | } |
| 433 | maxMsgSize := cfg.MaxValueSize |
| 434 | if maxMsgSize == 0 { |
| 435 | maxMsgSize = -1 |
| 436 | } |
| 437 | // When stream's MaxAge is not set, server uses 2 minutes as the default |
| 438 | // for the duplicate window. If MaxAge is set, and lower than 2 minutes, |
| 439 | // then the duplicate window will be set to that. If MaxAge is greater, |
| 440 | // we will cap the duplicate window to 2 minutes (to be consistent with |
| 441 | // previous behavior). |
| 442 | duplicateWindow := 2 * time.Minute |
| 443 | if cfg.TTL > 0 && cfg.TTL < duplicateWindow { |
| 444 | duplicateWindow = cfg.TTL |
| 445 | } |
| 446 | var compression StoreCompression |
| 447 | if cfg.Compression { |
| 448 | compression = S2Compression |
| 449 | } |
| 450 | scfg := &StreamConfig{ |
| 451 | Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), |
| 452 | Description: cfg.Description, |
| 453 | MaxMsgsPerSubject: history, |
| 454 | MaxBytes: maxBytes, |
| 455 | MaxAge: cfg.TTL, |
| 456 | MaxMsgSize: maxMsgSize, |
Nothing calls this directly.
No test coverage detected.