(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64)
| 294 | } |
| 295 | |
| 296 | func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) { |
| 297 | var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error |
| 298 | if payloadType == "bytebuf" { |
| 299 | doRPC = benchmark.DoByteBufStreamingRoundTrip |
| 300 | } else { |
| 301 | doRPC = benchmark.DoStreamingRoundTrip |
| 302 | } |
| 303 | for ic, conn := range conns { |
| 304 | // For each connection, create rpcCountPerConn goroutines to do rpc. |
| 305 | for j := 0; j < rpcCountPerConn; j++ { |
| 306 | c := testgrpc.NewBenchmarkServiceClient(conn) |
| 307 | stream, err := c.StreamingCall(context.Background()) |
| 308 | if err != nil { |
| 309 | logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err) |
| 310 | } |
| 311 | idx := ic*rpcCountPerConn + j |
| 312 | bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions) |
| 313 | if poissonLambda == nil { // Closed loop. |
| 314 | // Start goroutine on the created mutex and histogram. |
| 315 | go func(idx int) { |
| 316 | // TODO: do warm up if necessary. |
| 317 | // Now relying on worker client to reserve time to do warm up. |
| 318 | // The worker client needs to wait for some time after client is created, |
| 319 | // before starting benchmark. |
| 320 | for { |
| 321 | start := time.Now() |
| 322 | if err := doRPC(stream, reqSize, respSize); err != nil { |
| 323 | return |
| 324 | } |
| 325 | elapse := time.Since(start) |
| 326 | bc.lockingHistograms[idx].add(int64(elapse)) |
| 327 | if ctx.Err() != nil { |
| 328 | return |
| 329 | } |
| 330 | } |
| 331 | }(idx) |
| 332 | } else { // Open loop. |
| 333 | timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second)) |
| 334 | time.AfterFunc(timeBetweenRPCs, func() { |
| 335 | bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC) |
| 336 | }) |
| 337 | } |
| 338 | } |
| 339 | } |
| 340 | } |
| 341 | |
| 342 | func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) { |
| 343 | go func() { |
no test coverage detected