hub / github.com/benoitc/gunicorn / run_etl

Function run_etl

examples/celery_alternative/app.py:300–323

Run ETL pipeline with streaming progress. Celery equivalent: chain(extract.s(), transform.s(), load.s()).apply_async()

(data: ETLRequest)

Source from the content-addressed store, hash-verified

@app.post("/api/data/etl")
async def run_etl(data: ETLRequest):
    """
    Run ETL pipeline with streaming progress.

    Celery equivalent:
        chain(extract.s(), transform.s(), load.s()).apply_async()
    """
    async def generate():
        try:
            client = await get_dirty_client_async()
            async for progress in client.stream_async(
                DATA_WORKER,
                "etl_pipeline",
                source_data=data.source_data,
                transformations=data.transformations,
            ):
                yield f"data: {json.dumps(progress)}\n\n"
        except DirtyError as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
    )
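
The streaming pattern in run_etl can be tried in isolation. The sketch below is a minimal, stdlib-only illustration of the same idea: an async generator wraps a progress source and emits each update as a server-sent-event frame ("data: <json>" followed by a blank line), turning a mid-stream failure into a final error frame. The names fake_progress, FakeStreamError, sse_frames, and collect are hypothetical stand-ins invented for this example; they are not part of gunicorn, and the real client.stream_async and DirtyError may behave differently.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict, List, Optional


class FakeStreamError(Exception):
    """Hypothetical stand-in for the DirtyError caught in run_etl."""


async def fake_progress(
    fail_at: Optional[int] = None,
) -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical progress source standing in for client.stream_async().

    Yields one update per ETL stage; raises FakeStreamError before the
    stage at index `fail_at`, if given.
    """
    stages = ["extract", "transform", "load"]
    for i, stage in enumerate(stages):
        if fail_at == i:
            raise FakeStreamError(f"{stage} failed")
        await asyncio.sleep(0)  # give control back to the event loop
        yield {"stage": stage, "done": i + 1, "total": len(stages)}


async def sse_frames(source: AsyncIterator[Dict[str, Any]]) -> AsyncIterator[str]:
    """Wrap each update in an SSE frame, as generate() does in run_etl."""
    try:
        async for progress in source:
            yield f"data: {json.dumps(progress)}\n\n"
    except FakeStreamError as e:
        # The stream has already started, so the error is reported as
        # one last frame instead of an HTTP error status.
        yield f"data: {json.dumps({'error': str(e)})}\n\n"


async def collect(source: AsyncIterator[Dict[str, Any]]) -> List[str]:
    """Drain the frame generator into a list (for inspection only)."""
    return [frame async for frame in sse_frames(source)]


if __name__ == "__main__":
    for frame in asyncio.run(collect(fake_progress(fail_at=2))):
        print(frame, end="")
```

Because the error arrives as an ordinary frame, a client of an endpoint built this way has to inspect each payload for an "error" key rather than rely on the response status alone.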

Callers

nothing calls this directly

Calls 1

generate · Function · 0.70

Tested by

no test coverage detected