MCPcopy
hub / github.com/segmentio/kafka-go / newLocalClientAndTopic

Function newLocalClientAndTopic

compress/compress_test.go:452–490  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

450}
451
452func newLocalClientAndTopic() (*kafka.Client, string, func()) {
453 topic := makeTopic()
454 client, shutdown := newLocalClient()
455
456 _, err := client.CreateTopics(context.Background(), &kafka.CreateTopicsRequest{
457 Topics: []kafka.TopicConfig{{
458 Topic: topic,
459 NumPartitions: 1,
460 ReplicationFactor: 1,
461 }},
462 })
463 if err != nil {
464 shutdown()
465 panic(err)
466 }
467
468 // Topic creation seems to be asynchronous. Metadata for the topic partition
469 // layout in the cluster is available in the controller before being synced
470 // with the other brokers, which causes "Error:[3] Unknown Topic Or Partition"
471 // when sending requests to the partition leaders.
472 for i := 0; i < 20; i++ {
473 r, err := client.Fetch(context.Background(), &kafka.FetchRequest{
474 Topic: topic,
475 Partition: 0,
476 Offset: 0,
477 })
478 if err == nil && r.Error == nil {
479 break
480 }
481 time.Sleep(100 * time.Millisecond)
482 }
483
484 return client, topic, func() {
485 client.DeleteTopics(context.Background(), &kafka.DeleteTopicsRequest{
486 Topics: []string{topic},
487 })
488 shutdown()
489 }
490}
491
492func newLocalClient() (*kafka.Client, func()) {
493 return newClient(kafka.TCP("127.0.0.1:9092"))

Callers 2

testCompressedMessages (Function) · 0.70

Calls 5

Fetch (Method) · 0.80
makeTopic (Function) · 0.70
newLocalClient (Function) · 0.70
CreateTopics (Method) · 0.45
DeleteTopics (Method) · 0.45

Tested by

no test coverage detected