MCPcopy
hub / github.com/IBM/sarama / newConsumerGroupSession

Function newConsumerGroupSession

consumer_group.go:895–978  ·  view source on GitHub ↗
(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler)

Source from the content-addressed store, hash-verified

893}
894
895func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
896 // init context
897 ctx, cancel := context.WithCancelCause(ctx)
898
899 // init offset manager
900 offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel)
901 if err != nil {
902 return nil, err
903 }
904
905 // init session
906 sess := &consumerGroupSession{
907 parent: parent,
908 memberID: memberID,
909 generationID: generationID,
910 handler: handler,
911 offsets: offsets,
912 claims: claims,
913 ctx: ctx,
914 cancel: cancel,
915 hbDying: make(chan none),
916 hbDead: make(chan none),
917 }
918
919 // start heartbeat loop
920 go sess.heartbeatLoop()
921
922 // create a POM for each claim
923 for topic, partitions := range claims {
924 for _, partition := range partitions {
925 pom, err := offsets.ManagePartition(topic, partition)
926 if err != nil {
927 _ = sess.release(false)
928 return nil, err
929 }
930
931 // handle POM errors
932 go func(topic string, partition int32) {
933 for err := range pom.Errors() {
934 sess.parent.handleError(err, topic, partition)
935 }
936 }(topic, partition)
937 }
938 }
939
940 // perform setup
941 if err := handler.Setup(sess); err != nil {
942 _ = sess.release(true)
943 return nil, err
944 }
945
946 // start consuming each topic partition in its own goroutine
947 for topic, partitions := range claims {
948 for _, partition := range partitions {
949 sess.waitGroup.Add(1) // increment wait group before spawning goroutine
950 go func(topic string, partition int32) {
951 defer sess.waitGroup.Done()
952 // cancel the group session as soon as any of the consume calls return

Callers 1

newSession (Method) · 0.85

Calls 12

heartbeatLoop (Method) · 0.95
release (Method) · 0.95
consume (Method) · 0.95
Stop (Method) · 0.80
ManagePartition (Method) · 0.65
Errors (Method) · 0.65
Setup (Method) · 0.65
Done (Method) · 0.65
PartitionNotReadable (Method) · 0.65
handleError (Method) · 0.45
Add (Method) · 0.45

Tested by

no test coverage detected