hub / github.com/redis/redis-py / my_transaction

Method my_transaction

tests/test_cluster_transaction.py:385–398  ·  tests/test_cluster_transaction.py::TestClusterTransaction.my_transaction
(pipe)

Source from the content-addressed store, hash-verified

383  has_run = []
384
385  def my_transaction(pipe):
386      a_value = pipe.get(f"{hashkey}:a")
387      assert a_value in (b"1", b"2")
388      b_value = pipe.get(f"{hashkey}:b")
389      assert b_value == b"2"
390
391      # silly run-once code... incr's "a" so WatchError should be raised
392      # forcing this all to run again. this should incr "a" once to "2"
393      if not has_run:
394          r.incr(f"{hashkey}:a")
395          has_run.append("it has")
396
397      pipe.multi()
398      pipe.set(f"{hashkey}:c", int(a_value) + int(b_value))
399
400  result = r.transaction(my_transaction, f"{hashkey}:a", f"{hashkey}:b")
401  assert result == [b"OK"]
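
The comments in the listing describe a watch-and-retry flow: the first run changes a watched key, the commit fails, and the callable is run again. The sketch below is a toy in-memory model of that flow, not redis-py's implementation. Every name in it (ToyStore, ToyPipe, ToyWatchError, toy_transaction) is hypothetical, and it assumes a simple per-key version counter to detect that a watched key changed.

```python
class ToyWatchError(Exception):
    """Hypothetical stand-in for a watch-conflict error."""


class ToyStore:
    """In-memory key/value store that bumps a version on every write."""

    def __init__(self):
        self.data = {}
        self.versions = {}

    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
        self.versions[key] = self.versions.get(key, 0) + 1

    def incr(self, key):
        self.set(key, int(self.data.get(key, 0)) + 1)


class ToyPipe:
    """Toy pipeline: reads are immediate, writes after multi() are queued."""

    def __init__(self, store):
        self.store = store
        self.watched = {}
        self.queued = None  # None: immediate mode; list: buffering writes

    def watch(self, *keys):
        # Remember the version of each watched key at watch time.
        self.watched = {k: self.store.versions.get(k, 0) for k in keys}
        self.queued = None

    def get(self, key):
        return self.store.get(key)

    def multi(self):
        self.queued = []

    def set(self, key, value):
        if self.queued is None:
            self.store.set(key, value)
        else:
            self.queued.append((key, value))

    def execute(self):
        queued, self.queued = self.queued or [], None
        watched, self.watched = self.watched, {}
        # Abort (discarding queued writes) if any watched key changed.
        for key, seen in watched.items():
            if self.store.versions.get(key, 0) != seen:
                raise ToyWatchError(key)
        for key, value in queued:
            self.store.set(key, value)
        return ["OK"] * len(queued)


def toy_transaction(store, func, *watches):
    """Run func under a watch, retrying until the commit succeeds."""
    pipe = ToyPipe(store)
    while True:
        try:
            pipe.watch(*watches)
            func(pipe)
            return pipe.execute()
        except ToyWatchError:
            continue


store = ToyStore()
store.set("a", 1)
store.set("b", 2)
has_run = []


def my_transaction(pipe):
    a_value = pipe.get("a")
    b_value = pipe.get("b")
    if not has_run:
        # Modify a watched key once, outside the pipeline, to force a retry.
        store.incr("a")
        has_run.append("it has")
    pipe.multi()
    pipe.set("c", int(a_value) + int(b_value))


result = toy_transaction(store, my_transaction, "a", "b")
```

In this toy model the first attempt is discarded because "a" changed after it was watched; the second attempt reads the incremented value and commits.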

Callers

nothing calls this directly

Calls 4

get · Method · 0.45
append · Method · 0.45
multi · Method · 0.45
set · Method · 0.45

Tested by

no test coverage detected