()
| 24 | ) |
| 25 | |
| 26 | func main() { |
| 27 | flag.Parse() |
| 28 | |
| 29 | if *brokerList == "" { |
| 30 | printUsageErrorAndExit("You have to provide -brokers as a comma-separated list, or set the KAFKA_PEERS environment variable.") |
| 31 | } |
| 32 | |
| 33 | if *topic == "" { |
| 34 | printUsageErrorAndExit("-topic is required") |
| 35 | } |
| 36 | |
| 37 | if *partition == -1 { |
| 38 | printUsageErrorAndExit("-partition is required") |
| 39 | } |
| 40 | |
| 41 | if *verbose { |
| 42 | sarama.Logger = logger |
| 43 | } |
| 44 | |
| 45 | var ( |
| 46 | initialOffset int64 |
| 47 | offsetError error |
| 48 | ) |
| 49 | switch *offset { |
| 50 | case "oldest": |
| 51 | initialOffset = sarama.OffsetOldest |
| 52 | case "newest": |
| 53 | initialOffset = sarama.OffsetNewest |
| 54 | default: |
| 55 | initialOffset, offsetError = strconv.ParseInt(*offset, 10, 64) |
| 56 | } |
| 57 | |
| 58 | if offsetError != nil { |
| 59 | printUsageErrorAndExit("Invalid initial offset: %s", *offset) |
| 60 | } |
| 61 | |
| 62 | c, err := sarama.NewConsumer(strings.Split(*brokerList, ","), nil) |
| 63 | if err != nil { |
| 64 | printErrorAndExit(69, "Failed to start consumer: %s", err) |
| 65 | } |
| 66 | |
| 67 | pc, err := c.ConsumePartition(*topic, int32(*partition), initialOffset) |
| 68 | if err != nil { |
| 69 | printErrorAndExit(69, "Failed to start partition consumer: %s", err) |
| 70 | } |
| 71 | |
| 72 | go func() { |
| 73 | signals := make(chan os.Signal, 1) |
| 74 | signal.Notify(signals, syscall.SIGTERM, os.Interrupt) |
| 75 | <-signals |
| 76 | pc.AsyncClose() |
| 77 | }() |
| 78 | |
| 79 | for msg := range pc.Messages() { |
| 80 | fmt.Printf("Offset:\t%d\n", msg.Offset) |
| 81 | fmt.Printf("Key:\t%s\n", string(msg.Key)) |
| 82 | fmt.Printf("Value:\t%s\n", string(msg.Value)) |
| 83 | fmt.Println() |
nothing calls this directly
no test coverage detected