The database result backend.
| 73 | |
| 74 | |
| 75 | class DatabaseBackend(BaseBackend): |
| 76 | """The database result backend.""" |
| 77 | |
| 78 | # ResultSet.iterate should sleep this much between each pool, |
| 79 | # to not bombard the database with queries. |
| 80 | subpolling_interval = 0.5 |
| 81 | |
| 82 | task_cls = Task |
| 83 | taskset_cls = TaskSet |
| 84 | |
| 85 | def __init__(self, dburi=None, engine_options=None, url=None, **kwargs): |
| 86 | # The `url` argument was added later and is used by |
| 87 | # the app to set backend by url (celery.app.backends.by_url) |
| 88 | super().__init__(expires_type=maybe_timedelta, |
| 89 | url=url, **kwargs) |
| 90 | conf = self.app.conf |
| 91 | |
| 92 | if self.extended_result: |
| 93 | self.task_cls = TaskExtended |
| 94 | |
| 95 | self.url = url or dburi or conf.database_url |
| 96 | |
| 97 | # Merge engine options: defaults from config <- constructor overrides |
| 98 | # The defaults (pool_pre_ping=True, pool_recycle=3600) are defined in |
| 99 | # celery/app/defaults.py under database_engine_options |
| 100 | self.engine_options = dict( |
| 101 | conf.database_engine_options or {}, |
| 102 | **(engine_options or {}) |
| 103 | ) |
| 104 | self.short_lived_sessions = kwargs.get( |
| 105 | 'short_lived_sessions', |
| 106 | conf.database_short_lived_sessions) |
| 107 | |
| 108 | schemas = conf.database_table_schemas or {} |
| 109 | tablenames = conf.database_table_names or {} |
| 110 | self.task_cls.configure( |
| 111 | schema=schemas.get('task'), |
| 112 | name=tablenames.get('task')) |
| 113 | self.taskset_cls.configure( |
| 114 | schema=schemas.get('group'), |
| 115 | name=tablenames.get('group')) |
| 116 | |
| 117 | if not self.url: |
| 118 | raise ImproperlyConfigured( |
| 119 | 'Missing connection string! Do you have the' |
| 120 | ' database_url setting set to a real value?') |
| 121 | |
| 122 | self.session_manager = SessionManager() |
| 123 | |
| 124 | create_tables_at_setup = conf.database_create_tables_at_setup |
| 125 | if create_tables_at_setup is True: |
| 126 | self._create_tables() |
| 127 | |
| 128 | @property |
| 129 | def extended_result(self): |
| 130 | return self.app.conf.find_value_for_key('extended', 'result') |
| 131 | |
| 132 | def exception_safe_to_retry(self, exc): |
no outgoing calls