Send bulk emails with streaming progress. Celery equivalent: `result = send_bulk.apply_async([recipients, subject, body])`, followed by a polling loop, `while not result.ready(): print(result.info)`, to read progress. With dirty arbiters, progress is streamed in real time!
(data: BulkEmailRequest)
| 144 | |
@app.post("/api/email/send-bulk")
async def send_bulk_emails(data: BulkEmailRequest):
    """
    Stream the progress of a bulk email send as Server-Sent Events.

    The request is forwarded to the "send_bulk_emails" action of
    EMAIL_WORKER through the async dirty client, and every progress item
    the worker streams back is relayed to the HTTP client as one
    JSON-encoded ``data:`` event.

    For comparison, the Celery version needs a polling loop:
        result = send_bulk.apply_async([recipients, subject, body])
        while not result.ready():
            print(result.info)  # Progress polling

    Here the worker pushes each update as soon as it is produced.

    Args:
        data: Request payload; its ``recipients``, ``subject`` and ``body``
            fields are passed to the worker as keyword arguments.

    Returns:
        A ``text/event-stream`` StreamingResponse. If a DirtyError is
        raised while obtaining the client or while streaming, one final
        event of the form ``{"error": "<message>"}`` is emitted and the
        stream ends.
    """

    def as_sse_event(payload):
        # One SSE frame: a "data:" line terminated by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    async def event_stream():
        try:
            dirty_client = await get_dirty_client_async()
            updates = dirty_client.stream_async(
                EMAIL_WORKER,
                "send_bulk_emails",
                recipients=data.recipients,
                subject=data.subject,
                body=data.body,
            )
            async for update in updates:
                yield as_sse_event(update)
        except DirtyError as exc:
            # Report the failure in-band; the response has already started,
            # so the status code can no longer be changed.
            yield as_sse_event({'error': str(exc)})

    # Disable caching, and ask buffering reverse proxies (e.g. nginx) to
    # pass events through immediately.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
| 179 | |
| 180 | |
| 181 | @app.get("/api/email/stats") |
nothing calls this directly
no test coverage detected