(ctx context.Context, s stats.RPCStats, ai *attemptInfo)
| 196 | } |
| 197 | |
| 198 | func (h *clientMetricsHandler) processRPCEvent(ctx context.Context, s stats.RPCStats, ai *attemptInfo) { |
| 199 | switch st := s.(type) { |
| 200 | case *stats.Begin: |
| 201 | ci := getCallInfo(ctx) |
| 202 | if ci == nil { |
| 203 | logger.Error("ctx passed into client side stats handler metrics event handling has no metrics data present") |
| 204 | return |
| 205 | } |
| 206 | |
| 207 | attributes := []otelattribute.KeyValue{ |
| 208 | otelattribute.String("grpc.method", ci.method), |
| 209 | otelattribute.String("grpc.target", ci.target), |
| 210 | } |
| 211 | for _, o := range h.options.MetricsOptions.OptionalLabels { |
| 212 | if o == "grpc.client.call.custom" { |
| 213 | label := estats.CustomLabelFromContext(ctx) |
| 214 | attributes = append(attributes, otelattribute.String(o, label)) |
| 215 | } |
| 216 | } |
| 217 | |
| 218 | attrs := otelmetric.WithAttributeSet(otelattribute.NewSet(attributes...)) |
| 219 | h.clientMetrics.attemptStarted.Add(ctx, 1, attrs) |
| 220 | case *stats.OutPayload: |
| 221 | atomic.AddInt64(&ai.sentCompressedBytes, int64(st.CompressedLength)) |
| 222 | case *stats.InPayload: |
| 223 | atomic.AddInt64(&ai.recvCompressedBytes, int64(st.CompressedLength)) |
| 224 | case *stats.InHeader: |
| 225 | h.setLabelsFromPluginOption(ai, st.Header) |
| 226 | case *stats.InTrailer: |
| 227 | h.setLabelsFromPluginOption(ai, st.Trailer) |
| 228 | case *stats.End: |
| 229 | h.processRPCEnd(ctx, ai, st) |
| 230 | default: |
| 231 | } |
| 232 | } |
| 233 | |
| 234 | func (h *clientMetricsHandler) setLabelsFromPluginOption(ai *attemptInfo, incomingMetadata metadata.MD) { |
| 235 | if ai.pluginOptionLabels == nil && h.options.MetricsOptions.pluginOption != nil { |
no test coverage detected