MCPcopy
hub / github.com/grpc/grpc-go / TestE2ECallMetricsStreaming

Method TestE2ECallMetricsStreaming

orca/call_metrics_test.go:151–287  ·  view source on GitHub ↗

TestE2ECallMetricsStreaming tests the injection of custom backend metrics from the server application for a streaming RPC, and verifies that expected load reports are received at the client.

(t *testing.T)

Source from the content-addressed store, hash-verified

149// from the server application for a streaming RPC, and verifies that expected
150// load reports are received at the client.
151func (s) TestE2ECallMetricsStreaming(t *testing.T) {
152 tests := []struct {
153 desc string
154 injectMetrics bool
155 wantProto *v3orcapb.OrcaLoadReport
156 }{
157 {
158 desc: "with custom backend metrics",
159 injectMetrics: true,
160 wantProto: &v3orcapb.OrcaLoadReport{
161 CpuUtilization: 1.0,
162 MemUtilization: 0.5,
163 RequestCost: map[string]float64{"queryCost": 0.25},
164 Utilization: map[string]float64{"queueSize": 0.75},
165 },
166 },
167 {
168 desc: "with no custom backend metrics",
169 injectMetrics: false,
170 },
171 }
172
173 for _, test := range tests {
174 t.Run(test.desc, func(t *testing.T) {
175 // A server option to enable reporting of per-call backend metrics.
176 smr := orca.NewServerMetricsRecorder()
177 callMetricsServerOption := orca.CallMetricsServerOption(smr)
178 smr.SetCPUUtilization(1.0)
179
180 // An interceptor which injects custom backend metrics, added only
181 // when the injectMetrics field in the test is set.
182 injectingInterceptor := func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
183 recorder := orca.CallMetricsRecorderFromContext(ss.Context())
184 if recorder == nil {
185 err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
186 t.Error(err)
187 return err
188 }
189 recorder.SetMemoryUtilization(0.5)
190 // This value will be overwritten by a write to the same metric
191 // from the server handler.
192 recorder.SetNamedUtilization("queueSize", 1.0)
193 return handler(srv, ss)
194 }
195
196 // A stub server whose streaming handler injects custom metrics, if
197 // the injectMetrics field in the test is set. It overwrites one of
198 // the values injected above, by the interceptor.
199 srv := stubserver.StubServer{
200 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
201 if test.injectMetrics {
202 recorder := orca.CallMetricsRecorderFromContext(stream.Context())
203 if recorder == nil {
204 err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
205 t.Error(err)
206 return err
207 }
208 recorder.SetRequestCost("queryCost", 0.25)

Callers

nothing calls this directly

Calls 15

SetCPUUtilizationMethod · 0.95
StartServerMethod · 0.95
StopMethod · 0.95
FullDuplexCallMethod · 0.95
NewServerMetricsRecorderFunction · 0.92
CallMetricsServerOptionFunction · 0.92
ChainStreamInterceptorFunction · 0.92
NewClientFunction · 0.92
WithTransportCredentialsFunction · 0.92
NewCredentialsFunction · 0.92
ToLoadReportFunction · 0.92

Tested by

no test coverage detected