TestE2ECallMetricsStreaming tests the injection of custom backend metrics from the server application for a streaming RPC, and verifies that expected load reports are received at the client.
(t *testing.T)
| 149 | // from the server application for a streaming RPC, and verifies that expected |
| 150 | // load reports are received at the client. |
| 151 | func (s) TestE2ECallMetricsStreaming(t *testing.T) { |
| 152 | tests := []struct { |
| 153 | desc string |
| 154 | injectMetrics bool |
| 155 | wantProto *v3orcapb.OrcaLoadReport |
| 156 | }{ |
| 157 | { |
| 158 | desc: "with custom backend metrics", |
| 159 | injectMetrics: true, |
| 160 | wantProto: &v3orcapb.OrcaLoadReport{ |
| 161 | CpuUtilization: 1.0, |
| 162 | MemUtilization: 0.5, |
| 163 | RequestCost: map[string]float64{"queryCost": 0.25}, |
| 164 | Utilization: map[string]float64{"queueSize": 0.75}, |
| 165 | }, |
| 166 | }, |
| 167 | { |
| 168 | desc: "with no custom backend metrics", |
| 169 | injectMetrics: false, |
| 170 | }, |
| 171 | } |
| 172 | |
| 173 | for _, test := range tests { |
| 174 | t.Run(test.desc, func(t *testing.T) { |
| 175 | // A server option to enable reporting of per-call backend metrics. |
| 176 | smr := orca.NewServerMetricsRecorder() |
| 177 | callMetricsServerOption := orca.CallMetricsServerOption(smr) |
| 178 | smr.SetCPUUtilization(1.0) |
| 179 | |
| 180 | // An interceptor which injects custom backend metrics, added only |
| 181 | // when the injectMetrics field in the test is set. |
| 182 | injectingInterceptor := func(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { |
| 183 | recorder := orca.CallMetricsRecorderFromContext(ss.Context()) |
| 184 | if recorder == nil { |
| 185 | err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") |
| 186 | t.Error(err) |
| 187 | return err |
| 188 | } |
| 189 | recorder.SetMemoryUtilization(0.5) |
| 190 | // This value will be overwritten by a write to the same metric |
| 191 | // from the server handler. |
| 192 | recorder.SetNamedUtilization("queueSize", 1.0) |
| 193 | return handler(srv, ss) |
| 194 | } |
| 195 | |
| 196 | // A stub server whose streaming handler injects custom metrics, if |
| 197 | // the injectMetrics field in the test is set. It overwrites one of |
| 198 | // the values injected above, by the interceptor. |
| 199 | srv := stubserver.StubServer{ |
| 200 | FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { |
| 201 | if test.injectMetrics { |
| 202 | recorder := orca.CallMetricsRecorderFromContext(stream.Context()) |
| 203 | if recorder == nil { |
| 204 | err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") |
| 205 | t.Error(err) |
| 206 | return err |
| 207 | } |
| 208 | recorder.SetRequestCost("queryCost", 0.25) |
nothing calls this directly
no test coverage detected