()
| 91 | } |
| 92 | |
| 93 | func main() { |
| 94 | log.SetFlags(0) |
| 95 | |
| 96 | // Serve the "/debug/pprof/" and "/debug/vars" pages |
| 97 | // |
| 98 | go func() { log.Println(http.ListenAndServe("localhost:6060", nil)) }() |
| 99 | |
| 100 | var ( |
| 101 | wg sync.WaitGroup |
| 102 | ctx = context.Background() |
| 103 | |
| 104 | producers []*producer.Producer |
| 105 | consumers []*consumer.Consumer |
| 106 | indexers []esutil.BulkIndexer |
| 107 | ) |
| 108 | |
| 109 | done := make(chan os.Signal, 1) |
| 110 | signal.Notify(done, os.Interrupt) |
| 111 | go func() { <-done; log.Println(""); os.Exit(0) }() |
| 112 | |
| 113 | // Set up producers |
| 114 | // |
| 115 | for i := 1; i <= numProducers; i++ { |
| 116 | producers = append(producers, |
| 117 | &producer.Producer{ |
| 118 | BrokerURL: brokerURL, |
| 119 | TopicName: topicName, |
| 120 | TopicParts: topicParts, |
| 121 | MessageRate: msgRate}) |
| 122 | } |
| 123 | |
| 124 | // Create an Elasticsearch client |
| 125 | // |
| 126 | es, err := elasticsearch.New( |
| 127 | elasticsearch.WithRetry(5, 502, 503, 504, 429), |
| 128 | elasticsearch.WithTransportOptions( |
| 129 | elastictransport.WithRetryBackoff(func(i int) time.Duration { return time.Duration(i) * 100 * time.Millisecond }), |
| 130 | elastictransport.WithMetrics(), |
| 131 | elastictransport.WithTransport(apmelasticsearch.WrapRoundTripper(http.DefaultTransport)), |
| 132 | ), |
| 133 | ) |
| 134 | if err != nil { |
| 135 | log.Fatalf("Error creating client: %s", err) |
| 136 | } |
| 137 | // Export client metrics to the "expvar" package |
| 138 | expvar.Publish("go-elasticsearch", expvar.Func(func() interface{} { m, _ := es.Metrics(); return m })) |
| 139 | |
| 140 | // Create the "stocks" index with correct mappings |
| 141 | // |
| 142 | res, err := es.Indices.Exists([]string{indexName}) |
| 143 | if err != nil { |
| 144 | log.Fatalf("Error: Indices.Exists: %s", err) |
| 145 | } |
| 146 | res.Body.Close() |
| 147 | if res.StatusCode == 404 { |
| 148 | res, err := es.Indices.Create( |
| 149 | indexName, |
| 150 | es.Indices.Create.WithBody(strings.NewReader(mapping)), |
nothing calls this directly
no test coverage detected