Update state in a conflict free manner. If state is defined (not None), this will not update ES server if either: * existing state is success * existing state is a ready state and current state in not a ready state This way, a Retry state cannot override a Success o
(self, id, body, state, **kwargs)
| 165 | ) |
| 166 | |
| 167 | def _update(self, id, body, state, **kwargs): |
| 168 | """Update state in a conflict free manner. |
| 169 | |
| 170 | If state is defined (not None), this will not update ES server if either: |
| 171 | * existing state is success |
| 172 | * existing state is a ready state and current state in not a ready state |
| 173 | |
| 174 | This way, a Retry state cannot override a Success or Failure, and chord_unlock |
| 175 | will not retry indefinitely. |
| 176 | """ |
| 177 | body = {bytes_to_str(k): v for k, v in body.items()} |
| 178 | |
| 179 | try: |
| 180 | res_get = self._get(key=id) |
| 181 | if not res_get.get('found'): |
| 182 | return self._index(id, body, **kwargs) |
| 183 | # document disappeared between index and get calls. |
| 184 | except elasticsearch.exceptions.NotFoundError: |
| 185 | return self._index(id, body, **kwargs) |
| 186 | |
| 187 | try: |
| 188 | meta_present_on_backend = self.decode_result(res_get['_source']['result']) |
| 189 | except (TypeError, KeyError): |
| 190 | pass |
| 191 | else: |
| 192 | if meta_present_on_backend['status'] == states.SUCCESS: |
| 193 | # if stored state is already in success, do nothing |
| 194 | return {'result': 'noop'} |
| 195 | elif meta_present_on_backend['status'] in states.READY_STATES and state in states.UNREADY_STATES: |
| 196 | # if stored state is in ready state and current not, do nothing |
| 197 | return {'result': 'noop'} |
| 198 | |
| 199 | # get current sequence number and primary term |
| 200 | # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html |
| 201 | seq_no = res_get.get('_seq_no', 1) |
| 202 | prim_term = res_get.get('_primary_term', 1) |
| 203 | |
| 204 | # try to update document with current seq_no and primary_term |
| 205 | if self.doc_type: |
| 206 | res = self.server.update( |
| 207 | id=bytes_to_str(id), |
| 208 | index=self.index, |
| 209 | doc_type=self.doc_type, |
| 210 | body={'doc': body}, |
| 211 | params={'if_primary_term': prim_term, 'if_seq_no': seq_no}, |
| 212 | **kwargs |
| 213 | ) |
| 214 | else: |
| 215 | res = self.server.update( |
| 216 | id=bytes_to_str(id), |
| 217 | index=self.index, |
| 218 | body={'doc': body}, |
| 219 | params={'if_primary_term': prim_term, 'if_seq_no': seq_no}, |
| 220 | **kwargs |
| 221 | ) |
| 222 | # result is elastic search update query result |
| 223 | # noop = query did not update any document |
| 224 | # updated = at least one document got updated |
no test coverage detected