This example shows how to use the consumer to read messages from a single partition.
()
| 2081 | // This example shows how to use the consumer to read messages |
| 2082 | // from a single partition. |
| 2083 | func ExampleConsumer() { |
| 2084 | consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig()) |
| 2085 | if err != nil { |
| 2086 | panic(err) |
| 2087 | } |
| 2088 | |
| 2089 | defer func() { |
| 2090 | if err := consumer.Close(); err != nil { |
| 2091 | log.Fatalln(err) |
| 2092 | } |
| 2093 | }() |
| 2094 | |
| 2095 | partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) |
| 2096 | if err != nil { |
| 2097 | panic(err) |
| 2098 | } |
| 2099 | |
| 2100 | defer func() { |
| 2101 | if err := partitionConsumer.Close(); err != nil { |
| 2102 | log.Fatalln(err) |
| 2103 | } |
| 2104 | }() |
| 2105 | |
| 2106 | // Trap SIGINT to trigger a shutdown. |
| 2107 | signals := make(chan os.Signal, 1) |
| 2108 | signal.Notify(signals, os.Interrupt) |
| 2109 | |
| 2110 | consumed := 0 |
| 2111 | ConsumerLoop: |
| 2112 | for { |
| 2113 | select { |
| 2114 | case msg := <-partitionConsumer.Messages(): |
| 2115 | log.Printf("Consumed message offset %d\n", msg.Offset) |
| 2116 | consumed++ |
| 2117 | case <-signals: |
| 2118 | break ConsumerLoop |
| 2119 | } |
| 2120 | } |
| 2121 | |
| 2122 | log.Printf("Consumed: %d\n", consumed) |
| 2123 | } |
| 2124 | |
| 2125 | func Test_partitionConsumer_parseResponse(t *testing.T) { |
| 2126 | type args struct { |
nothing calls this directly
no test coverage detected