MCPcopy
hub / github.com/redis/go-redis / aggregateResponses

Method aggregateResponses

osscluster_router.go:430–463  ·  view source on GitHub ↗

aggregateResponses aggregates multiple shard responses

(cmd Cmder, cmds []Cmder, policy *routing.CommandPolicy)

Source from the content-addressed store, hash-verified

428
429// aggregateResponses aggregates multiple shard responses
430func (c *ClusterClient) aggregateResponses(cmd Cmder, cmds []Cmder, policy *routing.CommandPolicy) error {
431 if len(cmds) == 0 {
432 return errNoCmdsToAggregate
433 }
434
435 if len(cmds) == 1 {
436 shardCmd := cmds[0]
437 if err := shardCmd.Err(); err != nil {
438 cmd.SetErr(err)
439 return err
440 }
441 value, _ := ExtractCommandValue(shardCmd)
442 return c.setCommandValue(cmd, value)
443 }
444
445 aggregator := c.createAggregator(policy, cmd, false)
446
447 batchWithErrs := []routing.AggregatorResErr{}
448 // Add all results to aggregator
449 for _, shardCmd := range cmds {
450 value, err := ExtractCommandValue(shardCmd)
451 batchWithErrs = append(batchWithErrs, routing.AggregatorResErr{
452 Result: value,
453 Err: err,
454 })
455 }
456
457 err := aggregator.BatchSlice(batchWithErrs)
458 if err != nil {
459 return err
460 }
461
462 return c.finishAggregation(cmd, aggregator)
463}
464
465// createAggregator creates the appropriate response aggregator
466func (c *ClusterClient) createAggregator(policy *routing.CommandPolicy, cmd Cmder, isKeyed bool) routing.ResponseAggregator {

Callers 1

executeParallelMethod · 0.95

Calls 7

setCommandValueMethod · 0.95
createAggregatorMethod · 0.95
finishAggregationMethod · 0.95
ExtractCommandValueFunction · 0.85
ErrMethod · 0.65
SetErrMethod · 0.65
BatchSliceMethod · 0.65

Tested by

no test coverage detected