TestJavaMurmur2PartitionerInterop verifies that Sarama's NewMurmur2Partitioner routes keyed messages to the same partitions as the Apache Kafka Java client's DefaultPartitioner (which uses murmur2 internally).
(t *testing.T)
| 398 | // routes keyed messages to the same partitions as the Apache Kafka Java client's |
| 399 | // DefaultPartitioner (which uses murmur2 internally). |
| 400 | func TestJavaMurmur2PartitionerInterop(t *testing.T) { |
| 401 | setupFunctionalTest(t) |
| 402 | defer teardownFunctionalTest(t) |
| 403 | |
| 404 | checkKafkaVersion(t, "0.10.0") |
| 405 | |
| 406 | const topic = "test.64" |
| 407 | const numPartitions = int32(64) |
| 408 | |
| 409 | keyedMessages := []struct{ key, value string }{ |
| 410 | {"foo", "value-foo"}, |
| 411 | {"bar", "value-bar"}, |
| 412 | {"baz", "value-baz"}, |
| 413 | {"kafka", "value-kafka"}, |
| 414 | {"sarama", "value-sarama"}, |
| 415 | {"hello", "value-hello"}, |
| 416 | {"world", "value-world"}, |
| 417 | } |
| 418 | |
| 419 | // record the end offsets per partition before we produce anything |
| 420 | startOffsets := make([]int64, numPartitions) |
| 421 | for p := int32(0); p < numPartitions; p++ { |
| 422 | startOffsets[p] = endOffsetForPartition(t, topic, p) |
| 423 | } |
| 424 | |
| 425 | // produce with Java's DefaultPartitioner (murmur2) |
| 426 | produceKeyedWithJava(t, topic, keyedMessages) |
| 427 | |
| 428 | // determine which partition each key landed on via Java |
| 429 | javaPartition := make(map[string]int32, len(keyedMessages)) |
| 430 | for p := int32(0); p < numPartitions; p++ { |
| 431 | endOff := endOffsetForPartition(t, topic, p) |
| 432 | if endOff <= startOffsets[p] { |
| 433 | continue |
| 434 | } |
| 435 | msgs := consumeKeyedFromPartition(t, topic, p, startOffsets[p], int(endOff-startOffsets[p])) |
| 436 | for key := range msgs { |
| 437 | javaPartition[key] = p |
| 438 | } |
| 439 | } |
| 440 | require.Len(t, javaPartition, len(keyedMessages), "Java producer did not produce all messages") |
| 441 | |
| 442 | // record new end offsets before Sarama production |
| 443 | for p := int32(0); p < numPartitions; p++ { |
| 444 | startOffsets[p] = endOffsetForPartition(t, topic, p) |
| 445 | } |
| 446 | |
| 447 | // produce the same keyed messages with Sarama's murmur2 partitioner |
| 448 | config := NewFunctionalTestConfig() |
| 449 | config.Producer.Partitioner = NewMurmur2Partitioner |
| 450 | config.Producer.Return.Successes = true |
| 451 | config.Producer.RequiredAcks = WaitForAll |
| 452 | |
| 453 | producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config) |
| 454 | require.NoError(t, err) |
| 455 | defer producer.Close() |
| 456 | |
| 457 | saramaPartition := make(map[string]int32, len(keyedMessages)) |
nothing calls this directly
no test coverage detected