Pop blocks until an item is added to the queue, and then returns it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back.
(process PopProcessFunc)
| 409 | // Pop returns a 'Deltas', which has a complete list of all the things |
| 410 | // that happened to the object (deltas) while it was sitting in the queue. |
| 411 | func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { |
| 412 | f.lock.Lock() |
| 413 | defer f.lock.Unlock() |
| 414 | for { |
| 415 | for len(f.queue) == 0 { |
| 416 | // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. |
| 417 | // When Close() is called, the f.closed is set and the condition is broadcasted. |
| 418 | // Which causes this loop to continue and return from the Pop(). |
| 419 | if f.IsClosed() { |
| 420 | return nil, FIFOClosedError |
| 421 | } |
| 422 | |
| 423 | f.cond.Wait() |
| 424 | } |
| 425 | id := f.queue[0] |
| 426 | f.queue = f.queue[1:] |
| 427 | if f.initialPopulationCount > 0 { |
| 428 | f.initialPopulationCount-- |
| 429 | } |
| 430 | item, ok := f.items[id] |
| 431 | if !ok { |
| 432 | // Item may have been deleted subsequently. |
| 433 | continue |
| 434 | } |
| 435 | delete(f.items, id) |
| 436 | err := process(item) |
| 437 | if e, ok := err.(ErrRequeue); ok { |
| 438 | f.addIfNotPresent(id, item) |
| 439 | err = e.Err |
| 440 | } |
| 441 | // Don't need to copyDeltas here, because we're transferring |
| 442 | // ownership to the caller. |
| 443 | return item, err |
| 444 | } |
| 445 | } |
| 446 | |
| 447 | // Replace will delete the contents of 'f', using instead the given map. |
| 448 | // 'f' takes ownership of the map, you should not reference the map again |