MCPcopy
hub / github.com/redis/go-redis / processTxPipeline

Method processTxPipeline

osscluster.go:1835–1935  ·  view source on GitHub ↗
(ctx context.Context, cmds []Cmder)

Source from the content-addressed store, hash-verified

1833}
1834
1835func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
1836 // Only call time.Now() if pipeline operation duration callback is set to avoid overhead
1837 var operationStart time.Time
1838 pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback()
1839 if pipelineOpDurationCallback != nil {
1840 operationStart = time.Now()
1841 }
1842 totalAttempts := 0
1843
1844 // Trim multi .. exec.
1845 cmds = cmds[1 : len(cmds)-1]
1846
1847 if len(cmds) == 0 {
1848 return nil
1849 }
1850
1851 state, err := c.state.Get(ctx)
1852 if err != nil {
1853 setCmdsErr(cmds, err)
1854 if pipelineOpDurationCallback != nil {
1855 operationDuration := time.Since(operationStart)
1856 pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, err, nil, 0)
1857 }
1858 return err
1859 }
1860
1861 keyedCmdsBySlot := c.slottedKeyedCommands(ctx, cmds)
1862 slot := -1
1863 switch len(keyedCmdsBySlot) {
1864 case 0:
1865 slot = hashtag.RandomSlot()
1866 case 1:
1867 for sl := range keyedCmdsBySlot {
1868 slot = sl
1869 break
1870 }
1871 default:
1872 // TxPipeline does not support cross slot transaction.
1873 setCmdsErr(cmds, ErrCrossSlot)
1874 if pipelineOpDurationCallback != nil {
1875 operationDuration := time.Since(operationStart)
1876 pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, ErrCrossSlot, nil, 0)
1877 }
1878 return ErrCrossSlot
1879 }
1880
1881 node, err := state.slotMasterNode(slot)
1882 if err != nil {
1883 setCmdsErr(cmds, err)
1884 if pipelineOpDurationCallback != nil {
1885 operationDuration := time.Since(operationStart)
1886 pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, err, nil, 0)
1887 }
1888 return err
1889 }
1890
1891 var lastErr error
1892 cmdsMap := map[*clusterNode][]Cmder{node: cmds}

Callers

nothing calls this directly

Calls 13

slottedKeyedCommandsMethod · 0.95
retryBackoffMethod · 0.95
processTxPipelineNodeMethod · 0.95
RandomSlotFunction · 0.92
SleepFunction · 0.92
setCmdsErrFunction · 0.85
newCmdsMapFunction · 0.85
cmdsFirstErrFunction · 0.85
WaitMethod · 0.80
GetMethod · 0.65
AddMethod · 0.65

Tested by

no test coverage detected