(t *testing.T, e env)
| 3472 | } |
| 3473 | |
| 3474 | func testServerStreamingConcurrent(t *testing.T, e env) { |
| 3475 | te := newTest(t, e) |
| 3476 | te.startServer(concurrentSendServer{}) |
| 3477 | defer te.tearDown() |
| 3478 | |
| 3479 | cc := te.clientConn() |
| 3480 | tc := testgrpc.NewTestServiceClient(cc) |
| 3481 | |
| 3482 | doStreamingCall := func() { |
| 3483 | req := &testpb.StreamingOutputCallRequest{} |
| 3484 | ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) |
| 3485 | defer cancel() |
| 3486 | stream, err := tc.StreamingOutputCall(ctx, req) |
| 3487 | if err != nil { |
| 3488 | t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err) |
| 3489 | return |
| 3490 | } |
| 3491 | var ngot int |
| 3492 | var buf bytes.Buffer |
| 3493 | for { |
| 3494 | reply, err := stream.Recv() |
| 3495 | if err == io.EOF { |
| 3496 | break |
| 3497 | } |
| 3498 | if err != nil { |
| 3499 | t.Fatal(err) |
| 3500 | } |
| 3501 | ngot++ |
| 3502 | if buf.Len() > 0 { |
| 3503 | buf.WriteByte(',') |
| 3504 | } |
| 3505 | buf.Write(reply.GetPayload().GetBody()) |
| 3506 | } |
| 3507 | if want := 10; ngot != want { |
| 3508 | t.Errorf("Got %d replies, want %d", ngot, want) |
| 3509 | } |
| 3510 | if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want { |
| 3511 | t.Errorf("Got replies %q; want %q", got, want) |
| 3512 | } |
| 3513 | } |
| 3514 | |
| 3515 | var wg sync.WaitGroup |
| 3516 | for i := 0; i < 20; i++ { |
| 3517 | wg.Add(1) |
| 3518 | go func() { |
| 3519 | defer wg.Done() |
| 3520 | doStreamingCall() |
| 3521 | }() |
| 3522 | } |
| 3523 | wg.Wait() |
| 3524 | |
| 3525 | } |
| 3526 | |
| 3527 | func generatePayloadSizes() [][]int { |
| 3528 | reqSizes := [][]int{ |
no test coverage detected