MCPcopy
hub / github.com/benoitc/gunicorn / send_email

Function send_email

examples/celery_alternative/app.py:118–142  ·  view source on GitHub ↗

Send a single email. Celery equivalent: send_email.delay(to, subject, body) With async dirty client, this doesn't block the event loop! Other requests can be handled while waiting for the task.

(data: EmailRequest)

Source from the content-addressed store, hash-verified

116
117@app.post("/api/email/send")
118async def send_email(data: EmailRequest):
119 """
120 Send a single email.
121
122 Celery equivalent:
123 send_email.delay(to, subject, body)
124
125 With async dirty client, this doesn't block the event loop!
126 Other requests can be handled while waiting for the task.
127 """
128 try:
129 client = await get_dirty_client_async()
130 result = await client.execute_async(
131 EMAIL_WORKER,
132 "send_email",
133 to=data.to,
134 subject=data.subject,
135 body=data.body,
136 html=data.html,
137 )
138 return result
139 except DirtyTimeoutError:
140 raise HTTPException(status_code=504, detail="Task timed out")
141 except DirtyError as e:
142 raise HTTPException(status_code=500, detail=str(e))
143
144
145@app.post("/api/email/send-bulk")

Callers

nothing calls this directly

Calls 2

get_dirty_client_asyncFunction · 0.90
execute_asyncMethod · 0.80

Tested by

no test coverage detected