( ctx context.Context, cmds []Cmder, tx bool, )
| 836 | } |
| 837 | |
| 838 | func (c *Ring) generalProcessPipeline( |
| 839 | ctx context.Context, cmds []Cmder, tx bool, |
| 840 | ) error { |
| 841 | if tx { |
| 842 | // Trim multi .. exec. |
| 843 | cmds = cmds[1 : len(cmds)-1] |
| 844 | } |
| 845 | |
| 846 | cmdsMap := make(map[string][]Cmder) |
| 847 | |
| 848 | for _, cmd := range cmds { |
| 849 | hash := cmd.stringArg(cmdFirstKeyPosWithInfo(cmd, nil)) |
| 850 | if hash != "" { |
| 851 | hash = c.sharding.Hash(hash) |
| 852 | } |
| 853 | cmdsMap[hash] = append(cmdsMap[hash], cmd) |
| 854 | } |
| 855 | |
| 856 | var wg sync.WaitGroup |
| 857 | errs := make(chan error, len(cmdsMap)) |
| 858 | |
| 859 | for hash, cmds := range cmdsMap { |
| 860 | wg.Add(1) |
| 861 | go func(hash string, cmds []Cmder) { |
| 862 | defer wg.Done() |
| 863 | |
| 864 | // TODO: retry? |
| 865 | shard, err := c.sharding.GetByName(hash) |
| 866 | if err != nil { |
| 867 | setCmdsErr(cmds, err) |
| 868 | return |
| 869 | } |
| 870 | |
| 871 | hook := shard.Client.processPipelineHook |
| 872 | if tx { |
| 873 | cmds = wrapMultiExec(ctx, cmds) |
| 874 | hook = shard.Client.processTxPipelineHook |
| 875 | } |
| 876 | |
| 877 | if err = hook(ctx, cmds); err != nil { |
| 878 | errs <- err |
| 879 | } |
| 880 | }(hash, cmds) |
| 881 | } |
| 882 | |
| 883 | wg.Wait() |
| 884 | close(errs) |
| 885 | |
| 886 | if err := <-errs; err != nil { |
| 887 | return err |
| 888 | } |
| 889 | return cmdsFirstErr(cmds) |
| 890 | } |
| 891 | |
| 892 | func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error { |
| 893 | if len(keys) == 0 { |
no test coverage detected