(bf stats.Features, unconstrained bool)
| 486 | } |
| 487 | |
| 488 | func setupStream(bf stats.Features, unconstrained bool) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { |
| 489 | clients, cleanup := makeClients(bf) |
| 490 | |
| 491 | streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections) |
| 492 | ctx := context.Background() |
| 493 | if unconstrained { |
| 494 | md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1", benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) |
| 495 | ctx = metadata.NewOutgoingContext(ctx, md) |
| 496 | } |
| 497 | if bf.EnablePreloader { |
| 498 | md := metadata.Pairs(benchmark.PreloadMsgSizeHeader, strconv.Itoa(bf.RespSizeBytes), benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String()) |
| 499 | ctx = metadata.NewOutgoingContext(ctx, md) |
| 500 | } |
| 501 | for cn := 0; cn < bf.Connections; cn++ { |
| 502 | tc := clients[cn] |
| 503 | streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) |
| 504 | for pos := 0; pos < bf.MaxConcurrentCalls; pos++ { |
| 505 | stream, err := tc.StreamingCall(ctx) |
| 506 | if err != nil { |
| 507 | logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) |
| 508 | } |
| 509 | streams[cn][pos] = stream |
| 510 | } |
| 511 | } |
| 512 | |
| 513 | pl := benchmark.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes) |
| 514 | req := &testpb.SimpleRequest{ |
| 515 | ResponseType: pl.Type, |
| 516 | ResponseSize: int32(bf.RespSizeBytes), |
| 517 | Payload: pl, |
| 518 | } |
| 519 | |
| 520 | return streams, req, cleanup |
| 521 | } |
| 522 | |
| 523 | func prepareMessages(streams [][]testgrpc.BenchmarkService_StreamingCallClient, req *testpb.SimpleRequest) [][]*grpc.PreparedMsg { |
| 524 | preparedMsg := make([][]*grpc.PreparedMsg, len(streams)) |
no test coverage detected