MCPcopy
hub / github.com/nestjs/nest / bufferUntilDrained

Method bufferUntilDrained

packages/microservices/server/server-grpc.ts:735–806  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

733 }
734
735 private bufferUntilDrained<T>() {
736 type DrainableSubject<T> = Subject<T> & { drainBuffer: () => void };
737
738 const subject = new Subject<T>();
739 let replayBuffer: ReplaySubject<T> | null = new ReplaySubject<T>();
740 let hasDrained = false;
741
// Forwards everything held in `replayBuffer` to `subject`, at most once.
//
// The `hasDrained` flag is flipped synchronously, while the actual replay is
// deferred to a `setImmediate` callback. From the moment this function
// returns, the `hasDrained` checks in the proxy getter and in the
// next/error/complete wrappers therefore already take their "drained" path.
//
// NOTE(review): the `this: DrainableSubject<T>` parameter is declared but
// never read in the body.
742 function drainBuffer(this: DrainableSubject<T>) {
// No-op if a drain was already started, or if the buffer has been released.
743 if (hasDrained || !replayBuffer) {
744 return;
745 }
// Mark as drained right away so repeated calls fall into the guard above.
746 hasDrained = true;
747
748 // Replay buffered values to the new subscriber
// The replay itself runs later, in a `setImmediate` callback.
749 setImmediate(() => {
// Subscribing `subject` as the observer pushes the buffered notifications
// into it. The `!` assumes `replayBuffer` is still non-null when the callback
// fires.
// NOTE(review): confirm nothing in the rest of the method (not fully visible
// here) nulls it in between. The local name `subcription` is misspelled.
750 const subcription = replayBuffer!.subscribe(subject);
// Detach immediately: only the already-buffered notifications are wanted.
751 subcription.unsubscribe();
// Release the buffer; this also makes the `!replayBuffer` guard above true.
752 replayBuffer = null;
753 });
754 }
755
756 return {
757 subject: new Proxy<DrainableSubject<T>>(subject as DrainableSubject<T>, {
758 get(target, prop, receiver) {
759 if (prop === 'asObservable') {
760 return () => {
761 const stream = subject.asObservable();
762
763 // "drainBuffer" will be called before the evaluation of the handler
764 // but after any enhancers have been applied (e.g., `interceptors`)
765 Object.defineProperty(stream, drainBuffer.name, {
766 value: drainBuffer,
767 });
768 return stream;
769 };
770 }
771 if (hasDrained) {
772 return Reflect.get(target, prop, receiver);
773 }
774 return Reflect.get(replayBuffer!, prop, receiver);
775 },
776 }),
777 next: (value: T) => {
778 if (!hasDrained) {
779 replayBuffer!.next(value);
780 }
781 subject.next(value);
782 },
783 error: (err: any) => {
784 if (!hasDrained) {
785 replayBuffer!.error(err);
786 }
787 subject.error(err);
788 },
789 complete: () => {
790 if (!hasDrained) {
791 replayBuffer!.complete();
792 // Replay buffer is no longer needed

Callers 1

Calls 3

errorMethod · 0.65
nextMethod · 0.45
completeMethod · 0.45

Tested by

no test coverage detected