(t *testing.T, client Client)
| 1135 | } |
| 1136 | |
| 1137 | func validateProducerMetrics(t *testing.T, client Client) { |
| 1138 | // Get the broker used by test1 topic |
| 1139 | var broker *Broker |
| 1140 | if partitions, err := client.Partitions("test.1"); err != nil { |
| 1141 | t.Error(err) |
| 1142 | } else { |
| 1143 | for _, partition := range partitions { |
| 1144 | if b, err := client.Leader("test.1", partition); err != nil { |
| 1145 | t.Error(err) |
| 1146 | } else { |
| 1147 | if broker != nil && b != broker { |
| 1148 | t.Fatal("Expected only one broker, got at least 2") |
| 1149 | } |
| 1150 | broker = b |
| 1151 | } |
| 1152 | } |
| 1153 | } |
| 1154 | |
| 1155 | metricValidators := newMetricValidators() |
| 1156 | noResponse := client.Config().Producer.RequiredAcks == NoResponse |
| 1157 | compressionEnabled := client.Config().Producer.Compression != CompressionNone |
| 1158 | |
| 1159 | // We are adding 10ms of latency to all requests with toxiproxy |
| 1160 | minRequestLatencyInMs := 10 |
| 1161 | if noResponse { |
| 1162 | // but when we do not wait for a response it can be less than 1ms |
| 1163 | minRequestLatencyInMs = 0 |
| 1164 | } |
| 1165 | |
| 1166 | // We read at least 1 byte from the broker |
| 1167 | metricValidators.registerForAllBrokers(broker, minCountMeterValidator("incoming-byte-rate", 1)) |
| 1168 | // in at least 3 global requests (1 for metadata request, 1 for offset request and N for produce request) |
| 1169 | metricValidators.register(minCountMeterValidator("request-rate", 3)) |
| 1170 | metricValidators.register(minCountHistogramValidator("request-size", 3)) |
| 1171 | metricValidators.register(minValHistogramValidator("request-size", 1)) |
| 1172 | // and at least 2 requests to the registered broker (offset + produces) |
| 1173 | metricValidators.registerForBroker(broker, minCountMeterValidator("request-rate", 2)) |
| 1174 | metricValidators.registerForBroker(broker, minCountHistogramValidator("request-size", 2)) |
| 1175 | metricValidators.registerForBroker(broker, minValHistogramValidator("request-size", 1)) |
| 1176 | metricValidators.registerForBroker(broker, minValHistogramValidator("request-latency-in-ms", minRequestLatencyInMs)) |
| 1177 | |
| 1178 | // We send at least 1 batch |
| 1179 | metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("batch-size", 1)) |
| 1180 | metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("batch-size", 1)) |
| 1181 | if compressionEnabled { |
| 1182 | // We record compression ratios between [0.50,10.00] (50-1000 with a histogram) for at least one "fake" record |
| 1183 | metricValidators.registerForGlobalAndTopic("test_1", minCountHistogramValidator("compression-ratio", 1)) |
| 1184 | if client.Config().Version.IsAtLeast(V0_11_0_0) { |
| 1185 | // slightly better compression with batching |
| 1186 | metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 30)) |
| 1187 | } else { |
| 1188 | metricValidators.registerForGlobalAndTopic("test_1", minValHistogramValidator("compression-ratio", 50)) |
| 1189 | } |
| 1190 | metricValidators.registerForGlobalAndTopic("test_1", maxValHistogramValidator("compression-ratio", 1000)) |
| 1191 | } else { |
| 1192 | // We record compression ratios of 1.00 (100 with a histogram). |
| 1193 | if !client.Config().Version.IsAtLeast(V0_11_0_0) { |
| 1194 | // MessageSet metrics are recorded per message. |
no test coverage detected