MCPcopy
hub / github.com/grafana/dskit / TestGrpcStatsMaxStreams

Function TestGrpcStatsMaxStreams

middleware/grpc_stats_test.go:309–396  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

307}
308
309func TestGrpcStatsMaxStreams(t *testing.T) {
310 const (
311 waitTime = 1 * time.Second
312 sleep = waitTime / 10
313 )
314 reg := prometheus.NewRegistry()
315
316 waitAndExpectMaxStreams := func(expected int) {
317 var err error
318 for endTime := time.Now().Add(waitTime); time.Now().Before(endTime); {
319 err = testutil.GatherAndCompare(reg, bytes.NewBufferString(fmt.Sprintf(`
320 # HELP grpc_concurrent_streams_by_conn_max The current number of concurrent streams in the connection with the most concurrent streams.
321 # TYPE grpc_concurrent_streams_by_conn_max gauge
322 grpc_concurrent_streams_by_conn_max{} %d
323 `, expected)), "grpc_concurrent_streams_by_conn_max")
324 if err == nil {
325 break
326 }
327 time.Sleep(sleep)
328 }
329 require.NoError(t, err)
330 }
331
332 received := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
333 Name: "received_payload_bytes",
334 Help: "Size of received gRPC messages",
335 Buckets: BodySizeBuckets,
336 }, []string{"method", "route"})
337
338 sent := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
339 Name: "sent_payload_bytes",
340 Help: "Size of sent gRPC",
341 Buckets: BodySizeBuckets,
342 }, []string{"method", "route"})
343
344 inflightRequests := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
345 Name: "inflight_requests",
346 Help: "Current number of inflight requests.",
347 }, []string{"method", "route"})
348
349 stats := NewStatsHandler(reg, received, sent, inflightRequests, true)
350
351 serv := grpc.NewServer(grpc.StatsHandler(stats), grpc.MaxRecvMsgSize(10e6))
352 defer serv.GracefulStop()
353
354 listener, err := net.Listen("tcp", "localhost:0")
355 require.NoError(t, err)
356
357 middleware_test.RegisterEchoServerServer(serv, &halfEcho{log: t.Log})
358
359 go func() {
360 require.NoError(t, serv.Serve(listener))
361 }()
362
363 // Create a connection with 10 streams
364 ctx, cancel1 := context.WithCancel(context.Background())
365 defer cancel1()
366 conn1 := launchConnWithStreams(t, ctx, listener, 10)

Callers

nothing calls this directly

Calls 8

RegisterEchoServerServerFunction · 0.92
NewStatsHandlerFunction · 0.85
launchConnWithStreamsFunction · 0.85
WithMethod · 0.80
AddMethod · 0.65
BeforeMethod · 0.65
SleepMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected