| 396 | } |
| 397 | |
| 398 | func launchConnWithStreams(t *testing.T, ctx context.Context, listener net.Listener, streamsCount int) *grpc.ClientConn { |
| 399 | t.Helper() |
| 400 | |
| 401 | conn, err := grpc.NewClient(listener.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(10e6), grpc.MaxCallSendMsgSize(10e6))) |
| 402 | require.NoError(t, err) |
| 403 | |
| 404 | fc := middleware_test.NewEchoServerClient(conn) |
| 405 | |
| 406 | streams := make([]middleware_test.EchoServer_ProcessClient, streamsCount) |
| 407 | for i := 0; i < streamsCount; i++ { |
| 408 | stream, err := fc.Process(context.Background()) |
| 409 | require.NoError(t, err) |
| 410 | streams[i] = stream |
| 411 | } |
| 412 | |
| 413 | // Keep streams alive by continuously sending messages |
| 414 | for i := 0; i < len(streams); i++ { |
| 415 | go func(streamIndex int) { |
| 416 | for { |
| 417 | select { |
| 418 | case <-ctx.Done(): |
| 419 | if err := streams[streamIndex].CloseSend(); err != nil { |
| 420 | t.Log("Error closing stream", err) |
| 421 | } |
| 422 | return |
| 423 | default: |
| 424 | msg := &middleware_test.Msg{ |
| 425 | Body: []byte(generateString(100)), // Small message to keep stream alive |
| 426 | } |
| 427 | if err := streams[streamIndex].Send(msg); err != nil { |
| 428 | return |
| 429 | } |
| 430 | if _, err := streams[streamIndex].Recv(); err != nil { |
| 431 | return |
| 432 | } |
| 433 | time.Sleep(10 * time.Millisecond) |
| 434 | } |
| 435 | } |
| 436 | }(i) |
| 437 | } |
| 438 | |
| 439 | return conn |
| 440 | } |
| 441 | |
| 442 | type halfEcho struct { |
| 443 | log func(args ...interface{}) |