| 90 | } |
| 91 | |
| 92 | func (c *changeCache) wait(ctx context.Context, res *cachedChange) (*cachedChange, error) { |
| 93 | var err error |
| 94 | |
| 95 | select { |
| 96 | case <-res.waitCh: |
| 97 | err = res.err |
| 98 | default: |
| 99 | select { |
| 100 | case <-res.waitCh: |
| 101 | err = res.err |
| 102 | case <-ctx.Done(): |
| 103 | err = context.Cause(ctx) |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | c.mu.Lock() |
| 108 | |
| 109 | res.waiters-- |
| 110 | if res.waiters == 0 { |
| 111 | res.cancel(err) |
| 112 | } |
| 113 | |
| 114 | if err == nil { |
| 115 | if existing := c.completedCalls[res.callKey]; existing != nil { |
| 116 | res = existing |
| 117 | } else { |
| 118 | c.completedCalls[res.callKey] = res |
| 119 | } |
| 120 | delete(c.ongoingCalls, res.callKey) |
| 121 | |
| 122 | res.refCount++ |
| 123 | c.mu.Unlock() |
| 124 | return res, nil |
| 125 | } |
| 126 | |
| 127 | if res.refCount == 0 && res.waiters == 0 { |
| 128 | if existing := c.ongoingCalls[res.callKey]; existing == res { |
| 129 | delete(c.ongoingCalls, res.callKey) |
| 130 | } |
| 131 | if existing := c.completedCalls[res.callKey]; existing == res { |
| 132 | delete(c.completedCalls, res.callKey) |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | c.mu.Unlock() |
| 137 | return nil, err |
| 138 | } |
| 139 | |
| 140 | func (res *cachedChange) result() *ChangeWithStat { |
| 141 | if res == nil { |