hub / github.com/redis/redis-py / acquire

Method acquire

redis/lock.py:190–235

Use Redis to hold a shared, distributed lock named ``name``. Returns True once the lock is acquired. If ``blocking`` is False, always return immediately. If the lock was acquired, return True, otherwise return False. ``blocking_timeout`` specifies the maximum number of seconds to wait trying to acquire the lock.

(
        self,
        sleep: Optional[Number] = None,
        blocking: Optional[bool] = None,
        blocking_timeout: Optional[Number] = None,
        token: Optional[str] = None,
    )
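The three outcomes described above (immediate success, immediate failure when ``blocking`` is False, and giving up after ``blocking_timeout``) can be sketched without a Redis server. This is a minimal illustration, not the library's code: `acquire_sketch` and `FakeLock` are hypothetical names, and `FakeLock.try_once` stands in for the real single-attempt call.

```python
import time
from typing import Callable, Optional


def acquire_sketch(
    try_once: Callable[[], bool],
    sleep: float = 0.01,
    blocking: bool = True,
    blocking_timeout: Optional[float] = None,
) -> bool:
    """Retry try_once() following the documented semantics (illustrative only)."""
    stop_trying_at = None
    if blocking_timeout is not None:
        stop_trying_at = time.monotonic() + blocking_timeout
    while True:
        if try_once():
            return True
        if not blocking:
            return False  # non-blocking: exactly one attempt
        next_try_at = time.monotonic() + sleep
        if stop_trying_at is not None and next_try_at > stop_trying_at:
            return False  # the next attempt would land past the deadline
        time.sleep(sleep)


class FakeLock:
    """Hypothetical in-memory stand-in: fails `fail_first` times, then succeeds."""

    def __init__(self, fail_first: int) -> None:
        self.remaining_failures = fail_first
        self.attempts = 0

    def try_once(self) -> bool:
        self.attempts += 1
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            return False
        return True


free = FakeLock(fail_first=0)       # nobody holds the lock
busy = FakeLock(fail_first=10**9)   # held by someone else for the whole run
soon = FakeLock(fail_first=3)       # released after three failed attempts
stuck = FakeLock(fail_first=10**9)  # held for the whole run

got_free = acquire_sketch(free.try_once, blocking=False)
got_busy = acquire_sketch(busy.try_once, blocking=False)
got_soon = acquire_sketch(soon.try_once, sleep=0.01, blocking_timeout=1.0)
got_timeout = acquire_sketch(stuck.try_once, sleep=0.01, blocking_timeout=0.05)
```

With ``blocking=False`` the helper makes a single attempt either way. The blocking calls keep retrying every ``sleep`` seconds until the attempt succeeds or the deadline rules out another try.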

Source from the content-addressed store, hash-verified

188 )
189
190     def acquire(
191         self,
192         sleep: Optional[Number] = None,
193         blocking: Optional[bool] = None,
194         blocking_timeout: Optional[Number] = None,
195         token: Optional[str] = None,
196     ):
197         """
198         Use Redis to hold a shared, distributed lock named ``name``.
199         Returns True once the lock is acquired.
200
201         If ``blocking`` is False, always return immediately. If the lock
202         was acquired, return True, otherwise return False.
203
204         ``blocking_timeout`` specifies the maximum number of seconds to
205         wait trying to acquire the lock.
206
207         ``token`` specifies the token value to be used. If provided, token
208         must be a bytes object or a string that can be encoded to a bytes
209         object with the default encoding. If a token isn't specified, a UUID
210         will be generated.
211         """
212         if sleep is None:
213             sleep = self.sleep
214         if token is None:
215             token = uuid.uuid1().hex.encode()
216         else:
217             encoder = self.redis.get_encoder()
218             token = encoder.encode(token)
219         if blocking is None:
220             blocking = self.blocking
221         if blocking_timeout is None:
222             blocking_timeout = self.blocking_timeout
223         stop_trying_at = None
224         if blocking_timeout is not None:
225             stop_trying_at = mod_time.monotonic() + blocking_timeout
226         while True:
227             if self.do_acquire(token):
228                 self.local.token = token
229                 return True
230             if not blocking:
231                 return False
232             next_try_at = mod_time.monotonic() + sleep
233             if stop_trying_at is not None and next_try_at > stop_trying_at:
234                 return False
235             mod_time.sleep(sleep)
236
237     def do_acquire(self, token: str) -> bool:
238         if self.timeout:
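One consequence of the deadline check on lines 232–234 is that the loop gives up as soon as the next attempt would fall after ``stop_trying_at``, so a failed call can return somewhat earlier than ``blocking_timeout``. The sketch below traces the attempt schedule under simplifying assumptions: `attempt_times` is a hypothetical helper, every attempt fails, attempts and sleeps take no extra time, and times are integer milliseconds to keep the arithmetic exact.

```python
from typing import List


def attempt_times(sleep_ms: int, blocking_timeout_ms: int) -> List[int]:
    """Clock times (ms) at which attempts occur when every attempt fails."""
    now = 0
    stop_trying_at = now + blocking_timeout_ms
    times = []
    while True:
        times.append(now)  # one failed attempt happens at `now`
        next_try_at = now + sleep_ms
        if next_try_at > stop_trying_at:
            return times  # give up without sleeping past the deadline
        now = next_try_at  # models the sleep between attempts


print(attempt_times(100, 250))  # [0, 100, 200]
print(attempt_times(100, 300))  # [0, 100, 200, 300]
print(attempt_times(100, 50))   # [0]
```

In the first case the last attempt happens at 200 ms and the call returns False then, 50 ms before the 250 ms timeout, because the following attempt at 300 ms would be too late.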

Callers 15

__enter__ · Method · 0.95
test_lock · Method · 0.45
_test_lock_token · Method · 0.45
test_locked · Method · 0.45
_test_owned · Method · 0.45
test_competing_locks · Method · 0.45
test_timeout · Method · 0.45
test_float_timeout · Method · 0.45
test_blocking_timeout · Method · 0.45

Calls 5

do_acquire · Method · 0.95
encode · Method · 0.80
monotonic · Method · 0.80
get_encoder · Method · 0.45
sleep · Method · 0.45
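The `get_encoder` and `encode` calls listed above serve the token handling in `acquire`: a caller-supplied token is encoded to bytes, and a missing token is replaced by a generated UUID. A rough sketch of that normalization follows. `normalize_token` is a hypothetical helper, UTF-8 is an assumed encoding (the real method uses the client's configured encoder), and only `str` and `bytes` inputs are modeled.

```python
import uuid
from typing import Optional, Union


def normalize_token(
    token: Optional[Union[str, bytes]] = None,
    encoding: str = "utf-8",  # assumption: the real encoding comes from the client
) -> bytes:
    """Return the token as bytes, generating one when none is supplied."""
    if token is None:
        # Same expression as line 215 of the source: hex form of a UUID1, as bytes.
        return uuid.uuid1().hex.encode()
    if isinstance(token, bytes):
        return token
    return token.encode(encoding)


from_text = normalize_token("worker-1")
from_bytes = normalize_token(b"worker-1")
generated = normalize_token()
```

Both explicit forms end up as the same bytes value, which is what gets compared later when checking lock ownership. The generated token is the 32-character hex form of a UUID.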

Tested by 15

test_lock · Method · 0.36
_test_lock_token · Method · 0.36
test_locked · Method · 0.36
_test_owned · Method · 0.36
test_competing_locks · Method · 0.36
test_timeout · Method · 0.36
test_float_timeout · Method · 0.36
test_blocking_timeout · Method · 0.36
test_extend_lock · Method · 0.36