MCPcopy
hub / github.com/grpc/grpc-go / streamingLoop

Method streamingLoop

benchmark/worker/benchmark_client.go:296–340  ·  view source on GitHub ↗
(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64)

Source from the content-addressed store, hash-verified

294}
295
296func (bc *benchmarkClient) streamingLoop(ctx context.Context, conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string, poissonLambda *float64) {
297 var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
298 if payloadType == "bytebuf" {
299 doRPC = benchmark.DoByteBufStreamingRoundTrip
300 } else {
301 doRPC = benchmark.DoStreamingRoundTrip
302 }
303 for ic, conn := range conns {
304 // For each connection, create rpcCountPerConn goroutines to do rpc.
305 for j := 0; j < rpcCountPerConn; j++ {
306 c := testgrpc.NewBenchmarkServiceClient(conn)
307 stream, err := c.StreamingCall(context.Background())
308 if err != nil {
309 logger.Fatalf("%v.StreamingCall(_) = _, %v", c, err)
310 }
311 idx := ic*rpcCountPerConn + j
312 bc.lockingHistograms[idx].histogram = stats.NewHistogram(bc.histogramOptions)
313 if poissonLambda == nil { // Closed loop.
314 // Start goroutine on the created mutex and histogram.
315 go func(idx int) {
316 // TODO: do warm up if necessary.
317 // Now relying on worker client to reserve time to do warm up.
318 // The worker client needs to wait for some time after client is created,
319 // before starting benchmark.
320 for {
321 start := time.Now()
322 if err := doRPC(stream, reqSize, respSize); err != nil {
323 return
324 }
325 elapse := time.Since(start)
326 bc.lockingHistograms[idx].add(int64(elapse))
327 if ctx.Err() != nil {
328 return
329 }
330 }
331 }(idx)
332 } else { // Open loop.
333 timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second))
334 time.AfterFunc(timeBetweenRPCs, func() {
335 bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC)
336 })
337 }
338 }
339 }
340}
341
342func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient, idx int, reqSize int, respSize int, lambda float64) {
343 go func() {

Callers 1

performRPCsFunction · 0.80

Calls 7

StreamingCallMethod · 0.95
poissonStreamingMethod · 0.95
NewHistogramFunction · 0.92
NowMethod · 0.80
ErrMethod · 0.80
FatalfMethod · 0.65
addMethod · 0.45

Tested by

no test coverage detected