MCPcopy
hub / github.com/IBM/sarama / TestJavaMurmur2PartitionerInterop

Function TestJavaMurmur2PartitionerInterop

functional_java_interop_test.go:400–477  ·  view source on GitHub ↗

TestJavaMurmur2PartitionerInterop verifies that Sarama's NewMurmur2Partitioner routes keyed messages to the same partitions as the Apache Kafka Java client's DefaultPartitioner (which uses murmur2 internally).

(t *testing.T)

Source from the content-addressed store, hash-verified

398// routes keyed messages to the same partitions as the Apache Kafka Java client's
399// DefaultPartitioner (which uses murmur2 internally).
400func TestJavaMurmur2PartitionerInterop(t *testing.T) {
401 setupFunctionalTest(t)
402 defer teardownFunctionalTest(t)
403
404 checkKafkaVersion(t, "0.10.0")
405
406 const topic = "test.64"
407 const numPartitions = int32(64)
408
409 keyedMessages := []struct{ key, value string }{
410 {"foo", "value-foo"},
411 {"bar", "value-bar"},
412 {"baz", "value-baz"},
413 {"kafka", "value-kafka"},
414 {"sarama", "value-sarama"},
415 {"hello", "value-hello"},
416 {"world", "value-world"},
417 }
418
419 // record the end offsets per partition before we produce anything
420 startOffsets := make([]int64, numPartitions)
421 for p := int32(0); p < numPartitions; p++ {
422 startOffsets[p] = endOffsetForPartition(t, topic, p)
423 }
424
425 // produce with Java's DefaultPartitioner (murmur2)
426 produceKeyedWithJava(t, topic, keyedMessages)
427
428 // determine which partition each key landed on via Java
429 javaPartition := make(map[string]int32, len(keyedMessages))
430 for p := int32(0); p < numPartitions; p++ {
431 endOff := endOffsetForPartition(t, topic, p)
432 if endOff <= startOffsets[p] {
433 continue
434 }
435 msgs := consumeKeyedFromPartition(t, topic, p, startOffsets[p], int(endOff-startOffsets[p]))
436 for key := range msgs {
437 javaPartition[key] = p
438 }
439 }
440 require.Len(t, javaPartition, len(keyedMessages), "Java producer did not produce all messages")
441
442 // record new end offsets before Sarama production
443 for p := int32(0); p < numPartitions; p++ {
444 startOffsets[p] = endOffsetForPartition(t, topic, p)
445 }
446
447 // produce the same keyed messages with Sarama's murmur2 partitioner
448 config := NewFunctionalTestConfig()
449 config.Producer.Partitioner = NewMurmur2Partitioner
450 config.Producer.Return.Successes = true
451 config.Producer.RequiredAcks = WaitForAll
452
453 producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
454 require.NoError(t, err)
455 defer producer.Close()
456
457 saramaPartition := make(map[string]int32, len(keyedMessages))

Callers

nothing calls this directly

Calls 12

CloseMethod · 0.95
SendMessageMethod · 0.95
setupFunctionalTestFunction · 0.85
teardownFunctionalTestFunction · 0.85
checkKafkaVersionFunction · 0.85
endOffsetForPartitionFunction · 0.85
produceKeyedWithJavaFunction · 0.85
NewFunctionalTestConfigFunction · 0.85
StringEncoderTypeAlias · 0.85
NewSyncProducerFunction · 0.70
LenMethod · 0.45

Tested by

no test coverage detected