aggregateResponses aggregates multiple shard responses
(cmd Cmder, cmds []Cmder, policy *routing.CommandPolicy)
| 428 | |
| 429 | // aggregateResponses aggregates multiple shard responses |
| 430 | func (c *ClusterClient) aggregateResponses(cmd Cmder, cmds []Cmder, policy *routing.CommandPolicy) error { |
| 431 | if len(cmds) == 0 { |
| 432 | return errNoCmdsToAggregate |
| 433 | } |
| 434 | |
| 435 | if len(cmds) == 1 { |
| 436 | shardCmd := cmds[0] |
| 437 | if err := shardCmd.Err(); err != nil { |
| 438 | cmd.SetErr(err) |
| 439 | return err |
| 440 | } |
| 441 | value, _ := ExtractCommandValue(shardCmd) |
| 442 | return c.setCommandValue(cmd, value) |
| 443 | } |
| 444 | |
| 445 | aggregator := c.createAggregator(policy, cmd, false) |
| 446 | |
| 447 | batchWithErrs := []routing.AggregatorResErr{} |
| 448 | // Add all results to aggregator |
| 449 | for _, shardCmd := range cmds { |
| 450 | value, err := ExtractCommandValue(shardCmd) |
| 451 | batchWithErrs = append(batchWithErrs, routing.AggregatorResErr{ |
| 452 | Result: value, |
| 453 | Err: err, |
| 454 | }) |
| 455 | } |
| 456 | |
| 457 | err := aggregator.BatchSlice(batchWithErrs) |
| 458 | if err != nil { |
| 459 | return err |
| 460 | } |
| 461 | |
| 462 | return c.finishAggregation(cmd, aggregator) |
| 463 | } |
| 464 | |
| 465 | // createAggregator creates the appropriate response aggregator |
| 466 | func (c *ClusterClient) createAggregator(policy *routing.CommandPolicy, cmd Cmder, isKeyed bool) routing.ResponseAggregator { |
no test coverage detected