(topic string, partition int32)
| 86 | } |
| 87 | |
| 88 | func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) { |
| 89 | pom, err := om.newPartitionOffsetManager(topic, partition) |
| 90 | if err != nil { |
| 91 | return nil, err |
| 92 | } |
| 93 | |
| 94 | om.pomsLock.Lock() |
| 95 | defer om.pomsLock.Unlock() |
| 96 | |
| 97 | topicManagers := om.poms[topic] |
| 98 | if topicManagers == nil { |
| 99 | topicManagers = make(map[int32]*partitionOffsetManager) |
| 100 | om.poms[topic] = topicManagers |
| 101 | } |
| 102 | |
| 103 | if topicManagers[partition] != nil { |
| 104 | return nil, ConfigurationError("That topic/partition is already being managed") |
| 105 | } |
| 106 | |
| 107 | topicManagers[partition] = pom |
| 108 | return pom, nil |
| 109 | } |
| 110 | |
| 111 | func (om *offsetManager) Close() error { |
| 112 | om.closeOnce.Do(func() { |
nothing calls this directly
no test coverage detected