(lower KafkaVersion)
| 499 | } |
| 500 | |
| 501 | func versionRange(lower KafkaVersion) []KafkaVersion { |
| 502 | // Get the test cluster version from the environment. If there is nothing |
| 503 | // there then assume the highest. |
| 504 | upper, err := ParseKafkaVersion(os.Getenv("KAFKA_VERSION")) |
| 505 | if err != nil { |
| 506 | upper = MaxVersion |
| 507 | } |
| 508 | |
| 509 | // KIP-896 dictates a minimum lower bound of 2.1 protocol for Kafka 4.0 onwards |
| 510 | if upper.IsAtLeast(V4_0_0_0) { |
| 511 | if !lower.IsAtLeast(V2_1_0_0) { |
| 512 | lower = V2_1_0_0 |
| 513 | } |
| 514 | } |
| 515 | |
| 516 | versions := make([]KafkaVersion, 0, len(fvtRangeVersions)) |
| 517 | for _, v := range fvtRangeVersions { |
| 518 | if !v.IsAtLeast(lower) { |
| 519 | continue |
| 520 | } |
| 521 | if !upper.IsAtLeast(v) { |
| 522 | return versions |
| 523 | } |
| 524 | versions = append(versions, v) |
| 525 | } |
| 526 | return versions |
| 527 | } |
| 528 | |
| 529 | func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage { |
| 530 | var ( |
no test coverage detected