| 125 | } |
| 126 | |
| 127 | func (cfg *KafkaConfig) Validate() error { |
| 128 | if cfg.Address == "" { |
| 129 | return ErrMissingKafkaAddress |
| 130 | } |
| 131 | if cfg.Topic == "" { |
| 132 | return ErrMissingKafkaTopic |
| 133 | } |
| 134 | if cfg.ProducerMaxRecordSizeBytes < minProducerRecordDataBytesLimit || cfg.ProducerMaxRecordSizeBytes > maxProducerRecordDataBytesLimit { |
| 135 | return ErrInvalidProducerMaxRecordSizeBytes |
| 136 | } |
| 137 | if (cfg.TargetConsumerLagAtStartup == 0 && cfg.MaxConsumerLagAtStartup != 0) || (cfg.TargetConsumerLagAtStartup != 0 && cfg.MaxConsumerLagAtStartup == 0) { |
| 138 | return ErrInconsistentConsumerLagAtStartup |
| 139 | } |
| 140 | if cfg.MaxConsumerLagAtStartup < cfg.TargetConsumerLagAtStartup { |
| 141 | return ErrInvalidMaxConsumerLagAtStartup |
| 142 | } |
| 143 | |
| 144 | if (cfg.SASLUsername == "") != (cfg.SASLPassword.String() == "") { |
| 145 | return ErrInconsistentSASLCredentials |
| 146 | } |
| 147 | |
| 148 | return nil |
| 149 | } |
| 150 | |
| 151 | // GetConsumerGroup returns the consumer group to use for the given instanceID and partitionID. |
| 152 | func (cfg *KafkaConfig) GetConsumerGroup(instanceID string, partitionID int32) string { |