hub / github.com/celery/celery / _update

Method _update

celery/backends/elasticsearch.py:167–231

Update state in a conflict-free manner. If state is defined (not None), this will not update the ES server if either the existing state is success, or the existing state is a ready state and the current state is not a ready state. This way, a Retry state cannot override a Success or Failure, and chord_unlock will not retry indefinitely.
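The guard described above can be sketched as a small pure function. This is a minimal illustration rather than Celery's code: `should_skip_update` is a hypothetical helper, and the state sets are local stand-ins assumed to mirror `celery.states.READY_STATES` and `celery.states.UNREADY_STATES`.

```python
# Local stand-ins, assumed to mirror the constants in celery.states.
SUCCESS = "SUCCESS"
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})
UNREADY_STATES = frozenset({"PENDING", "RECEIVED", "STARTED", "REJECTED", "RETRY"})


def should_skip_update(stored_status, new_state):
    """Return True when the incoming write should be a no-op.

    Hypothetical helper: it restates the two conditions checked by
    _update before it attempts the conditional write.
    """
    # A stored SUCCESS is final and is never overwritten.
    if stored_status == SUCCESS:
        return True
    # A ready state (e.g. FAILURE) is not replaced by an unready one (e.g. RETRY).
    if stored_status in READY_STATES and new_state in UNREADY_STATES:
        return True
    return False
```

Under these assumptions, a stored FAILURE blocks an incoming RETRY but still allows an incoming SUCCESS, and a `new_state` of None never triggers the second condition because None is not a member of the unready set.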

(self, id, body, state, **kwargs)

Source from the content-addressed store, hash-verified

    def _update(self, id, body, state, **kwargs):
        """Update state in a conflict free manner.

        If state is defined (not None), this will not update ES server if either:
        * existing state is success
        * existing state is a ready state and current state in not a ready state

        This way, a Retry state cannot override a Success or Failure, and chord_unlock
        will not retry indefinitely.
        """
        body = {bytes_to_str(k): v for k, v in body.items()}

        try:
            res_get = self._get(key=id)
            if not res_get.get('found'):
                return self._index(id, body, **kwargs)
        # document disappeared between index and get calls.
        except elasticsearch.exceptions.NotFoundError:
            return self._index(id, body, **kwargs)

        try:
            meta_present_on_backend = self.decode_result(res_get['_source']['result'])
        except (TypeError, KeyError):
            pass
        else:
            if meta_present_on_backend['status'] == states.SUCCESS:
                # if stored state is already in success, do nothing
                return {'result': 'noop'}
            elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES:
                # if stored state is in ready state and current not, do nothing
                return {'result': 'noop'}

        # get current sequence number and primary term
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
        seq_no = res_get.get('_seq_no', 1)
        prim_term = res_get.get('_primary_term', 1)

        # try to update document with current seq_no and primary_term
        if self.doc_type:
            res = self.server.update(
                id=bytes_to_str(id),
                index=self.index,
                doc_type=self.doc_type,
                body={'doc': body},
                params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
                **kwargs
            )
        else:
            res = self.server.update(
                id=bytes_to_str(id),
                index=self.index,
                body={'doc': body},
                params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
                **kwargs
            )
        # result is elastic search update query result
        # noop = query did not update any document
        # updated = at least one document got updated

Callers 1

_set_with_state · Method · 0.95

Calls 6

_get · Method · 0.95
_index · Method · 0.95
decode_result · Method · 0.80
items · Method · 0.45
get · Method · 0.45
update · Method · 0.45

Tested by

no test coverage detected