(ctx context.Context, cmds []Cmder)
| 1560 | } |
| 1561 | |
| 1562 | func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error { |
| 1563 | // Only call time.Now() if pipeline operation duration callback is set to avoid overhead |
| 1564 | var operationStart time.Time |
| 1565 | pipelineOpDurationCallback := otel.GetPipelineOperationDurationCallback() |
| 1566 | if pipelineOpDurationCallback != nil { |
| 1567 | operationStart = time.Now() |
| 1568 | } |
| 1569 | totalAttempts := 0 |
| 1570 | |
| 1571 | cmdsMap := newCmdsMap() |
| 1572 | |
| 1573 | if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil { |
| 1574 | setCmdsErr(cmds, err) |
| 1575 | if pipelineOpDurationCallback != nil { |
| 1576 | operationDuration := time.Since(operationStart) |
| 1577 | pipelineOpDurationCallback(ctx, operationDuration, "PIPELINE", len(cmds), 1, err, nil, 0) |
| 1578 | } |
| 1579 | return err |
| 1580 | } |
| 1581 | |
| 1582 | var lastErr error |
| 1583 | for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { |
| 1584 | totalAttempts++ |
| 1585 | if attempt > 0 { |
| 1586 | if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { |
| 1587 | setCmdsErr(cmds, err) |
| 1588 | if pipelineOpDurationCallback != nil { |
| 1589 | operationDuration := time.Since(operationStart) |
| 1590 | pipelineOpDurationCallback(ctx, operationDuration, "PIPELINE", len(cmds), totalAttempts, err, nil, 0) |
| 1591 | } |
| 1592 | return err |
| 1593 | } |
| 1594 | } |
| 1595 | |
| 1596 | failedCmds := newCmdsMap() |
| 1597 | var wg sync.WaitGroup |
| 1598 | |
| 1599 | for node, cmds := range cmdsMap.m { |
| 1600 | wg.Add(1) |
| 1601 | go func(node *clusterNode, cmds []Cmder) { |
| 1602 | defer wg.Done() |
| 1603 | c.processPipelineNode(ctx, node, cmds, failedCmds) |
| 1604 | }(node, cmds) |
| 1605 | } |
| 1606 | |
| 1607 | wg.Wait() |
| 1608 | if len(failedCmds.m) == 0 { |
| 1609 | break |
| 1610 | } |
| 1611 | cmdsMap = failedCmds |
| 1612 | lastErr = cmdsFirstErr(cmds) |
| 1613 | } |
| 1614 | |
| 1615 | // Record pipeline operation duration |
| 1616 | if pipelineOpDurationCallback != nil { |
| 1617 | operationDuration := time.Since(operationStart) |
| 1618 | finalErr := cmdsFirstErr(cmds) |
| 1619 | if finalErr == nil { |
nothing calls this directly
no test coverage detected