MCPcopy
hub / github.com/redis/go-redis / executeMultiSlot

Method executeMultiSlot

osscluster_router.go:148–177  ·  view source on GitHub ↗

executeMultiSlot executes a command across multiple slots concurrently, running one goroutine per slot and then aggregating the per-slot results.

(ctx context.Context, cmd Cmder, slotMap map[int][]string, keyOrder []string, policy *routing.CommandPolicy, firstKeyPos int)

Source from the content-addressed store, hash-verified

146
147// executeMultiSlot executes commands across multiple slots concurrently
148func (c *ClusterClient) executeMultiSlot(ctx context.Context, cmd Cmder, slotMap map[int][]string, keyOrder []string, policy *routing.CommandPolicy, firstKeyPos int) error {
149 results := make(chan slotResult, len(slotMap))
150 var wg sync.WaitGroup
151
152 // Execute on each slot concurrently
153 for slot, keys := range slotMap {
154 wg.Add(1)
155 go func(slot int, keys []string) {
156 defer wg.Done()
157
158 node, err := c.cmdNodeWithShardPicker(ctx, cmd.Name(), slot, c.opt.ShardPicker)
159 if err != nil {
160 results <- slotResult{nil, keys, err}
161 return
162 }
163
164 // Create a command for this specific slot's keys
165 subCmd := c.createSlotSpecificCommand(ctx, cmd, keys, firstKeyPos)
166 err = node.Client.Process(ctx, subCmd)
167 results <- slotResult{subCmd, keys, err}
168 }(slot, keys)
169 }
170
171 go func() {
172 wg.Wait()
173 close(results)
174 }()
175
176 return c.aggregateMultiSlotResults(ctx, cmd, results, keyOrder, policy)
177}
178
179// createSlotSpecificCommand creates a new command for a specific slot's keys.
180// firstKeyPos is passed in from the caller (computed once in executeMultiShard)

Callers 1

executeMultiShardMethod · 0.95

Calls 7

WaitMethod · 0.80
AddMethod · 0.65
NameMethod · 0.65
ProcessMethod · 0.65

Tested by

no test coverage detected