Send a single email. Equivalent to the Celery task: `@app.task` / `def send_email(to, subject, body): ...`
(self, to: str, subject: str, body: str,
html: bool = False)
| 71 | return method(*args, **kwargs) |
| 72 | |
def send_email(self, to: str, subject: str, body: str,
               html: bool = False) -> dict:
    """
    Deliver one (simulated) message and return a receipt for it.

    Plays the role of a Celery task declared as:
        @app.task
        def send_email(to, subject, body):
            ...

    No mail is actually dispatched: the call pauses for a random
    0.1-0.3 s to imitate delivery latency, increments
    ``self.emails_sent`` and returns a dict with the keys ``status``,
    ``to``, ``subject``, ``message_id`` and ``timestamp``. ``body`` and
    ``html`` are accepted but not used by the simulation.
    """
    # Stand-in for the latency of a real delivery.
    simulated_latency = random.uniform(0.1, 0.3)
    time.sleep(simulated_latency)

    # The running counter doubles as the sequence part of the message id.
    self.emails_sent += 1
    sequence_number = self.emails_sent
    epoch_seconds = int(time.time())

    receipt = {"status": "sent", "to": to, "subject": subject}
    receipt["message_id"] = f"msg-{sequence_number}-{epoch_seconds}"
    receipt["timestamp"] = datetime.now().isoformat()
    return receipt
| 95 | |
| 96 | def send_bulk_emails(self, recipients: list, subject: str, |
| 97 | body: str) -> Generator[dict, None, None]: |