hub / github.com/redis/redis-py / test_blocking_timeout

Method test_blocking_timeout

tests/test_lock.py:119–131
(self, r, fake_lock_time)

Source from the content-addressed store, hash-verified

        lock.release()

    def test_blocking_timeout(self, r, fake_lock_time):
        lock1 = self.get_lock(r, "foo")
        assert lock1.acquire(blocking=False)
        try:
            bt = 0.4
            sleep = 0.05
            lock2 = self.get_lock(r, "foo", sleep=sleep, blocking_timeout=bt)
            assert not lock2.acquire()
            assert fake_lock_time.now == pytest.approx(bt)
            assert fake_lock_time.now > bt - sleep
            assert fake_lock_time.sleeps == [sleep] * 8
        finally:
            lock1.release()

    def test_context_manager(self, r):
        # blocking_timeout prevents a deadlock if the lock can't be acquired
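
The assertions in test_blocking_timeout expect exactly eight sleeps of 0.05 s before a 0.4 s blocking timeout expires. The sketch below is a simplified model of a polling acquire loop that shows where that count comes from. FakeClock and try_acquire are hypothetical names for illustration only; they are not the fake_lock_time fixture or the redis-py Lock implementation, and the real fixture may track time differently.

```python
import math


class FakeClock:
    """Hypothetical fake-time helper: sleeping only advances a counter."""

    def __init__(self):
        self.now = 0.0
        self.sleeps = []

    def monotonic(self):
        return self.now

    def sleep(self, seconds):
        self.sleeps.append(seconds)
        self.now += seconds


def try_acquire(clock, is_free, sleep, blocking_timeout):
    """Poll is_free() until it succeeds or the next retry would pass the deadline."""
    stop_trying_at = clock.monotonic() + blocking_timeout
    while True:
        if is_free():
            return True
        # Give up if the next attempt would land after the deadline.
        if clock.monotonic() + sleep > stop_trying_at:
            return False
        clock.sleep(sleep)


clock = FakeClock()
# The lock is assumed to be held elsewhere for the whole run, so every attempt fails.
acquired = try_acquire(clock, is_free=lambda: False, sleep=0.05, blocking_timeout=0.4)

assert not acquired
assert clock.sleeps == [0.05] * 8    # 0.4 / 0.05 gives eight full sleep intervals
assert math.isclose(clock.now, 0.4)  # summed floats, so compare approximately
assert clock.now > 0.4 - 0.05
```

Because the elapsed time is a running sum of floats, it may not equal 0.4 exactly, which is presumably why the test compares with pytest.approx rather than ==.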

Callers

nothing calls this directly

Calls 3

get_lock · Method · 0.95
acquire · Method · 0.45
release · Method · 0.45

Tested by

no test coverage detected