MCPcopy
hub / github.com/IBM/sarama / main

Function main

tools/kafka-console-producer/kafka-console-producer.go:36–141  ·  tools/kafka-console-producer/kafka-console-producer.go::main()

Source from the content-addressed store, hash-verified

34)
35
36func main() {
37 flag.Parse()
38
39 if *brokerList == "" {
40 printUsageErrorAndExit("no -brokers specified. Alternatively, set the KAFKA_PEERS environment variable")
41 }
42
43 if *topic == "" {
44 printUsageErrorAndExit("no -topic specified")
45 }
46
47 if *verbose {
48 sarama.Logger = logger
49 }
50
51 config := sarama.NewConfig()
52 config.Producer.RequiredAcks = sarama.WaitForAll
53 config.Producer.Return.Successes = true
54
55 if *tlsEnabled {
56 tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
57 if err != nil {
58 printErrorAndExit(69, "Failed to create TLS config: %s", err)
59 }
60
61 config.Net.TLS.Enable = true
62 config.Net.TLS.Config = tlsConfig
63 config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
64 }
65
66 switch *partitioner {
67 case "":
68 if *partition >= 0 {
69 config.Producer.Partitioner = sarama.NewManualPartitioner
70 } else {
71 config.Producer.Partitioner = sarama.NewHashPartitioner
72 }
73 case "hash":
74 config.Producer.Partitioner = sarama.NewHashPartitioner
75 case "random":
76 config.Producer.Partitioner = sarama.NewRandomPartitioner
77 case "manual":
78 config.Producer.Partitioner = sarama.NewManualPartitioner
79 if *partition == -1 {
80 printUsageErrorAndExit("-partition is required when partitioning manually")
81 }
82 default:
83 printUsageErrorAndExit(fmt.Sprintf("Partitioner %s not supported.", *partitioner))
84 }
85
86 message := &sarama.ProducerMessage{Topic: *topic, Partition: int32(*partition)}
87
88 if *key != "" {
89 message.Key = sarama.StringEncoder(*key)
90 }
91
92 if *value != "" {
93 message.Value = sarama.StringEncoder(*value)

Callers

nothing calls this directly

Calls 12

Close (Method) · 0.95
SendMessage (Method) · 0.95
NewConfig (Function) · 0.92
NewConfig (Function) · 0.92
StringEncoder (TypeAlias) · 0.92
ByteEncoder (TypeAlias) · 0.92
NewSyncProducer (Function) · 0.92
stdinAvailable (Function) · 0.85
printUsageErrorAndExit (Function) · 0.70
printErrorAndExit (Function) · 0.70
Println (Method) · 0.65
Printf (Method) · 0.65

Tested by

no test coverage detected