MCPcopy
hub / github.com/IBM/sarama / ExampleConsumer

Function ExampleConsumer

consumer_test.go:2083–2123  ·  view source on GitHub ↗

This example shows how to use the consumer to read messages from a single partition.

()

Source from the content-addressed store, hash-verified

2081// This example shows how to use the consumer to read messages
2082// from a single partition.
2083func ExampleConsumer() {
2084 consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
2085 if err != nil {
2086 panic(err)
2087 }
2088
2089 defer func() {
2090 if err := consumer.Close(); err != nil {
2091 log.Fatalln(err)
2092 }
2093 }()
2094
2095 partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
2096 if err != nil {
2097 panic(err)
2098 }
2099
2100 defer func() {
2101 if err := partitionConsumer.Close(); err != nil {
2102 log.Fatalln(err)
2103 }
2104 }()
2105
2106 // Trap SIGINT to trigger a shutdown.
2107 signals := make(chan os.Signal, 1)
2108 signal.Notify(signals, os.Interrupt)
2109
2110 consumed := 0
2111ConsumerLoop:
2112 for {
2113 select {
2114 case msg := <-partitionConsumer.Messages():
2115 log.Printf("Consumed message offset %d\n", msg.Offset)
2116 consumed++
2117 case <-signals:
2118 break ConsumerLoop
2119 }
2120 }
2121
2122 log.Printf("Consumed: %d\n", consumed)
2123}
2124
2125func Test_partitionConsumer_parseResponse(t *testing.T) {
2126 type args struct {

Callers

nothing calls this directly

Calls 7

CloseMethod · 0.95
ConsumePartitionMethod · 0.95
NewConsumerFunction · 0.70
NewTestConfigFunction · 0.70
CloseMethod · 0.65
MessagesMethod · 0.65
PrintfMethod · 0.65

Tested by

no test coverage detected