| 88 | } |
| 89 | |
| 90 | func (g *grpcInflightLimitCheck) checkProbablyEarlyAbortedRequest(ctx context.Context, state *gprcInflightLimitCheckerState) func() { |
| 91 | return func() { |
| 92 | // If this function is running, we're in a corner case. Be very verbose in logging to help with debugging. |
| 93 | logger := state.logger(g.logger) |
| 94 | |
| 95 | level.Warn(g.logger).Log("msg", "gRPC request processing didn't start within 10s of receiving, checking the context state") |
| 96 | select { |
| 97 | case <-ctx.Done(): |
| 98 | level.Info(logger).Log("msg", "gRPC request context is done, assuming the request was cancelled before processing started, will call RPCCallFinished") |
| 99 | case <-state.headersProcessed: |
| 100 | level.Info(logger).Log("msg", "gRPC request processing has started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String()) |
| 101 | return |
| 102 | default: |
| 103 | level.Info(logger).Log("msg", "gRPC request context is not done and processing hasn't started, will wait until context is done or processing starts") |
| 104 | |
| 105 | select { |
| 106 | case <-ctx.Done(): |
| 107 | level.Info(logger).Log("msg", "gRPC request context is finally done, assuming the request was cancelled before processing started, will call RPCCallFinished") |
| 108 | case <-state.headersProcessed: |
| 109 | level.Info(logger).Log("msg", "gRPC request processing has finally started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String()) |
| 110 | return |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | called := false |
| 115 | state.rpcCallFinishedOnce.Do(func() { |
| 116 | called = true |
| 117 | g.methodLimiter.RPCCallFinished(ctx) |
| 118 | }) |
| 119 | if called { |
| 120 | level.Info(logger).Log("msg", "called RPCCallFinished for gRPC request that never started processing") |
| 121 | } else { |
| 122 | level.Info(logger).Log("msg", "RPCCallFinishes was already called for this gRPC request, no need to call it again") |
| 123 | } |
| 124 | } |
| 125 | } |
| 126 | |
| 127 | func (g *grpcInflightLimitCheck) UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { |
| 128 | finish, err := g.methodLimiter.RPCCallProcessing(ctx, info.FullMethod) |