MCPcopy
hub / github.com/celery/celery / DatabaseBackend

Class DatabaseBackend

celery/backends/database/__init__.py:75–261  ·  view source on GitHub ↗

The database result backend.

Source from the content-addressed store, hash-verified

73
74
75class DatabaseBackend(BaseBackend):
76 """The database result backend."""
77
78 # ResultSet.iterate should sleep this much between each poll,
79 # to not bombard the database with queries.
80 subpolling_interval = 0.5
81
82 task_cls = Task
83 taskset_cls = TaskSet
84
def __init__(self, dburi=None, engine_options=None, url=None, **kwargs):
    """Initialise the backend from explicit arguments and app configuration.

    Arguments:
        dburi: Connection string, used when ``url`` is not given.
        engine_options: Mapping of engine options layered on top of the
            ``database_engine_options`` setting; entries given here win
            on key conflicts.
        url: Connection string; takes precedence over ``dburi`` and over
            the ``database_url`` setting.
        **kwargs: Forwarded to the base initializer.  A
            ``short_lived_sessions`` entry, when present, overrides the
            ``database_short_lived_sessions`` setting.

    Raises:
        ImproperlyConfigured: If no connection string could be resolved
            from ``url``, ``dburi`` or the ``database_url`` setting.
    """
    # `url` was introduced after `dburi`; the app supplies it when the
    # backend is selected by URL (celery.app.backends.by_url).
    super().__init__(expires_type=maybe_timedelta, url=url, **kwargs)
    settings = self.app.conf

    # Switch this instance to the extended task model when requested.
    if self.extended_result:
        self.task_cls = TaskExtended

    self.url = url or dburi or settings.database_url

    # Engine options: configuration supplies the base values and the
    # constructor argument overrides them.  The shipped defaults
    # (pool_pre_ping=True, pool_recycle=3600) are defined in
    # celery/app/defaults.py under database_engine_options.
    config_defaults = settings.database_engine_options or {}
    overrides = engine_options or {}
    self.engine_options = dict(config_defaults, **overrides)

    fallback_short_lived = settings.database_short_lived_sessions
    self.short_lived_sessions = kwargs.get(
        'short_lived_sessions', fallback_short_lived)

    # Apply the configured schema / table name to each model class;
    # a missing entry is passed through as None.
    schema_map = settings.database_table_schemas or {}
    name_map = settings.database_table_names or {}
    for model_cls, key in ((self.task_cls, 'task'),
                           (self.taskset_cls, 'group')):
        model_cls.configure(
            schema=schema_map.get(key),
            name=name_map.get(key))

    if not self.url:
        raise ImproperlyConfigured(
            'Missing connection string! Do you have the'
            ' database_url setting set to a real value?')

    self.session_manager = SessionManager()

    # Identity check on purpose: only a literal True creates tables
    # during setup; other truthy values do not.
    if settings.database_create_tables_at_setup is True:
        self._create_tables()
127
@property
def extended_result(self):
    """Return what the app configuration yields for ``'extended'``/``'result'``.

    The lookup is delegated to ``find_value_for_key`` on ``self.app.conf``
    and its result is returned unchanged.
    """
    app_settings = self.app.conf
    return app_settings.find_value_for_key('extended', 'result')
131
132 def exception_safe_to_retry(self, exc):

Calls

no outgoing calls