MCPcopy
hub / github.com/redis/go-redis / mapCmdsByNode

Method mapCmdsByNode

osscluster.go:1628–1695  ·  view source on GitHub ↗
(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder)

Source from the content-addressed store, hash-verified

1626}
1627
1628func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
1629 state, err := c.state.Get(ctx)
1630 if err != nil {
1631 return err
1632 }
1633
1634 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
1635 for _, cmd := range cmds {
1636 var policy *routing.CommandPolicy
1637 if c.cmdInfoResolver != nil {
1638 policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd)
1639 }
1640 if policy != nil && !policy.CanBeUsedInPipeline() {
1641 return fmt.Errorf(
1642 "redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
1643 )
1644 }
1645 slot := c.cmdSlot(cmd, -1)
1646 var node *clusterNode
1647 // For keyless commands (slot == -1), use ShardPicker if routing policies are enabled
1648 if slot == -1 && !c.opt.DisableRoutingPolicies && c.opt.ShardPicker != nil {
1649 if len(state.Masters) == 0 {
1650 return errClusterNoNodes
1651 }
1652 // For read-only keyless commands, pick from all nodes (masters + slaves)
1653 allNodes := append(state.Masters, state.Slaves...)
1654 idx := c.opt.ShardPicker.Next(len(allNodes))
1655 node = allNodes[idx]
1656 } else {
1657 node, err = c.slotReadOnlyNode(state, slot)
1658 if err != nil {
1659 return err
1660 }
1661 }
1662 cmdsMap.Add(node, cmd)
1663 }
1664 return nil
1665 }
1666
1667 for _, cmd := range cmds {
1668 var policy *routing.CommandPolicy
1669 if c.cmdInfoResolver != nil {
1670 policy = c.cmdInfoResolver.GetCommandPolicy(ctx, cmd)
1671 }
1672 if policy != nil && !policy.CanBeUsedInPipeline() {
1673 return fmt.Errorf(
1674 "redis: cannot pipeline command %q with request policy ReqAllNodes/ReqAllShards/ReqMultiShard; Note: This behavior is subject to change in the future", cmd.Name(),
1675 )
1676 }
1677 slot := c.cmdSlot(cmd, -1)
1678 var node *clusterNode
1679 // For keyless commands (slot == -1), use ShardPicker if routing policies are enabled
1680 if slot == -1 && !c.opt.DisableRoutingPolicies && c.opt.ShardPicker != nil {
1681 if len(state.Masters) == 0 {
1682 return errClusterNoNodes
1683 }
1684 idx := c.opt.ShardPicker.Next(len(state.Masters))
1685 node = state.Masters[idx]

Callers 6

processPipelineMethod · 0.95
processPipelineNodeMethod · 0.95
pipelineReadCmdsMethod · 0.95
processTxPipelineNodeMethod · 0.95

Calls 10

cmdsAreReadOnlyMethod · 0.95
CanBeUsedInPipelineMethod · 0.95
cmdSlotMethod · 0.95
slotReadOnlyNodeMethod · 0.95
GetCommandPolicyMethod · 0.80
GetMethod · 0.65
NameMethod · 0.65
NextMethod · 0.65
AddMethod · 0.65
slotMasterNodeMethod · 0.45

Tested by

no test coverage detected