MCPcopy
hub / github.com/grpc/grpc-go / handleStreamDelayRead

Method handleStreamDelayRead

internal/transport/transport_test.go:285–368  ·  view source on GitHub ↗

handleStreamDelayRead delays reads so that the other side has to halt on stream-level flow control. This handler assumes dynamic flow control is turned off and assumes window sizes to be set to defaultWindowSize.

(t *testing.T, s *ServerStream)

Source from the content-addressed store, hash-verified

283// This handler assumes dynamic flow control is turned off and assumes window
284// sizes to be set to defaultWindowSize.
285func (h *testStreamHandler) handleStreamDelayRead(t *testing.T, s *ServerStream) {
286 req := expectedRequest
287 resp := expectedResponse
288 if s.Method() == "foo.Large" {
289 req = expectedRequestLarge
290 resp = expectedResponseLarge
291 }
292 var (
293 mu sync.Mutex
294 total int
295 )
296 s.wq.replenish = func(n int) {
297 mu.Lock()
298 total += n
299 mu.Unlock()
300 s.wq.realReplenish(n)
301 }
302 getTotal := func() int {
303 mu.Lock()
304 defer mu.Unlock()
305 return total
306 }
307 done := make(chan struct{})
308 defer close(done)
309 go func() {
310 for {
311 select {
312 // Prevent goroutine from leaking.
313 case <-done:
314 return
315 default:
316 }
317 if getTotal() == defaultWindowSize {
318 // Signal the client to start reading and
319 // thereby send window update.
320 close(h.notify)
321 return
322 }
323 runtime.Gosched()
324 }
325 }()
326 p := make([]byte, len(req))
327
328 // Let the other side run out of stream-level window before
329 // starting to read and thereby sending a window update.
330 timer := time.NewTimer(time.Second * 10)
331 select {
332 case <-h.getNotified:
333 timer.Stop()
334 case <-timer.C:
335 t.Errorf("Server timed-out.")
336 return
337 }
338 _, err := s.readTo(p)
339 if err != nil {
340 t.Errorf("s.Read(_) = _, %v, want _, <nil>", err)
341 return
342 }

Callers 1

startMethod · 0.95

Calls 13

StopMethod · 0.95
NewFunction · 0.92
newBufferSliceFunction · 0.85
realReplenishMethod · 0.80
NewTimerMethod · 0.80
readToMethod · 0.80
WriteStatusMethod · 0.80
MethodMethod · 0.65
ErrorfMethod · 0.65
EqualMethod · 0.65
WriteMethod · 0.65
LockMethod · 0.45

Tested by

no test coverage detected