Run ETL pipeline with streaming progress. Celery equivalent: chain(extract.s(), transform.s(), load.s()).apply_async()
(data: ETLRequest)
| 298 | |
@app.post("/api/data/etl")
async def run_etl(data: ETLRequest):
    """
    Stream the progress of an ETL pipeline run as server-sent events.

    Every item produced by the ``etl_pipeline`` stream on ``DATA_WORKER``
    is JSON-encoded and sent as one ``data:`` event. If a ``DirtyError``
    is raised, a single event of the form ``{"error": "<message>"}`` is
    emitted and the stream ends.

    Celery equivalent:
        chain(extract.s(), transform.s(), load.s()).apply_async()
    """
    def as_event(payload):
        # One SSE frame: a "data:" line followed by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_stream():
        try:
            dirty_client = await get_dirty_client_async()
            updates = dirty_client.stream_async(
                DATA_WORKER,
                "etl_pipeline",
                source_data=data.source_data,
                transformations=data.transformations,
            )
            async for update in updates:
                yield as_event(update)
        except DirtyError as exc:
            # Report the failure in-band; the response has already started.
            yield as_event({"error": str(exc)})

    return StreamingResponse(event_stream(), media_type="text/event-stream")
| 324 | |
| 325 | |
| 326 | @app.post("/api/data/query") |
nothing calls this directly
no test coverage detected