MCPcopy
hub / github.com/mongodb/node-mongodb-native / doWrite

Function doWrite

src/gridfs/upload.ts:410–489  ·  view source on GitHub ↗
(
  stream: GridFSBucketWriteStream,
  chunk: Uint8Array | string,
  encoding: BufferEncoding,
  callback: Callback<void>
)

Source from the content-addressed store, hash-verified

408}
409
410function doWrite(
411 stream: GridFSBucketWriteStream,
412 chunk: Uint8Array | string,
413 encoding: BufferEncoding,
414 callback: Callback<void>
415): void {
416 if (isAborted(stream, callback)) {
417 return;
418 }
419
420 const inputBuf =
421 typeof chunk === 'string' ? ByteUtils.fromUTF8(chunk) : ByteUtils.toLocalBufferType(chunk);
422
423 stream.length += inputBuf.length;
424
425 // Input is small enough to fit in our buffer
426 if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
427 ByteUtils.copy(inputBuf, stream.bufToStore, stream.pos);
428 stream.pos += inputBuf.length;
429 queueMicrotask(callback);
430 return;
431 }
432
433 // Otherwise, buffer is too big for current chunk, so we need to flush
434 // to MongoDB.
435 let inputBufRemaining = inputBuf.length;
436 let spaceRemaining: number = stream.chunkSizeBytes - stream.pos;
437 let numToCopy = Math.min(spaceRemaining, inputBuf.length);
438 let outstandingRequests = 0;
439 while (inputBufRemaining > 0) {
440 const inputBufPos = inputBuf.length - inputBufRemaining;
441 ByteUtils.copy(inputBuf, stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy);
442 stream.pos += numToCopy;
443 spaceRemaining -= numToCopy;
444 let doc: GridFSChunk;
445 if (spaceRemaining === 0) {
446 doc = createChunkDoc(stream.id, stream.n, new Uint8Array(stream.bufToStore));
447
448 const remainingTimeMS = stream.timeoutContext?.remainingTimeMS;
449 if (remainingTimeMS != null && remainingTimeMS <= 0) {
450 return handleError(
451 stream,
452 new MongoOperationTimeoutError(
453 `Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`
454 ),
455 callback
456 );
457 }
458
459 ++stream.state.outstandingRequests;
460 ++outstandingRequests;
461
462 if (isAborted(stream, callback)) {
463 return;
464 }
465
466 stream.chunks
467 .insertOne(doc, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS })

Callers 1

_write (Method) · 0.85

Calls 6

isAborted (Function) · 0.85
createChunkDoc (Function) · 0.85
checkDone (Function) · 0.85
insertOne (Method) · 0.80
handleError (Function) · 0.70
min (Method) · 0.45

Tested by

no test coverage detected