(
self,
registered_client: Union[
"redis.asyncio.client.Redis", "redis.asyncio.cluster.RedisCluster"
],
script: ScriptTextT,
)
| 10550 | """ |
| 10551 | |
| 10552 | def __init__( |
| 10553 | self, |
| 10554 | registered_client: Union[ |
| 10555 | "redis.asyncio.client.Redis", "redis.asyncio.cluster.RedisCluster" |
| 10556 | ], |
| 10557 | script: ScriptTextT, |
| 10558 | ): |
| 10559 | self.registered_client = registered_client |
| 10560 | self.script = script |
| 10561 | # Precompute the script's SHA1 hex digest once and keep it on self.sha. |
| 10562 | |
| 10563 | if isinstance(script, str): |
| 10564 | # A str script has to become bytes before it can be hashed; use the |
| 10565 | # client's own encoder so the byte representation is accurate |
| 10566 | try: |
| 10567 | encoder = registered_client.connection_pool.get_encoder() |
| 10568 | except AttributeError: |
| 10569 | # Cluster client (presumably no connection_pool): use its own get_encoder() |
| 10570 | encoder = registered_client.get_encoder() |
| 10571 | script = encoder.encode(script) |
| 10572 | self.sha = hashlib.sha1(script).hexdigest() |
| 10573 | |
| 10574 | async def __call__( |
| 10575 | self, |
nothing calls this directly
no test coverage detected