(ctx context.Context, cmds []Cmder)
| 1833 | } |
| 1834 | |
| 1835 | func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error { |
| 1836 | // Only call time.Now() if pipeline operation duration callback is set to avoid overhead |
| 1837 | var operationStart time.Time |
| 1838 | pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback() |
| 1839 | if pipelineOpDurationCallback != nil { |
| 1840 | operationStart = time.Now() |
| 1841 | } |
| 1842 | totalAttempts := 0 |
| 1843 | |
| 1844 | // Trim multi .. exec. |
| 1845 | cmds = cmds[1 : len(cmds)-1] |
| 1846 | |
| 1847 | if len(cmds) == 0 { |
| 1848 | return nil |
| 1849 | } |
| 1850 | |
| 1851 | state, err := c.state.Get(ctx) |
| 1852 | if err != nil { |
| 1853 | setCmdsErr(cmds, err) |
| 1854 | if pipelineOpDurationCallback != nil { |
| 1855 | operationDuration := time.Since(operationStart) |
| 1856 | pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, err, nil, 0) |
| 1857 | } |
| 1858 | return err |
| 1859 | } |
| 1860 | |
| 1861 | keyedCmdsBySlot := c.slottedKeyedCommands(ctx, cmds) |
| 1862 | slot := -1 |
| 1863 | switch len(keyedCmdsBySlot) { |
| 1864 | case 0: |
| 1865 | slot = hashtag.RandomSlot() |
| 1866 | case 1: |
| 1867 | for sl := range keyedCmdsBySlot { |
| 1868 | slot = sl |
| 1869 | break |
| 1870 | } |
| 1871 | default: |
| 1872 | // TxPipeline does not support cross slot transaction. |
| 1873 | setCmdsErr(cmds, ErrCrossSlot) |
| 1874 | if pipelineOpDurationCallback != nil { |
| 1875 | operationDuration := time.Since(operationStart) |
| 1876 | pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, ErrCrossSlot, nil, 0) |
| 1877 | } |
| 1878 | return ErrCrossSlot |
| 1879 | } |
| 1880 | |
| 1881 | node, err := state.slotMasterNode(slot) |
| 1882 | if err != nil { |
| 1883 | setCmdsErr(cmds, err) |
| 1884 | if pipelineOpDurationCallback != nil { |
| 1885 | operationDuration := time.Since(operationStart) |
| 1886 | pipelineOpDurationCallback(ctx, operationDuration, "MULTI", len(cmds), 1, err, nil, 0) |
| 1887 | } |
| 1888 | return err |
| 1889 | } |
| 1890 | |
| 1891 | var lastErr error |
| 1892 | cmdsMap := map[*clusterNode][]Cmder{node: cmds} |
nothing calls this directly
no test coverage detected