MCPcopy
hub / github.com/redis/go-redis / generalProcessPipeline

Method generalProcessPipeline

ring.go:838–890  ·  view source on GitHub ↗
(
	ctx context.Context, cmds []Cmder, tx bool,
)

Source from the content-addressed store, hash-verified

836}
837
838func (c *Ring) generalProcessPipeline(
839 ctx context.Context, cmds []Cmder, tx bool,
840) error {
841 if tx {
842 // Trim multi .. exec.
843 cmds = cmds[1 : len(cmds)-1]
844 }
845
846 cmdsMap := make(map[string][]Cmder)
847
848 for _, cmd := range cmds {
849 hash := cmd.stringArg(cmdFirstKeyPosWithInfo(cmd, nil))
850 if hash != "" {
851 hash = c.sharding.Hash(hash)
852 }
853 cmdsMap[hash] = append(cmdsMap[hash], cmd)
854 }
855
856 var wg sync.WaitGroup
857 errs := make(chan error, len(cmdsMap))
858
859 for hash, cmds := range cmdsMap {
860 wg.Add(1)
861 go func(hash string, cmds []Cmder) {
862 defer wg.Done()
863
864 // TODO: retry?
865 shard, err := c.sharding.GetByName(hash)
866 if err != nil {
867 setCmdsErr(cmds, err)
868 return
869 }
870
871 hook := shard.Client.processPipelineHook
872 if tx {
873 cmds = wrapMultiExec(ctx, cmds)
874 hook = shard.Client.processTxPipelineHook
875 }
876
877 if err = hook(ctx, cmds); err != nil {
878 errs <- err
879 }
880 }(hash, cmds)
881 }
882
883 wg.Wait()
884 close(errs)
885
886 if err := <-errs; err != nil {
887 return err
888 }
889 return cmdsFirstErr(cmds)
890}
891
892func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
893 if len(keys) == 0 {

Callers 1

NewRingFunction · 0.95

Calls 10

cmdFirstKeyPosWithInfoFunction · 0.85
setCmdsErrFunction · 0.85
wrapMultiExecFunction · 0.85
hookStruct · 0.85
cmdsFirstErrFunction · 0.85
GetByNameMethod · 0.80
WaitMethod · 0.80
stringArgMethod · 0.65
AddMethod · 0.65
HashMethod · 0.45

Tested by

no test coverage detected