MCPcopy
hub / github.com/jackc/pgx / TestConnCopyFromCanceled

Function TestConnCopyFromCanceled

pgconn/pgconn_test.go:2438–2483  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

2436}
2437
2438func TestConnCopyFromCanceled(t *testing.T) {
2439 t.Parallel()
2440
2441 ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
2442 defer cancel()
2443
2444 pgConn, err := pgconn.Connect(ctx, os.Getenv("PGX_TEST_DATABASE"))
2445 require.NoError(t, err)
2446 defer closeConn(t, pgConn)
2447
2448 _, err = pgConn.Exec(ctx, `create temporary table foo(
2449 a int4,
2450 b varchar
2451 )`).ReadAll()
2452 require.NoError(t, err)
2453
2454 r, w := io.Pipe()
2455 go func() {
2456 for i := range 1_000_000 {
2457 a := strconv.Itoa(i)
2458 b := "foo " + a + " bar"
2459 _, err := w.Write(fmt.Appendf(nil, "%s,\"%s\"\n", a, b))
2460 if err != nil {
2461 return
2462 }
2463 time.Sleep(time.Microsecond)
2464 }
2465 }()
2466
2467 ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
2468 copySql := "COPY foo FROM STDIN WITH (FORMAT csv)"
2469 if pgConn.ParameterStatus("crdb_version") != "" {
2470 copySql = "COPY foo FROM STDIN WITH CSV"
2471 }
2472 ct, err := pgConn.CopyFrom(ctx, r, copySql)
2473 cancel()
2474 assert.Equal(t, int64(0), ct.RowsAffected())
2475 assert.Error(t, err)
2476
2477 assert.True(t, pgConn.IsClosed())
2478 select {
2479 case <-pgConn.CleanupDone():
2480 case <-time.After(5 * time.Second):
2481 t.Fatal("Connection cleanup exceeded maximum time")
2482 }
2483}
2484
2485func TestConnCopyFromPrecanceled(t *testing.T) {
2486 t.Parallel()

Callers

nothing calls this directly

Calls 11

ConnectFunction · 0.92
ReadAllMethod · 0.80
ParameterStatusMethod · 0.80
RowsAffectedMethod · 0.80
CleanupDoneMethod · 0.80
closeConnFunction · 0.70
ExecMethod · 0.65
CopyFromMethod · 0.65
WriteMethod · 0.45
ErrorMethod · 0.45
IsClosedMethod · 0.45

Tested by

no test coverage detected