MCPcopy
hub / github.com/grpc/grpc-go / testServerStreamingConcurrent

Function testServerStreamingConcurrent

test/end2end_test.go:3474–3525  ·  view source on GitHub ↗
(t *testing.T, e env)

Source from the content-addressed store, hash-verified

3472}
3473
3474func testServerStreamingConcurrent(t *testing.T, e env) {
3475 te := newTest(t, e)
3476 te.startServer(concurrentSendServer{})
3477 defer te.tearDown()
3478
3479 cc := te.clientConn()
3480 tc := testgrpc.NewTestServiceClient(cc)
3481
3482 doStreamingCall := func() {
3483 req := &testpb.StreamingOutputCallRequest{}
3484 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3485 defer cancel()
3486 stream, err := tc.StreamingOutputCall(ctx, req)
3487 if err != nil {
3488 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3489 return
3490 }
3491 var ngot int
3492 var buf bytes.Buffer
3493 for {
3494 reply, err := stream.Recv()
3495 if err == io.EOF {
3496 break
3497 }
3498 if err != nil {
3499 t.Fatal(err)
3500 }
3501 ngot++
3502 if buf.Len() > 0 {
3503 buf.WriteByte(',')
3504 }
3505 buf.Write(reply.GetPayload().GetBody())
3506 }
3507 if want := 10; ngot != want {
3508 t.Errorf("Got %d replies, want %d", ngot, want)
3509 }
3510 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3511 t.Errorf("Got replies %q; want %q", got, want)
3512 }
3513 }
3514
3515 var wg sync.WaitGroup
3516 for i := 0; i < 20; i++ {
3517 wg.Add(1)
3518 go func() {
3519 defer wg.Done()
3520 doStreamingCall()
3521 }()
3522 }
3523 wg.Wait()
3524
3525}
3526
3527func generatePayloadSizes() [][]int {
3528 reqSizes := [][]int{

Callers 1

Calls 15

StreamingOutputCallMethod · 0.95
LenMethod · 0.95
StringMethod · 0.95
WaitMethod · 0.80
newTestFunction · 0.70
ErrorfMethod · 0.65
RecvMethod · 0.65
FatalMethod · 0.65
WriteMethod · 0.65
AddMethod · 0.65
startServerMethod · 0.45
tearDownMethod · 0.45

Tested by

no test coverage detected