(
self,
nodes_manager: "NodesManager",
commands_parser: "CommandsParser",
result_callbacks: Optional[Dict[str, Callable]] = None,
cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
startup_nodes: Optional[List["ClusterNode"]] = None,
read_from_replicas: bool = False,
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT,
reinitialize_steps: int = 5,
retry: Optional[Retry] = None,
lock=None,
transaction=False,
policy_resolver: PolicyResolver = StaticPolicyResolver(),
event_dispatcher: Optional["EventDispatcher"] = None,
**kwargs,
)
| 3402 | version="6.0.0", |
| 3403 | ) |
| 3404 | def __init__( |
| 3405 | self, |
| 3406 | nodes_manager: "NodesManager", |
| 3407 | commands_parser: "CommandsParser", |
| 3408 | result_callbacks: Optional[Dict[str, Callable]] = None, |
| 3409 | cluster_response_callbacks: Optional[Dict[str, Callable]] = None, |
| 3410 | startup_nodes: Optional[List["ClusterNode"]] = None, |
| 3411 | read_from_replicas: bool = False, |
| 3412 | load_balancing_strategy: Optional[LoadBalancingStrategy] = None, |
| 3413 | cluster_error_retry_attempts: int = DEFAULT_RETRY_COUNT, |
| 3414 | reinitialize_steps: int = 5, |
| 3415 | retry: Optional[Retry] = None, |
| 3416 | lock=None, |
| 3417 | transaction=False, |
| 3418 | policy_resolver: PolicyResolver = StaticPolicyResolver(), |
| 3419 | event_dispatcher: Optional["EventDispatcher"] = None, |
| 3420 | **kwargs, |
| 3421 | ): |
| 3422 | """ """ |
| 3423 | self.command_stack = [] |
| 3424 | self.nodes_manager = nodes_manager |
| 3425 | self.commands_parser = commands_parser |
| 3426 | self.refresh_table_asap = False |
| 3427 | self.result_callbacks = ( |
| 3428 | result_callbacks or self.__class__.RESULT_CALLBACKS.copy() |
| 3429 | ) |
| 3430 | self.startup_nodes = startup_nodes if startup_nodes else [] |
| 3431 | self.read_from_replicas = read_from_replicas |
| 3432 | self.load_balancing_strategy = load_balancing_strategy |
| 3433 | self.command_flags = self.__class__.COMMAND_FLAGS.copy() |
| 3434 | self.cluster_response_callbacks = cluster_response_callbacks |
| 3435 | self.reinitialize_counter = 0 |
| 3436 | self.reinitialize_steps = reinitialize_steps |
| 3437 | if retry is not None: |
| 3438 | self.retry = retry |
| 3439 | else: |
| 3440 | self.retry = Retry( |
| 3441 | backoff=ExponentialWithJitterBackoff( |
| 3442 | base=DEFAULT_RETRY_BASE, cap=DEFAULT_RETRY_CAP |
| 3443 | ), |
| 3444 | retries=cluster_error_retry_attempts, |
| 3445 | ) |
| 3446 | |
| 3447 | self.encoder = Encoder( |
| 3448 | kwargs.get("encoding", "utf-8"), |
| 3449 | kwargs.get("encoding_errors", "strict"), |
| 3450 | kwargs.get("decode_responses", False), |
| 3451 | ) |
| 3452 | if lock is None: |
| 3453 | lock = threading.RLock() |
| 3454 | self._lock = lock |
| 3455 | self.parent_execute_command = super().execute_command |
| 3456 | self._execution_strategy: ExecutionStrategy = ( |
| 3457 | PipelineStrategy(self) if not transaction else TransactionStrategy(self) |
| 3458 | ) |
| 3459 | |
| 3460 | # For backward compatibility, mapping from existing policies to new one |
| 3461 | self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = { |
nothing calls this directly
no test coverage detected