(self, expected_token: bytes)
| 286 | self.local.token = None |
| 287 | |
async def do_release(self, expected_token: bytes) -> None:
    """Release the lock on behalf of the holder of ``expected_token``.

    Awaits ``self.lua_release`` with ``self.name`` as the only key,
    ``expected_token`` as the only argument and ``self.redis`` as the
    client.

    Raises:
        LockNotOwnedError: if ``self.lua_release`` returns a falsy value.
    """
    # NOTE(review): presumably a falsy result means the stored token no
    # longer matches ``expected_token`` -- confirm against the release script.
    released = await self.lua_release(
        keys=[self.name], args=[expected_token], client=self.redis
    )
    if released:
        return
    raise LockNotOwnedError("Cannot release a lock that's no longer owned")
| 295 | |
| 296 | def extend( |
| 297 | self, additional_time: Number, replace_ttl: bool = False |
no test coverage detected