(ordered bool)
| 1164 | } |
| 1165 | |
| 1166 | func (consumeOpts *consumeOpts) setDefaults(ordered bool) error { |
| 1167 | // we cannot use both max messages and max bytes unless we're using max bytes as fetch size limiter |
| 1168 | if consumeOpts.MaxBytes != unset && consumeOpts.MaxMessages != unset && !consumeOpts.LimitSize { |
| 1169 | return errors.New("only one of MaxMessages and MaxBytes can be specified") |
| 1170 | } |
| 1171 | if consumeOpts.MaxBytes != unset && !consumeOpts.LimitSize { |
| 1172 | // we used PullMaxBytes setting, set MaxMessages to a high value |
| 1173 | consumeOpts.MaxMessages = defaultBatchMaxBytesOnly |
| 1174 | } else if consumeOpts.MaxMessages == unset { |
| 1175 | // otherwise, if max messages is not set, set it to default value |
| 1176 | consumeOpts.MaxMessages = DefaultMaxMessages |
| 1177 | } |
| 1178 | // if user did not set max bytes, set it to 0 |
| 1179 | if consumeOpts.MaxBytes == unset { |
| 1180 | consumeOpts.MaxBytes = 0 |
| 1181 | } |
| 1182 | |
| 1183 | if consumeOpts.ThresholdMessages == 0 { |
| 1184 | // half of the max messages, rounded up |
| 1185 | consumeOpts.ThresholdMessages = int(math.Ceil(float64(consumeOpts.MaxMessages) / 2)) |
| 1186 | } |
| 1187 | if consumeOpts.ThresholdBytes == 0 { |
| 1188 | // half of the max bytes, rounded up |
| 1189 | consumeOpts.ThresholdBytes = int(math.Ceil(float64(consumeOpts.MaxBytes) / 2)) |
| 1190 | } |
| 1191 | |
| 1192 | // set default heartbeats |
| 1193 | if consumeOpts.Heartbeat == unset { |
| 1194 | // by default, use 50% of expiry time |
| 1195 | consumeOpts.Heartbeat = consumeOpts.Expires / 2 |
| 1196 | if ordered { |
| 1197 | // for ordered consumers, the default heartbeat is 5 seconds |
| 1198 | if consumeOpts.Expires < 10*time.Second { |
| 1199 | consumeOpts.Heartbeat = consumeOpts.Expires / 2 |
| 1200 | } else { |
| 1201 | consumeOpts.Heartbeat = 5 * time.Second |
| 1202 | } |
| 1203 | } else if consumeOpts.Heartbeat > 30*time.Second { |
| 1204 | // cap the heartbeat to 30 seconds |
| 1205 | consumeOpts.Heartbeat = 30 * time.Second |
| 1206 | } |
| 1207 | } |
| 1208 | if consumeOpts.Heartbeat > consumeOpts.Expires/2 { |
| 1209 | return fmt.Errorf("%w: the value of Heartbeat must be less than 50%% of expiry", ErrInvalidOption) |
| 1210 | } |
| 1211 | return nil |
| 1212 | } |
| 1213 | |
| 1214 | func (c *pullConsumer) getPinID() string { |
| 1215 | c.Lock() |
no test coverage detected