(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder)
| 1626 | } |
| 1627 | |
| 1628 | func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error { |
| 1629 | state, err := c.state.Get(ctx) |
| 1630 | if err != nil { |
| 1631 | return err |
| 1632 | } |
| 1633 | |
| 1634 | if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) { |
| 1635 | for _, cmd := range cmds { |
| 1636 | var policy *routing.CommandPolicy |
| 1637 | if c.cmdInfoResolver != nil { |
| 1638 | policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd) |
| 1639 | } |
| 1640 | if policy != nil && !policy.CanBeUsedInPipeline() { |
| 1641 | return fmt.Errorf( |
| 1642 | "redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(), |
| 1643 | ) |
| 1644 | } |
| 1645 | slot := c.cmdSlot(cmd, -1) |
| 1646 | var node *clusterNode |
| 1647 | // For keyless commands (slot == -1), use ShardPicker if routing policies are enabled |
| 1648 | if slot == -1 && !c.opt.DisableRoutingPolicies && c.opt.ShardPicker != nil { |
| 1649 | if len(state.Masters) == 0 { |
| 1650 | return errClusterNoNodes |
| 1651 | } |
| 1652 | // For read-only keyless commands, pick from all nodes (masters + slaves) |
| 1653 | allNodes := append(state.Masters, state.Slaves...) |
| 1654 | idx := c.opt.ShardPicker.Next(len(allNodes)) |
| 1655 | node = allNodes[idx] |
| 1656 | } else { |
| 1657 | node, err = c.slotReadOnlyNode(state, slot) |
| 1658 | if err != nil { |
| 1659 | return err |
| 1660 | } |
| 1661 | } |
| 1662 | cmdsMap.Add(node, cmd) |
| 1663 | } |
| 1664 | return nil |
| 1665 | } |
| 1666 | |
| 1667 | for _, cmd := range cmds { |
| 1668 | var policy *routing.CommandPolicy |
| 1669 | if c.cmdInfoResolver != nil { |
| 1670 | policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd) |
| 1671 | } |
| 1672 | if policy != nil && !policy.CanBeUsedInPipeline() { |
| 1673 | return fmt.Errorf( |
| 1674 | "redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(), |
| 1675 | ) |
| 1676 | } |
| 1677 | slot := c.cmdSlot(cmd, -1) |
| 1678 | var node *clusterNode |
| 1679 | // For keyless commands (slot == -1), use ShardPicker if routing policies are enabled |
| 1680 | if slot == -1 && !c.opt.DisableRoutingPolicies && c.opt.ShardPicker != nil { |
| 1681 | if len(state.Masters) == 0 { |
| 1682 | return errClusterNoNodes |
| 1683 | } |
| 1684 | idx := c.opt.ShardPicker.Next(len(state.Masters)) |
| 1685 | node = state.Masters[idx] |
no test coverage detected