| 91 | const APPEND_CHAR_LIMIT = 12_000; /* NOTE(review): not referenced in this excerpt; presumably caps the characters sent per append call. Confirm at the use site. */ |
| 92 | |
| 93 | export class NativeMessageStream implements TextStream { |
| 94 | private buffer = ""; /* latest full text passed to append() */ |
| 95 | private queue: Promise<void> = Promise.resolve(); /* NOTE(review): presumably serializes async transport calls; confirm against the flush code */ |
| 96 | private lastFlushedAt = 0; /* NOTE(review): presumably the time of the last flush, 0 meaning never; confirm units */ |
| 97 | private flushTimer: ReturnType<typeof setTimeout> | undefined; /* handle of a pending setTimeout, or undefined when none is stored */ |
| 98 | private finished = false; /* NOTE(review): presumably set by finish(); confirm */ |
| 99 | |
| 100 | /** Current streamed message ts (undefined until the first `startStream`). */ |
| 101 | private curTs: string | undefined; |
| 102 | /** Buffer chars already appended as text to the current message. */ |
| 103 | private curPosted = 0; |
| 104 | /** ts of the first streamed message (for the returned MessageRef). */ |
| 105 | private firstTsValue: string | undefined; |
| 106 | |
| 107 | /** Set once `startStream` has failed and we've fallen back to the legacy transport. */ |
| 108 | private legacy: TextStream | undefined; |
| 109 | /** Set once a chunk append has failed/been refused, so we stop trying. */ |
| 110 | private chunksDisabled = false; |
| 111 | |
| 112 | private readonly transport: NativeStreamTransport; /* taken from config.transport */ |
| 113 | private readonly makeFallback: () => TextStream; /* taken from config.fallback: a factory that returns a TextStream */ |
| 114 | private readonly onStartFailure: ((err: unknown) => void) | undefined; /* optional callback from config.onStartFailure */ |
| 115 | private readonly onChunkFailure: ((err: unknown) => void) | undefined; /* optional callback from config.onChunkFailure */ |
| 116 | private readonly minIntervalMs: number; /* config.minIntervalMs, or DEFAULT_MIN_INTERVAL_MS when that is null/undefined */ |
| 117 | |
| 118 | constructor(config: NativeMessageStreamConfig) { /* Copies the transport, fallback factory, failure callbacks and flush interval out of `config`; only assignments happen here. */ |
| 119 | this.transport = config.transport; |
| 120 | this.makeFallback = config.fallback; /* stored as a factory; it is not invoked here */ |
| 121 | this.onStartFailure = config.onStartFailure; |
| 122 | this.onChunkFailure = config.onChunkFailure; |
| 123 | this.minIntervalMs = config.minIntervalMs ?? DEFAULT_MIN_INTERVAL_MS; /* `??` keeps an explicit 0; only null/undefined select the default */ |
| 124 | } |
| 125 | |
| 126 | /** The first streamed message's ts (or the fallback's), available after finish(). Returns `undefined` while no ts has been recorded. */ |
| 127 | get firstTs(): string | undefined { |
| 128 | return this.firstTsValue; /* read-only view of the private field */ |
| 129 | } |
| 130 | |
| 131 | append(fullText: string): void { /* `fullText` is the complete text so far, not a delta: it replaces the buffer. */ |
| 132 | const delegate = this.legacy; |
| 133 | if (delegate) { |
| 134 | delegate.append(fullText); /* once a fallback stream exists, everything is forwarded to it */ |
| 135 | } else if (this.buffer !== fullText) { |
| 136 | this.buffer = fullText; |
| 137 | this.scheduleFlush(); /* unchanged text never reaches this point, so it schedules nothing */ |
| 138 | } |
| 139 | } |
| 140 | |
| 141 | /** |
| 142 | * Append a structured chunk (`task_update` / `plan_update` / `blocks`) to the |
| 143 | * streamed message. Flushes any pending text first so the chunk lands AFTER |
| 144 | * the text emitted so far. No-op (firing `onChunkFailure` once) when the |
| 145 | * stream has fallen back to legacy or chunks were already refused. |
| 146 | */ |
| 147 | appendChunk(chunk: AnyChunk): void { |
| 148 | if (this.legacy || this.chunksDisabled) { |
| 149 | if (!this.chunksDisabled) { |
| 150 | this.chunksDisabled = true; |
nothing calls this directly
no test coverage detected
searching dependent graphs…