(ctx context.Context, src any, dst any, opts ...transfer.Opt)
| 86 | } |
| 87 | |
| 88 | func (p *proxyTransferrer) Transfer(ctx context.Context, src any, dst any, opts ...transfer.Opt) error { |
| 89 | o := &transfer.Config{} |
| 90 | for _, opt := range opts { |
| 91 | opt(o) |
| 92 | } |
| 93 | apiOpts := &transferapi.TransferOptions{} |
| 94 | if o.Progress != nil { |
| 95 | sid := tstreaming.GenerateID("progress") |
| 96 | stream, err := p.streamCreator.Create(ctx, sid) |
| 97 | if err != nil { |
| 98 | return err |
| 99 | } |
| 100 | apiOpts.ProgressStream = sid |
| 101 | go func() { |
| 102 | for { |
| 103 | a, err := stream.Recv() |
| 104 | if err != nil { |
| 105 | if !errors.Is(err, io.EOF) { |
| 106 | log.G(ctx).WithError(err).Error("progress stream failed to recv") |
| 107 | } |
| 108 | return |
| 109 | } |
| 110 | i, err := typeurl.UnmarshalAny(a) |
| 111 | if err != nil { |
| 112 | log.G(ctx).WithError(err).Warnf("failed to unmarshal progress object: %v", a.GetTypeUrl()) |
| 113 | } |
| 114 | switch v := i.(type) { |
| 115 | case *transfertypes.Progress: |
| 116 | var descp *ocispec.Descriptor |
| 117 | if v.Desc != nil { |
| 118 | desc := oci.DescriptorFromProto(v.Desc) |
| 119 | descp = &desc |
| 120 | } |
| 121 | o.Progress(transfer.Progress{ |
| 122 | Event: v.Event, |
| 123 | Name: v.Name, |
| 124 | Parents: v.Parents, |
| 125 | Progress: v.Progress, |
| 126 | Total: v.Total, |
| 127 | Desc: descp, |
| 128 | }) |
| 129 | default: |
| 130 | log.G(ctx).Warnf("unhandled progress object %T: %v", i, a.GetTypeUrl()) |
| 131 | } |
| 132 | } |
| 133 | }() |
| 134 | } |
| 135 | asrc, err := p.marshalAny(ctx, src) |
| 136 | if err != nil { |
| 137 | return err |
| 138 | } |
| 139 | adst, err := p.marshalAny(ctx, dst) |
| 140 | if err != nil { |
| 141 | return err |
| 142 | } |
| 143 | req := &transferapi.TransferRequest{ |
| 144 | Source: &anypb.Any{ |
| 145 | TypeUrl: asrc.GetTypeUrl(), |
nothing calls this directly
no test coverage detected