MCPcopy
hub / github.com/minio/minio-go / putObjectMultipartStreamParallel

Method putObjectMultipartStreamParallel

api-put-object-streaming.go:453–654  ·  view source on GitHub ↗

putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel. This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.

(ctx context.Context, bucketName, objectName string,
	reader io.Reader, opts PutObjectOptions,
)

Source from the content-addressed store, hash-verified

451// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
452// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
453func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
454 reader io.Reader, opts PutObjectOptions,
455) (info UploadInfo, err error) {
456 // Input validation.
457 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
458 return UploadInfo{}, err
459 }
460
461 if err = s3utils.CheckValidObjectName(objectName); err != nil {
462 return UploadInfo{}, err
463 }
464
465 // Cancel all when an error occurs.
466 ctx, cancel := context.WithCancel(ctx)
467 defer cancel()
468
469 // Calculate the optimal parts info for a given size.
470 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
471 if err != nil {
472 return UploadInfo{}, err
473 }
474
475 // Initiates a new multipart request
476 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
477 if err != nil {
478 return UploadInfo{}, err
479 }
480
481 // Aborts the multipart upload if the function returns
482 // any error, since we do not resume we should purge
483 // the parts which have been uploaded to relinquish
484 // storage space.
485 defer func() {
486 if err != nil {
487 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
488 }
489 }()
490
491 // Create checksums
492 // CRC32C is ~50% faster on AMD64 @ 30GB/s
493 crc := opts.AutoChecksum.Hasher()
494
495 // Total data read and written to server. should be equal to 'size' at the end of the call.
496 var totalUploadedSize int64
497
498 // Initialize parts uploaded map.
499 partsInfo := make(map[int]ObjectPart)
500
501 // Create a buffer.
502 nBuffers := int64(opts.NumThreads)
503 bufs := make(chan []byte, nBuffers)
504 all := make([]byte, nBuffers*partSize)
505 for i := int64(0); i < nBuffers; i++ {
506 bufs <- all[i*partSize : i*partSize+partSize]
507 }
508
509 var wg sync.WaitGroup
510 var mu sync.Mutex

Callers 2

PutObjectMethod · 0.95

Calls 15

newUploadIDMethod · 0.95
abortMultipartUploadMethod · 0.95
uploadPartMethod · 0.95
CheckValidBucketNameFunction · 0.92
CheckValidObjectNameFunction · 0.92
OptimalPartInfoFunction · 0.85
newHookFunction · 0.85
errInvalidArgumentFunction · 0.85
completedPartsTypeAlias · 0.85
applyAutoChecksumFunction · 0.85
HasherMethod · 0.80

Tested by

no test coverage detected