putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel. This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions, )
| 451 | // putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel. |
| 452 | // This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer. |
| 453 | func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string, |
| 454 | reader io.Reader, opts PutObjectOptions, |
| 455 | ) (info UploadInfo, err error) { |
| 456 | // Input validation. |
| 457 | if err = s3utils.CheckValidBucketName(bucketName); err != nil { |
| 458 | return UploadInfo{}, err |
| 459 | } |
| 460 | |
| 461 | if err = s3utils.CheckValidObjectName(objectName); err != nil { |
| 462 | return UploadInfo{}, err |
| 463 | } |
| 464 | |
| 465 | // Cancel all when an error occurs. |
| 466 | ctx, cancel := context.WithCancel(ctx) |
| 467 | defer cancel() |
| 468 | |
| 469 | // Calculate the optimal parts info for a given size. |
| 470 | totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) |
| 471 | if err != nil { |
| 472 | return UploadInfo{}, err |
| 473 | } |
| 474 | |
| 475 | // Initiates a new multipart request |
| 476 | uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) |
| 477 | if err != nil { |
| 478 | return UploadInfo{}, err |
| 479 | } |
| 480 | |
| 481 | // Aborts the multipart upload if the function returns |
| 482 | // any error, since we do not resume we should purge |
| 483 | // the parts which have been uploaded to relinquish |
| 484 | // storage space. |
| 485 | defer func() { |
| 486 | if err != nil { |
| 487 | c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) |
| 488 | } |
| 489 | }() |
| 490 | |
| 491 | // Create checksums |
| 492 | // CRC32C is ~50% faster on AMD64 @ 30GB/s |
| 493 | crc := opts.AutoChecksum.Hasher() |
| 494 | |
| 495 | // Total data read and written to server. should be equal to 'size' at the end of the call. |
| 496 | var totalUploadedSize int64 |
| 497 | |
| 498 | // Initialize parts uploaded map. |
| 499 | partsInfo := make(map[int]ObjectPart) |
| 500 | |
| 501 | // Create a buffer. |
| 502 | nBuffers := int64(opts.NumThreads) |
| 503 | bufs := make(chan []byte, nBuffers) |
| 504 | all := make([]byte, nBuffers*partSize) |
| 505 | for i := int64(0); i < nBuffers; i++ { |
| 506 | bufs <- all[i*partSize : i*partSize+partSize] |
| 507 | } |
| 508 | |
| 509 | var wg sync.WaitGroup |
| 510 | var mu sync.Mutex |
no test coverage detected