MCPcopy
hub / github.com/IBM/sarama / main

Function main

tools/kafka-console-partitionconsumer/kafka-console-partitionconsumer.go:26–89  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

24)
25
26func main() {
27 flag.Parse()
28
29 if *brokerList == "" {
30 printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.")
31 }
32
33 if *topic == "" {
34 printUsageErrorAndExit("-topic is required")
35 }
36
37 if *partition == -1 {
38 printUsageErrorAndExit("-partition is required")
39 }
40
41 if *verbose {
42 sarama.Logger = logger
43 }
44
45 var (
46 initialOffset int64
47 offsetError error
48 )
49 switch *offset {
50 case "oldest":
51 initialOffset = sarama.OffsetOldest
52 case "newest":
53 initialOffset = sarama.OffsetNewest
54 default:
55 initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64)
56 }
57
58 if offsetError != nil {
59 printUsageErrorAndExit("Invalid initial offset: %s", *offset)
60 }
61
62 c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil)
63 if err != nil {
64 printErrorAndExit(69, "Failed to start consumer: %s", err)
65 }
66
67 pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset)
68 if err != nil {
69 printErrorAndExit(69, "Failed to start partition consumer: %s", err)
70 }
71
72 go func() {
73 signals := make(chan os.Signal, 1)
74 signal.Notify(signals, syscall.SIGTERM, os.Interrupt)
75 <-signals
76 pc.AsyncClose()
77 }()
78
79 for msg := range pc.Messages() {
80 fmt.Printf("Offset:\t%d\n", msg.Offset)
81 fmt.Printf("Key:\t%s\n", string(msg.Key))
82 fmt.Printf("Value:\t%s\n", string(msg.Value))
83 fmt.Println()

Callers

nothing calls this directly

Calls 9

ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewConsumerFunction · 0.92
printUsageErrorAndExitFunction · 0.70
printErrorAndExitFunction · 0.70
AsyncCloseMethod · 0.65
MessagesMethod · 0.65
PrintfMethod · 0.65
PrintlnMethod · 0.65

Tested by

no test coverage detected