Put will place the contents from the reader into this object-store.
(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt)
| 329 | |
| 330 | // Put will place the contents from the reader into this object-store. |
| 331 | func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) { |
| 332 | if meta == nil || meta.Name == "" { |
| 333 | return nil, ErrBadObjectMeta |
| 334 | } |
| 335 | |
| 336 | if meta.Opts == nil { |
| 337 | meta.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize} |
| 338 | } else if meta.Opts.Link != nil { |
| 339 | return nil, ErrLinkNotAllowed |
| 340 | } else if meta.Opts.ChunkSize == 0 { |
| 341 | meta.Opts.ChunkSize = objDefaultChunkSize |
| 342 | } |
| 343 | |
| 344 | var o objOpts |
| 345 | for _, opt := range opts { |
| 346 | if opt != nil { |
| 347 | if err := opt.configureObject(&o); err != nil { |
| 348 | return nil, err |
| 349 | } |
| 350 | } |
| 351 | } |
| 352 | ctx := o.ctx |
| 353 | |
| 354 | // Create the new nuid so chunks go on a new subject if the name is re-used |
| 355 | newnuid := nuid.Next() |
| 356 | |
| 357 | // These will be used in more than one place |
| 358 | chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, newnuid) |
| 359 | |
| 360 | // Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem |
| 361 | // Chunks on the old nuid can be cleaned up at the end |
| 362 | einfo, err := obs.GetInfo(meta.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name |
| 363 | if err != nil && err != ErrObjectNotFound { |
| 364 | return nil, err |
| 365 | } |
| 366 | |
| 367 | // For async error handling |
| 368 | var perr error |
| 369 | var mu sync.Mutex |
| 370 | setErr := func(err error) { |
| 371 | mu.Lock() |
| 372 | defer mu.Unlock() |
| 373 | perr = err |
| 374 | } |
| 375 | getErr := func() error { |
| 376 | mu.Lock() |
| 377 | defer mu.Unlock() |
| 378 | return perr |
| 379 | } |
| 380 | |
| 381 | // Create our own JS context to handle errors etc. |
| 382 | jetStream, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) |
| 383 | if err != nil { |
| 384 | return nil, err |
| 385 | } |
| 386 | |
| 387 | defer jetStream.(*js).cleanupReplySub() |
| 388 |
no test coverage detected