MCPcopy
hub / github.com/elastic/go-elasticsearch / main

Function main

_examples/bulk/kafka/kafka.go:93–258  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

91}
92
93func main() {
94 log.SetFlags(0)
95
96 // Serve the "/debug/pprof/" and "/debug/vars" pages
97 //
98 go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }()
99
100 var (
101 wg sync.WaitGroup
102 ctx = context.Background()
103
104 producers []*producer.Producer
105 consumers []*consumer.Consumer
106 indexers []esutil.BulkIndexer
107 )
108
109 done := make(chan os.Signal, 1)
110 signal.Notify(done, os.Interrupt)
111 go func() { <-done; log.Println(""); os.Exit(0) }()
112
113 // Set up producers
114 //
115 for i := 1; i <= numProducers; i++ {
116 producers = append(producers,
117 &producer.Producer{
118 BrokerURL: brokerURL,
119 TopicName: topicName,
120 TopicParts: topicParts,
121 MessageRate: msgRate})
122 }
123
124 // Create an Elasticsearch client
125 //
126 es, err := elasticsearch.New(
127 elasticsearch.WithRetry(5, 502, 503, 504, 429),
128 elasticsearch.WithTransportOptions(
129 elastictransport.WithRetryBackoff(func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond }),
130 elastictransport.WithMetrics(),
131 elastictransport.WithTransport(apmelasticsearch.WrapRoundTripper(http.DefaultTransport)),
132 ),
133 )
134 if err != nil {
135 log.Fatalf("Error creating client: %s", err)
136 }
137 // Export client metrics to the "expvar" package
138 expvar.Publish("go-elasticsearch", expvar.Func(func() interface{} { m, _ := es.Metrics(); return m }))
139
140 // Create the "stocks" index with correct mappings
141 //
142 res, err := es.Indices.Exists([]string{indexName})
143 if err != nil {
144 log.Fatalf("Error: Indices.Exists: %s", err)
145 }
146 res.Body.Close()
147 if res.StatusCode == 404 {
148 res, err := es.Indices.Create(
149 indexName,
150 es.Indices.Create.WithBody(strings.NewReader(mapping)),

Callers

nothing calls this directly

Calls 15

NewBulkIndexer (Function) · 0.92
report (Function) · 0.85
Duration (Method) · 0.80
IsError (Method) · 0.80
Printf (Method) · 0.80
CreateTopic (Method) · 0.80
Close (Method) · 0.65
Add (Method) · 0.65
Metrics (Method) · 0.45
Exists (Method) · 0.45
Create (Method) · 0.45
WithBody (Method) · 0.45

Tested by

no test coverage detected