aggregateMultiSlotResults aggregates results from multi-slot execution
(ctx context.Context, cmd Cmder, results <-chan slotResult, keyOrder []string, policy *routing.CommandPolicy)
| 364 | |
| 365 | // aggregateMultiSlotResults aggregates results from multi-slot execution |
| 366 | func (c *ClusterClient) aggregateMultiSlotResults(ctx context.Context, cmd Cmder, results <-chan slotResult, keyOrder []string, policy *routing.CommandPolicy) error { |
| 367 | keyedResults := make(map[string]routing.AggregatorResErr) |
| 368 | var firstErr error |
| 369 | |
| 370 | for result := range results { |
| 371 | if result.err != nil && firstErr == nil { |
| 372 | firstErr = result.err |
| 373 | } |
| 374 | if result.cmd != nil && result.err == nil { |
| 375 | value, err := ExtractCommandValue(result.cmd) |
| 376 | |
| 377 | // Check if the result is a slice (e.g., from MGET) |
| 378 | if sliceValue, ok := value.([]interface{}); ok { |
| 379 | // Map each element to its corresponding key |
| 380 | for i, key := range result.keys { |
| 381 | if i < len(sliceValue) { |
| 382 | keyedResults[key] = routing.AggregatorResErr{Result: sliceValue[i], Err: err} |
| 383 | } else { |
| 384 | keyedResults[key] = routing.AggregatorResErr{Result: nil, Err: err} |
| 385 | } |
| 386 | } |
| 387 | } else { |
| 388 | // For non-slice results, map the entire result to each key |
| 389 | for _, key := range result.keys { |
| 390 | keyedResults[key] = routing.AggregatorResErr{Result: value, Err: err} |
| 391 | } |
| 392 | } |
| 393 | } |
| 394 | |
| 395 | // TODO: return multiple errors by order when we will implement multiple errors returning |
| 396 | if result.err != nil { |
| 397 | firstErr = result.err |
| 398 | } |
| 399 | } |
| 400 | |
| 401 | return c.aggregateKeyedValues(cmd, keyedResults, keyOrder, policy) |
| 402 | } |
| 403 | |
| 404 | // aggregateKeyedValues aggregates individual key-value pairs while preserving key order |
| 405 | func (c *ClusterClient) aggregateKeyedValues(cmd Cmder, keyedResults map[string]routing.AggregatorResErr, keyOrder []string, policy *routing.CommandPolicy) error { |
no test coverage detected