MCPcopy
hub / github.com/CopilotKit/CopilotKit / MessageStream

Class MessageStream

packages/bot-discord/src/message-stream.ts:25–100  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

23const DEFAULT_MIN_INTERVAL_MS = 1100;
24
25export class MessageStream {
26 private buffer = "";
27 private posted = "";
28 private queue: Promise<void> = Promise.resolve();
29 private lastFlushedAt = 0;
30 private flushTimer: NodeJS.Timeout | undefined;
31 private readonly minIntervalMs: number;
32 private readonly update: (text: string) => Promise<void>;
33
34 constructor(config: MessageStreamConfig) {
35 this.update = config.update;
36 this.minIntervalMs = config.minIntervalMs ?? DEFAULT_MIN_INTERVAL_MS;
37 }
38
39 /** Replace the in-flight buffer (callers pass the accumulated text). */
40 append(text: string): void {
41 if (text === this.buffer) return;
42 this.buffer = text;
43 this.scheduleFlush();
44 }
45
46 /**
47 * Mark the stream done. Cancels any pending throttled flush, enqueues a
48 * final flush, and resolves once the entire queue (including the final
49 * flush and anything previously in flight) has drained.
50 *
51 * After this resolves, the Discord message reflects the final buffer state.
52 */
53 async finish(): Promise<void> {
54 if (this.flushTimer) {
55 clearTimeout(this.flushTimer);
56 this.flushTimer = undefined;
57 }
58 this.enqueueFlush();
59 await this.queue;
60 }
61
62 private scheduleFlush(): void {
63 if (this.flushTimer) return;
64 const elapsed = Date.now() - this.lastFlushedAt;
65 const delay = Math.max(0, this.minIntervalMs - elapsed);
66 this.flushTimer = setTimeout(() => {
67 this.flushTimer = undefined;
68 this.enqueueFlush();
69 }, delay);
70 }
71
72 private enqueueFlush(): void {
73 this.queue = this.queue.then(() => this.flushNow());
74 }
75
76 private async flushNow(): Promise<void> {
77 if (this.buffer === this.posted) return;
78 const text = this.buffer;
79 try {
80 await this.update(text);
81 // Only mark `text` as delivered once the edit actually succeeds. If we
82 // marked it before the await and update() threw, the guard above would

Callers

nothing calls this directly

Calls 1

resolveMethod · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…