(group, memberID string, generation int32, client Client, sessionCanceler context.CancelCauseFunc)
| 55 | } |
| 56 | |
| 57 | func newOffsetManagerFromClient(group, memberID string, generation int32, client Client, sessionCanceler context.CancelCauseFunc) (*offsetManager, error) { |
| 58 | // Check that we are not dealing with a closed Client before processing any other arguments |
| 59 | if client.Closed() { |
| 60 | return nil, ErrClosedClient |
| 61 | } |
| 62 | |
| 63 | conf := client.Config() |
| 64 | om := &offsetManager{ |
| 65 | client: client, |
| 66 | conf: conf, |
| 67 | group: group, |
| 68 | poms: make(map[string]map[int32]*partitionOffsetManager), |
| 69 | sessionCanceler: sessionCanceler, |
| 70 | |
| 71 | memberID: memberID, |
| 72 | generation: generation, |
| 73 | |
| 74 | closing: make(chan none), |
| 75 | closed: make(chan none), |
| 76 | } |
| 77 | if conf.Consumer.Group.InstanceId != "" { |
| 78 | om.groupInstanceId = &conf.Consumer.Group.InstanceId |
| 79 | } |
| 80 | if conf.Consumer.Offsets.AutoCommit.Enable { |
| 81 | om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval) |
| 82 | go withRecover(om.mainLoop) |
| 83 | } |
| 84 | |
| 85 | return om, nil |
| 86 | } |
| 87 | |
| 88 | func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) { |
| 89 | pom, err := om.newPartitionOffsetManager(topic, partition) |
no test coverage detected