MCPcopy
hub / github.com/nats-io/nats.go / setDefaults

Method setDefaults

jetstream/pull.go:1166–1212  ·  view source on GitHub ↗
(ordered bool)

Source from the content-addressed store, hash-verified

1164}
1165
1166func (consumeOpts *consumeOpts) setDefaults(ordered bool) error {
1167 // we cannot use both max messages and max bytes unless we're using max bytes as fetch size limiter
1168 if consumeOpts.MaxBytes != unset && consumeOpts.MaxMessages != unset && !consumeOpts.LimitSize {
1169 return errors.New("only one of MaxMessages and MaxBytes can be specified")
1170 }
1171 if consumeOpts.MaxBytes != unset && !consumeOpts.LimitSize {
1172 // we used PullMaxBytes setting, set MaxMessages to a high value
1173 consumeOpts.MaxMessages = defaultBatchMaxBytesOnly
1174 } else if consumeOpts.MaxMessages == unset {
1175 // otherwise, if max messages is not set, set it to default value
1176 consumeOpts.MaxMessages = DefaultMaxMessages
1177 }
1178 // if user did not set max bytes, set it to 0
1179 if consumeOpts.MaxBytes == unset {
1180 consumeOpts.MaxBytes = 0
1181 }
1182
1183 if consumeOpts.ThresholdMessages == 0 {
1184 // half of the max messages, rounded up
1185 consumeOpts.ThresholdMessages = int(math.Ceil(float64(consumeOpts.MaxMessages) / 2))
1186 }
1187 if consumeOpts.ThresholdBytes == 0 {
1188 // half of the max bytes, rounded up
1189 consumeOpts.ThresholdBytes = int(math.Ceil(float64(consumeOpts.MaxBytes) / 2))
1190 }
1191
1192 // set default heartbeats
1193 if consumeOpts.Heartbeat == unset {
1194 // by default, use 50% of expiry time
1195 consumeOpts.Heartbeat = consumeOpts.Expires / 2
1196 if ordered {
1197 // for ordered consumers, the default heartbeat is 5 seconds
1198 if consumeOpts.Expires < 10*time.Second {
1199 consumeOpts.Heartbeat = consumeOpts.Expires / 2
1200 } else {
1201 consumeOpts.Heartbeat = 5 * time.Second
1202 }
1203 } else if consumeOpts.Heartbeat > 30*time.Second {
1204 // cap the heartbeat to 30 seconds
1205 consumeOpts.Heartbeat = 30 * time.Second
1206 }
1207 }
1208 if consumeOpts.Heartbeat > consumeOpts.Expires/2 {
1209 return fmt.Errorf("%w: the value of Heartbeat must be less than 50%% of expiry", ErrInvalidOption)
1210 }
1211 return nil
1212}
1213
1214func (c *pullConsumer) getPinID() string {
1215 c.Lock()

Callers 2

parseConsumeOptsFunction · 0.95
parseMessagesOptsFunction · 0.95

Calls 1

ErrorfMethod · 0.80

Tested by

no test coverage detected