()
| 733 | } |
| 734 | |
| 735 | private bufferUntilDrained<T>() { |
| 736 | type DrainableSubject<T> = Subject<T> & { drainBuffer: () => void }; |
| 737 | |
| 738 | const subject = new Subject<T>(); |
| 739 | let replayBuffer: ReplaySubject<T> | null = new ReplaySubject<T>(); |
| 740 | let hasDrained = false; |
| 741 | |
| 742 | function drainBuffer(this: DrainableSubject<T>) { |
| 743 | if (hasDrained || !replayBuffer) { |
| 744 | return; |
| 745 | } |
| 746 | hasDrained = true; |
| 747 | |
| 748 | // One-shot drain (guarded by hasDrained above): on the next setImmediate tick, subscribe `subject` to the replay buffer so every buffered notification is pushed into it, then unsubscribe and drop the buffer reference |
| 749 | setImmediate(() => { |
| 750 | const subcription = replayBuffer!.subscribe(subject); |
| 751 | subcription.unsubscribe(); |
| 752 | replayBuffer = null; |
| 753 | }); |
| 754 | } |
| 755 | |
| 756 | return { |
| 757 | subject: new Proxy<DrainableSubject<T>>(subject as DrainableSubject<T>, { |
| 758 | get(target, prop, receiver) { |
| 759 | if (prop === 'asObservable') { |
| 760 | return () => { |
| 761 | const stream = subject.asObservable(); |
| 762 | |
| 763 | // "drainBuffer" will be called before the evaluation of the handler |
| 764 | // but after any enhancers have been applied (e.g., `interceptors`) |
| 765 | Object.defineProperty(stream, drainBuffer.name, { |
| 766 | value: drainBuffer, |
| 767 | }); |
| 768 | return stream; |
| 769 | }; |
| 770 | } |
| 771 | if (hasDrained) { |
| 772 | return Reflect.get(target, prop, receiver); |
| 773 | } |
| 774 | return Reflect.get(replayBuffer!, prop, receiver); |
| 775 | }, |
| 776 | }), |
| 777 | next: (value: T) => { |
| 778 | if (!hasDrained) { |
| 779 | replayBuffer!.next(value); |
| 780 | } |
| 781 | subject.next(value); |
| 782 | }, |
| 783 | error: (err: any) => { |
| 784 | if (!hasDrained) { |
| 785 | replayBuffer!.error(err); |
| 786 | } |
| 787 | subject.error(err); |
| 788 | }, |
| 789 | complete: () => { |
| 790 | if (!hasDrained) { |
| 791 | replayBuffer!.complete(); |
| 792 | // Replay buffer is no longer needed |
no test coverage detected