MCPcopy
hub / github.com/mongodb/node-mongodb-native / onData

Function onData

src/cmap/wire_protocol/on_data.ts:23–139  ·  view source on GitHub ↗
(
  emitter: EventEmitter,
  { timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
)

Source from the content-addressed store, hash-verified

21 * It will reject upon an error event.
22 */
23export function onData(
24 emitter: EventEmitter,
25 { timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
26) {
27 signal?.throwIfAborted();
28
29 // Setup pending events and pending promise lists
30 /**
31 * When the caller has not yet called .next(), we store the
32 * value from the event in this list. Next time they call .next()
33 * we pull the first value out of this list and resolve a promise with it.
34 */
35 const unconsumedEvents = new List<Uint8Array>();
36 /**
37 * When there has not yet been an event, a new promise will be created
38 * and implicitly stored in this list. When an event occurs we take the first
39 * promise in this list and resolve it.
40 */
41 const unconsumedPromises = new List<PendingPromises>();
42
43 /**
44 * Stored an error created by an error event.
45 * This error will turn into a rejection for the subsequent .next() call
46 */
47 let error: Error | null = null;
48
49 /** Set to true only after event listeners have been removed. */
50 let finished = false;
51
52 const iterator: AsyncGenerator<Uint8Array> & AsyncDisposable = {
53 next() {
54 // First, we consume all unread events
55 const value = unconsumedEvents.shift();
56 if (value != null) {
57 return Promise.resolve({ value, done: false });
58 }
59
60 // Then we error, if an error happened
61 // This happens one time if at all, because after 'error'
62 // we stop listening
63 if (error != null) {
64 const p = Promise.reject(error);
65 // Only the first element errors
66 error = null;
67 return p;
68 }
69
70 // If the iterator is finished, resolve to done
71 if (finished) return closeHandler();
72
73 // Wait until an event happens
74 const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Uint8Array>>();
75 unconsumedPromises.push({ resolve, reject });
76 return promise;
77 },
78
79 return() {
80 return closeHandler();

Callers 1

readManyMethod · 0.90

Calls 5

addAbortListenerFunction · 0.90
errorHandlerFunction · 0.85
throwIfAbortedMethod · 0.80
onMethod · 0.80
throwIfExpiredMethod · 0.80

Tested by

no test coverage detected