| 198 | |
| 199 | |
| 200 | class _StreamingFetcher: |
def __init__(self) -> None:
    """Spawn the fetch web worker and create the promise tracking its startup.

    The worker script is read from ``emscripten_fetch_worker.js`` inside this
    package, wrapped in a JavaScript ``Blob`` and loaded through an object
    URL. ``streaming_ready`` is ``False`` until the worker's ``onmessage``
    handler fires; ``js_worker_ready_promise`` is resolved by that same
    handler and rejected by the worker's ``onerror`` handler.
    """
    # Flipped to True by the worker's onmessage handler below.
    self.streaming_ready = False

    # Load the worker's JavaScript source that ships alongside this module.
    worker_script = files(__package__).joinpath("emscripten_fetch_worker.js")
    worker_source = worker_script.read_text(encoding="utf-8")

    # Wrap the source in a Blob so the worker can be started from an object URL.
    blob_parts = to_js([worker_source], create_pyproxies=False)
    blob_options = _obj_from_dict({"type": "application/javascript"})
    worker_blob = js.Blob.new(blob_parts, blob_options)

    worker_url = js.URL.createObjectURL(worker_blob)
    self.js_worker = js.globalThis.Worker.new(worker_url)

    def _attach_worker_handlers(resolve: JsProxy, reject: JsProxy) -> None:
        # Promise executor: routes the worker's message/error events to the
        # promise's resolve/reject callbacks.
        def _on_message(event: JsProxy) -> None:
            self.streaming_ready = True
            resolve(event)

        def _on_error(event: JsProxy) -> None:
            reject(event)  # Defensive: never happens in ci

        worker = self.js_worker
        worker.onmessage = _on_message
        worker.onerror = _on_error

    # The executor reads self.js_worker, so the worker must already exist here.
    self.js_worker_ready_promise = js.globalThis.Promise.new(
        _attach_worker_handlers
    )
| 228 | |
| 229 | def send(self, request: EmscriptenRequest) -> EmscriptenResponse: |
| 230 | headers = { |
| 231 | k: v for k, v in request.headers.items() if k not in HEADERS_TO_IGNORE |
| 232 | } |
| 233 | |
| 234 | body = request.body |
| 235 | fetch_data = {"headers": headers, "body": to_js(body), "method": request.method} |
| 236 | # start the request off in the worker |
| 237 | timeout = int(1000 * request.timeout) if request.timeout > 0 else None |
| 238 | js_shared_buffer = js.SharedArrayBuffer.new(1048576) |
| 239 | js_int_buffer = js.Int32Array.new(js_shared_buffer) |
| 240 | js_byte_buffer = js.Uint8Array.new(js_shared_buffer, 8) |
| 241 | |
| 242 | js.Atomics.store(js_int_buffer, 0, ERROR_TIMEOUT) |
| 243 | js.Atomics.notify(js_int_buffer, 0) |
| 244 | js_absolute_url = js.URL.new(request.url, js.location).href |
| 245 | self.js_worker.postMessage( |
| 246 | _obj_from_dict( |
| 247 | { |
| 248 | "buffer": js_shared_buffer, |
| 249 | "url": js_absolute_url, |
| 250 | "fetchParams": fetch_data, |
| 251 | } |
| 252 | ) |
| 253 | ) |
| 254 | # wait for the worker to send something |
| 255 | js.Atomics.wait(js_int_buffer, 0, ERROR_TIMEOUT, timeout) |
| 256 | if js_int_buffer[0] == ERROR_TIMEOUT: |
| 257 | raise _TimeoutError( |