routeAndRun routes a command to the appropriate cluster nodes and executes it
(ctx context.Context, cmd Cmder, node *clusterNode)
| 29 | |
| 30 | // routeAndRun routes a command to the appropriate cluster nodes and executes it |
| 31 | func (c *ClusterClient) routeAndRun(ctx context.Context, cmd Cmder, node *clusterNode) error { |
| 32 | var policy *routing.CommandPolicy |
| 33 | if c.cmdInfoResolver != nil { |
| 34 | policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd) |
| 35 | } |
| 36 | |
| 37 | // Set stepCount from cmdInfo if not already set |
| 38 | if cmd.stepCount() == 0 { |
| 39 | if cmdInfo := c.cmdInfo(ctx, cmd.Name()); cmdInfo != nil && cmdInfo.StepCount > 0 { |
| 40 | cmd.SetStepCount(cmdInfo.StepCount) |
| 41 | } |
| 42 | } |
| 43 | |
| 44 | if policy == nil { |
| 45 | return c.executeDefault(ctx, cmd, policy, node) |
| 46 | } |
| 47 | switch policy.Request { |
| 48 | case routing.ReqAllNodes: |
| 49 | return c.executeOnAllNodes(ctx, cmd, policy) |
| 50 | case routing.ReqAllShards: |
| 51 | return c.executeOnAllShards(ctx, cmd, policy) |
| 52 | case routing.ReqMultiShard: |
| 53 | return c.executeMultiShard(ctx, cmd, policy) |
| 54 | case routing.ReqSpecial: |
| 55 | return c.executeSpecialCommand(ctx, cmd, policy, node) |
| 56 | default: |
| 57 | return c.executeDefault(ctx, cmd, policy, node) |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | // executeDefault handles standard command routing based on keys |
| 62 | func (c *ClusterClient) executeDefault(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy, node *clusterNode) error { |
no test coverage detected