MCPcopy
hub / github.com/redis/go-redis / processPipeline

Method processPipeline

osscluster.go:1562–1626  ·  view source on GitHub ↗
(ctx context.Context, cmds []Cmder)

Source from the content-addressed store, hash-verified

1560}
1561
1562func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
1563 // Only call time.Now() if pipeline operation duration callback is set to avoid overhead
1564 var operationStart time.Time
1565 pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback()
1566 if pipelineOpDurationCallback != nil {
1567 operationStart = time.Now()
1568 }
1569 totalAttempts := 0
1570
1571 cmdsMap := newCmdsMap()
1572
1573 if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
1574 setCmdsErr(cmds, err)
1575 if pipelineOpDurationCallback != nil {
1576 operationDuration := time.Since(operationStart)
1577 pipelineOpDurationCallback(ctx, operationDuration, "PIPELINE", len(cmds), 1, err, nil, 0)
1578 }
1579 return err
1580 }
1581
1582 var lastErr error
1583 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1584 totalAttempts++
1585 if attempt > 0 {
1586 if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
1587 setCmdsErr(cmds, err)
1588 if pipelineOpDurationCallback != nil {
1589 operationDuration := time.Since(operationStart)
1590 pipelineOpDurationCallback(ctx, operationDuration, "PIPELINE", len(cmds), totalAttempts, err, nil, 0)
1591 }
1592 return err
1593 }
1594 }
1595
1596 failedCmds := newCmdsMap()
1597 var wg sync.WaitGroup
1598
1599 for node, cmds := range cmdsMap.m {
1600 wg.Add(1)
1601 go func(node *clusterNode, cmds []Cmder) {
1602 defer wg.Done()
1603 c.processPipelineNode(ctx, node, cmds, failedCmds)
1604 }(node, cmds)
1605 }
1606
1607 wg.Wait()
1608 if len(failedCmds.m) == 0 {
1609 break
1610 }
1611 cmdsMap = failedCmds
1612 lastErr = cmdsFirstErr(cmds)
1613 }
1614
1615 // Record pipeline operation duration
1616 if pipelineOpDurationCallback != nil {
1617 operationDuration := time.Since(operationStart)
1618 finalErr := cmdsFirstErr(cmds)
1619 if finalErr == nil {

Callers

nothing calls this directly

Calls 10

mapCmdsByNodeMethod · 0.95
retryBackoffMethod · 0.95
processPipelineNodeMethod · 0.95
SleepFunction · 0.92
newCmdsMapFunction · 0.85
setCmdsErrFunction · 0.85
cmdsFirstErrFunction · 0.85
WaitMethod · 0.80
AddMethod · 0.65

Tested by

no test coverage detected