MCPcopy
hub / github.com/IBM/sarama / newOffsetManagerFromClient

Function newOffsetManagerFromClient

offset_manager.go:57–86  ·  view source on GitHub ↗
(group, memberID string, generation int32, client Client, sessionCanceler context.CancelCauseFunc)

Source from the content-addressed store, hash-verified

55}
56
57func newOffsetManagerFromClient(group, memberID string, generation int32, client Client, sessionCanceler context.CancelCauseFunc) (*offsetManager, error) {
58 // Check that we are not dealing with a closed Client before processing any other arguments
59 if client.Closed() {
60 return nil, ErrClosedClient
61 }
62
63 conf := client.Config()
64 om := &offsetManager{
65 client: client,
66 conf: conf,
67 group: group,
68 poms: make(map[string]map[int32]*partitionOffsetManager),
69 sessionCanceler: sessionCanceler,
70
71 memberID: memberID,
72 generation: generation,
73
74 closing: make(chan none),
75 closed: make(chan none),
76 }
77 if conf.Consumer.Group.InstanceId != "" {
78 om.groupInstanceId = &conf.Consumer.Group.InstanceId
79 }
80 if conf.Consumer.Offsets.AutoCommit.Enable {
81 om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
82 go withRecover(om.mainLoop)
83 }
84
85 return om, nil
86}
87
88func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
89 pom, err := om.newPartitionOffsetManager(topic, partition)

Callers 2

newConsumerGroupSessionFunction · 0.85

Calls 3

withRecoverFunction · 0.85
ClosedMethod · 0.65
ConfigMethod · 0.65

Tested by

no test coverage detected