MCPcopy
hub / github.com/IBM/sarama / main

Function main

tools/kafka-console-consumer/kafka-console-consumer.go:34–132  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

32)
33
34func main() {
35 flag.Parse()
36
37 if *brokerList == "" {
38 printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
39 }
40
41 if *topic == "" {
42 printUsageErrorAndExit("-topic is required")
43 }
44
45 if *verbose {
46 sarama.Logger = logger
47 }
48
49 var initialOffset int64
50 switch *offset {
51 case "oldest":
52 initialOffset = sarama.OffsetOldest
53 case "newest":
54 initialOffset = sarama.OffsetNewest
55 default:
56 printUsageErrorAndExit("-offset should be `oldest` or `newest`")
57 }
58
59 config := sarama.NewConfig()
60 if *tlsEnabled {
61 tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
62 if err != nil {
63 printErrorAndExit(69, "Failed to create TLS config: %s", err)
64 }
65
66 config.Net.TLS.Enable = true
67 config.Net.TLS.Config = tlsConfig
68 config.Net.TLS.Config.InsecureSkipVerify = *tlsSkipVerify
69 }
70
71 c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), config)
72 if err != nil {
73 printErrorAndExit(69, "Failed to start consumer: %s", err)
74 }
75
76 partitionList, err := getPartitions(c)
77 if err != nil {
78 printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
79 }
80
81 var (
82 messages = make(chan *sarama.ConsumerMessage, *bufferSize)
83 closing = make(chan struct{})
84 wg sync.WaitGroup
85 )
86
87 go func() {
88 signals := make(chan os.Signal, 1)
89 signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
90 <-signals
91 logger.Println("Initiating shutdown of consumer...")

Callers

nothing calls this directly

Calls 14

ConsumePartition (Method) · 0.95
Close (Method) · 0.95
NewConfig (Function) · 0.92
NewConfig (Function) · 0.92
NewConsumer (Function) · 0.92
getPartitions (Function) · 0.85
printUsageErrorAndExit (Function) · 0.70
printErrorAndExit (Function) · 0.70
Println (Method) · 0.65
AsyncClose (Method) · 0.65
Done (Method) · 0.65
Messages (Method) · 0.65

Tested by

no test coverage detected