(t *testing.T)
| 2436 | } |
| 2437 | |
| 2438 | func TestConnCopyFromCanceled(t *testing.T) { |
| 2439 | t.Parallel() |
| 2440 | |
| 2441 | ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) |
| 2442 | defer cancel() |
| 2443 | |
| 2444 | pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_DATABASE")) |
| 2445 | require.NoError(t, err) |
| 2446 | defer closeConn(t, pgConn) |
| 2447 | |
| 2448 | _, err = pgConn.Exec(ctx, `create temporary table foo( |
| 2449 | a int4, |
| 2450 | b varchar |
| 2451 | )`).ReadAll() |
| 2452 | require.NoError(t, err) |
| 2453 | |
| 2454 | r, w := io.Pipe() |
| 2455 | go func() { |
| 2456 | for i := range 1_000_000 { |
| 2457 | a := strconv.Itoa(i) |
| 2458 | b := "foo " + a + " bar" |
| 2459 | _, err := w.Write(fmt.Appendf(nil, "%s,\"%s\"\n", a, b)) |
| 2460 | if err != nil { |
| 2461 | return |
| 2462 | } |
| 2463 | time.Sleep(time.Microsecond) |
| 2464 | } |
| 2465 | }() |
| 2466 | |
| 2467 | ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond) |
| 2468 | copySql := "COPY foo FROM STDIN WITH (FORMAT csv)" |
| 2469 | if pgConn.ParameterStatus("crdb_version") != "" { |
| 2470 | copySql = "COPY foo FROM STDIN WITH CSV" |
| 2471 | } |
| 2472 | ct, err := pgConn.CopyFrom(ctx, r, copySql) |
| 2473 | cancel() |
| 2474 | assert.Equal(t, int64(0), ct.RowsAffected()) |
| 2475 | assert.Error(t, err) |
| 2476 | |
| 2477 | assert.True(t, pgConn.IsClosed()) |
| 2478 | select { |
| 2479 | case <-pgConn.CleanupDone(): |
| 2480 | case <-time.After(5 * time.Second): |
| 2481 | t.Fatal("Connection cleanup exceeded maximum time") |
| 2482 | } |
| 2483 | } |
| 2484 | |
| 2485 | func TestConnCopyFromPrecanceled(t *testing.T) { |
| 2486 | t.Parallel() |
nothing calls this directly
no test coverage detected