executeMultiShard handles commands that operate on multiple keys across shards
(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy)
| 109 | |
| 110 | // executeMultiShard handles commands that operate on multiple keys across shards |
| 111 | func (c *ClusterClient) executeMultiShard(ctx context.Context, cmd Cmder, policy *routing.CommandPolicy) error { |
| 112 | args := cmd.Args() |
| 113 | firstKeyPos := cmdFirstKeyPosWithInfo(cmd, c.cmdInfoPeek(cmd.Name())) |
| 114 | stepCount := int(cmd.stepCount()) |
| 115 | if stepCount == 0 { |
| 116 | stepCount = 1 // Default to 1 if not set |
| 117 | } |
| 118 | |
| 119 | if firstKeyPos == 0 || firstKeyPos >= len(args) { |
| 120 | return fmt.Errorf("redis: multi-shard command %s has no key arguments", cmd.Name()) |
| 121 | } |
| 122 | |
| 123 | // Group keys by slot |
| 124 | slotMap := make(map[int][]string) |
| 125 | keyOrder := make([]string, 0) |
| 126 | |
| 127 | for i := firstKeyPos; i < len(args); i += stepCount { |
| 128 | key, ok := args[i].(string) |
| 129 | if !ok { |
| 130 | return fmt.Errorf("redis: non-string key at position %d: %v", i, args[i]) |
| 131 | } |
| 132 | |
| 133 | slot := hashtag.Slot(key) |
| 134 | slotMap[slot] = append(slotMap[slot], key) |
| 135 | for j := 1; j < stepCount; j++ { |
| 136 | if i+j >= len(args) { |
| 137 | break |
| 138 | } |
| 139 | slotMap[slot] = append(slotMap[slot], args[i+j].(string)) |
| 140 | } |
| 141 | keyOrder = append(keyOrder, key) |
| 142 | } |
| 143 | |
| 144 | return c.executeMultiSlot(ctx, cmd, slotMap, keyOrder, policy, firstKeyPos) |
| 145 | } |
| 146 | |
| 147 | // executeMultiSlot executes commands across multiple slots concurrently |
| 148 | func (c *ClusterClient) executeMultiSlot(ctx context.Context, cmd Cmder, slotMap map[int][]string, keyOrder []string, policy *routing.CommandPolicy, firstKeyPos int) error { |
no test coverage detected