MCPcopy
hub / github.com/minio/minio-go / putObjectMultipartStreamFromReadAt

Method putObjectMultipartStreamFromReadAt

api-put-object-streaming.go:95–294  ·  view source on GitHub ↗

putObjectMultipartFromReadAt - Uploads files bigger than 128MiB. Supports all readers which implements io.ReaderAt interface (ReadAt method). NOTE: This function is meant to be used for all readers which implement io.ReaderAt which allows us for resuming multipart uploads but reading at an offset,

(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions,
)

Source from the content-addressed store, hash-verified

93// cleaned automatically when the caller i.e http client closes the
94// stream after uploading all the contents successfully.
95func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
96 reader io.ReaderAt, size int64, opts PutObjectOptions,
97) (info UploadInfo, err error) {
98 // Input validation.
99 if err = s3utils.CheckValidBucketName(bucketName); err != nil {
100 return UploadInfo{}, err
101 }
102 if err = s3utils.CheckValidObjectName(objectName); err != nil {
103 return UploadInfo{}, err
104 }
105
106 // Calculate the optimal parts info for a given size.
107 totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
108 if err != nil {
109 return UploadInfo{}, err
110 }
111
112 // Initiate a new multipart upload.
113 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
114 if err != nil {
115 return UploadInfo{}, err
116 }
117
118 withChecksum := c.trailingHeaderSupport
119
120 // Aborts the multipart upload in progress, if the
121 // function returns any error, since we do not resume
122 // we should purge the parts which have been uploaded
123 // to relinquish storage space.
124 defer func() {
125 if err != nil {
126 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
127 }
128 }()
129
130 // Total data read and written to server. should be equal to 'size' at the end of the call.
131 var totalUploadedSize int64
132
133 // Complete multipart upload.
134 var complMultipartUpload completeMultipartUpload
135
136 // Declare a channel that sends the next part number to be uploaded.
137 uploadPartsCh := make(chan uploadPartReq)
138
139 // Declare a channel that sends back the response of a part upload.
140 uploadedPartsCh := make(chan uploadedPartRes)
141
142 // Used for readability, lastPartNumber is always totalPartsCount.
143 lastPartNumber := totalPartsCount
144
145 partitionCtx, partitionCancel := context.WithCancel(ctx)
146 defer partitionCancel()
147 // Send each part number to the channel to be processed.
148 go func() {
149 defer close(uploadPartsCh)
150
151 for p := 1; p <= totalPartsCount; p++ {
152 select {

Callers 1

Calls 15

newUploadIDMethod · 0.95
abortMultipartUploadMethod · 0.95
uploadPartMethod · 0.95
CheckValidBucketNameFunction · 0.92
CheckValidObjectNameFunction · 0.92
OptimalPartInfoFunction · 0.85
newHookFunction · 0.85
newHashReaderWrapperFunction · 0.85
errUnexpectedEOFFunction · 0.85
completedPartsTypeAlias · 0.85
applyAutoChecksumFunction · 0.85

Tested by

no test coverage detected