(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler)
| 893 | } |
| 894 | |
| 895 | func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) { |
| 896 | // init context |
| 897 | ctx, cancel := context.WithCancelCause(ctx) |
| 898 | |
| 899 | // init offset manager |
| 900 | offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client, cancel) |
| 901 | if err != nil { |
| 902 | return nil, err |
| 903 | } |
| 904 | |
| 905 | // init session |
| 906 | sess := &consumerGroupSession{ |
| 907 | parent: parent, |
| 908 | memberID: memberID, |
| 909 | generationID: generationID, |
| 910 | handler: handler, |
| 911 | offsets: offsets, |
| 912 | claims: claims, |
| 913 | ctx: ctx, |
| 914 | cancel: cancel, |
| 915 | hbDying: make(chan none), |
| 916 | hbDead: make(chan none), |
| 917 | } |
| 918 | |
| 919 | // start heartbeat loop |
| 920 | go sess.heartbeatLoop() |
| 921 | |
| 922 | // create a POM for each claim |
| 923 | for topic, partitions := range claims { |
| 924 | for _, partition := range partitions { |
| 925 | pom, err := offsets.ManagePartition(topic, partition) |
| 926 | if err != nil { |
| 927 | _ = sess.release(false) |
| 928 | return nil, err |
| 929 | } |
| 930 | |
| 931 | // handle POM errors |
| 932 | go func(topic string, partition int32) { |
| 933 | for err := range pom.Errors() { |
| 934 | sess.parent.handleError(err, topic, partition) |
| 935 | } |
| 936 | }(topic, partition) |
| 937 | } |
| 938 | } |
| 939 | |
| 940 | // perform setup |
| 941 | if err := handler.Setup(sess); err != nil { |
| 942 | _ = sess.release(true) |
| 943 | return nil, err |
| 944 | } |
| 945 | |
| 946 | // start consuming each topic partition in its own goroutine |
| 947 | for topic, partitions := range claims { |
| 948 | for _, partition := range partitions { |
| 949 | sess.waitGroup.Add(1) // increment wait group before spawning goroutine |
| 950 | go func(topic string, partition int32) { |
| 951 | defer sess.waitGroup.Done() |
| 952 | // cancel the group session as soon as any of the consume calls return |
no test coverage detected