AddShardIdx records that a response has been received for the given shard index. This should be called each time a job completes for a particular shard. Returns the current completedThroughSeconds value.
(shardIdx int)
| 63 | // This should be called each time a job completes for a particular shard. |
| 64 | // Returns the current completedThroughSeconds value. |
| 65 | func (c *CompletionTracker) AddShardIdx(shardIdx int) uint32 { |
| 66 | // we haven't received shards yet |
| 67 | if len(c.shards) == 0 { |
| 68 | // if shardIdx doesn't fit in foundResponses then alloc a new slice and copy foundResponses forward |
| 69 | if shardIdx >= len(c.foundResponses) { |
| 70 | temp := make([]int, shardIdx+1) |
| 71 | copy(temp, c.foundResponses) |
| 72 | c.foundResponses = temp |
| 73 | } |
| 74 | |
| 75 | // and record this idx for when we get shards |
| 76 | c.foundResponses[shardIdx]++ |
| 77 | |
| 78 | return 0 |
| 79 | } |
| 80 | |
| 81 | // |
| 82 | if shardIdx >= len(c.foundResponses) { |
| 83 | return c.completedThroughSeconds |
| 84 | } |
| 85 | |
| 86 | c.foundResponses[shardIdx]++ |
| 87 | c.incrementCurShardIfComplete() |
| 88 | |
| 89 | return c.completedThroughSeconds |
| 90 | } |
| 91 | |
| 92 | // CompletedThroughSeconds returns the current completion timestamp. |
| 93 | // All results up to this timestamp are guaranteed to be complete and can be released. |