putObjectMultipartFromReadAt - Uploads files bigger than 128MiB. Supports all readers which implements io.ReaderAt interface (ReadAt method). NOTE: This function is meant to be used for all readers which implement io.ReaderAt which allows us for resuming multipart uploads but reading at an offset,
(ctx context.Context, bucketName, objectName string, reader io.ReaderAt, size int64, opts PutObjectOptions, )
| 93 | // cleaned automatically when the caller i.e http client closes the |
| 94 | // stream after uploading all the contents successfully. |
| 95 | func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string, |
| 96 | reader io.ReaderAt, size int64, opts PutObjectOptions, |
| 97 | ) (info UploadInfo, err error) { |
| 98 | // Input validation. |
| 99 | if err = s3utils.CheckValidBucketName(bucketName); err != nil { |
| 100 | return UploadInfo{}, err |
| 101 | } |
| 102 | if err = s3utils.CheckValidObjectName(objectName); err != nil { |
| 103 | return UploadInfo{}, err |
| 104 | } |
| 105 | |
| 106 | // Calculate the optimal parts info for a given size. |
| 107 | totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize) |
| 108 | if err != nil { |
| 109 | return UploadInfo{}, err |
| 110 | } |
| 111 | |
| 112 | // Initiate a new multipart upload. |
| 113 | uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) |
| 114 | if err != nil { |
| 115 | return UploadInfo{}, err |
| 116 | } |
| 117 | |
| 118 | withChecksum := c.trailingHeaderSupport |
| 119 | |
| 120 | // Aborts the multipart upload in progress, if the |
| 121 | // function returns any error, since we do not resume |
| 122 | // we should purge the parts which have been uploaded |
| 123 | // to relinquish storage space. |
| 124 | defer func() { |
| 125 | if err != nil { |
| 126 | c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) |
| 127 | } |
| 128 | }() |
| 129 | |
| 130 | // Total data read and written to server. should be equal to 'size' at the end of the call. |
| 131 | var totalUploadedSize int64 |
| 132 | |
| 133 | // Complete multipart upload. |
| 134 | var complMultipartUpload completeMultipartUpload |
| 135 | |
| 136 | // Declare a channel that sends the next part number to be uploaded. |
| 137 | uploadPartsCh := make(chan uploadPartReq) |
| 138 | |
| 139 | // Declare a channel that sends back the response of a part upload. |
| 140 | uploadedPartsCh := make(chan uploadedPartRes) |
| 141 | |
| 142 | // Used for readability, lastPartNumber is always totalPartsCount. |
| 143 | lastPartNumber := totalPartsCount |
| 144 | |
| 145 | partitionCtx, partitionCancel := context.WithCancel(ctx) |
| 146 | defer partitionCancel() |
| 147 | // Send each part number to the channel to be processed. |
| 148 | go func() { |
| 149 | defer close(uploadPartsCh) |
| 150 | |
| 151 | for p := 1; p <= totalPartsCount; p++ { |
| 152 | select { |
no test coverage detected