Releases/removes closed POMs once they are clean (or when forced)
(force bool)
| 462 | |
| 463 | // Releases/removes closed POMs once they are clean (or when forced) |
| 464 | func (om *offsetManager) releasePOMs(force bool) (remaining int) { |
| 465 | om.pomsLock.Lock() |
| 466 | defer om.pomsLock.Unlock() |
| 467 | |
| 468 | for topic, topicManagers := range om.poms { |
| 469 | for partition, pom := range topicManagers { |
| 470 | pom.lock.Lock() |
| 471 | releaseDue := pom.done && (force || !pom.dirty) |
| 472 | pom.lock.Unlock() |
| 473 | |
| 474 | if releaseDue { |
| 475 | pom.release() |
| 476 | |
| 477 | delete(om.poms[topic], partition) |
| 478 | if len(om.poms[topic]) == 0 { |
| 479 | delete(om.poms, topic) |
| 480 | } |
| 481 | } |
| 482 | } |
| 483 | remaining += len(om.poms[topic]) |
| 484 | } |
| 485 | return |
| 486 | } |
| 487 | |
| 488 | func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager { |
| 489 | om.pomsLock.RLock() |