| 93 | } |
| 94 | |
| 95 | func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) { |
| 96 | defer close(j.waitC) |
| 97 | // Instead of ticker, just delay |
| 98 | jobs := map[digest.Digest]*jobStatus{} |
| 99 | tc := time.NewTicker(time.Millisecond * 300) |
| 100 | defer tc.Stop() |
| 101 | |
| 102 | update := func() { |
| 103 | // TODO: Filter by references |
| 104 | active, err := pt.Active(ctx) |
| 105 | if err != nil { |
| 106 | log.G(ctx).WithError(err).Error("failed to get statuses for progress") |
| 107 | } |
| 108 | for dgst, job := range jobs { |
| 109 | switch job.state { |
| 110 | case jobAdded, jobInProgress: |
| 111 | status, ok := active.Status(job.name) |
| 112 | if ok { |
| 113 | if status.Offset > job.progress { |
| 114 | pf(transfer.Progress{ |
| 115 | Event: j.transferState, |
| 116 | Name: job.name, |
| 117 | Parents: job.parents, |
| 118 | Progress: status.Offset, |
| 119 | Total: status.Total, |
| 120 | Desc: &job.desc, |
| 121 | }) |
| 122 | job.progress = status.Offset |
| 123 | job.state = jobInProgress |
| 124 | jobs[dgst] = job |
| 125 | } |
| 126 | } else { |
| 127 | ok, err := pt.Check(ctx, job.desc.Digest) |
| 128 | if err != nil { |
| 129 | log.G(ctx).WithError(err).Error("failed to get statuses for progress") |
| 130 | } else if ok { |
| 131 | pf(transfer.Progress{ |
| 132 | Event: "complete", |
| 133 | Name: job.name, |
| 134 | Parents: job.parents, |
| 135 | Progress: job.desc.Size, |
| 136 | Total: job.desc.Size, |
| 137 | Desc: &job.desc, |
| 138 | }) |
| 139 | job.state = jobComplete |
| 140 | jobs[dgst] = job |
| 141 | } |
| 142 | } |
| 143 | case jobExtracting: |
| 144 | if job.progress == job.desc.Size { |
| 145 | pf(transfer.Progress{ |
| 146 | Event: "extracted", |
| 147 | Name: job.name, |
| 148 | Parents: job.parents, |
| 149 | Progress: job.desc.Size, |
| 150 | Total: job.desc.Size, |
| 151 | Desc: &job.desc, |
| 152 | }) |