MCPcopy
hub / github.com/jackc/pgx / CopyFrom

Method CopyFrom

pgconn/pgconn.go:1438–1566  ·  view source on GitHub ↗

CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r could still block.

(ctx context.Context, r io.Reader, sql string)

Source from the content-addressed store, hash-verified

1436// Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r
1437// could still block.
1438func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) {
1439 if err := pgConn.lock(); err != nil {
1440 return CommandTag{}, err
1441 }
1442 defer pgConn.unlock()
1443
1444 if ctx != context.Background() {
1445 select {
1446 case <-ctx.Done():
1447 return CommandTag{}, newContextAlreadyDoneError(ctx)
1448 default:
1449 }
1450 pgConn.contextWatcher.Watch(ctx)
1451 defer pgConn.contextWatcher.Unwatch()
1452 }
1453
1454 // Send copy from query
1455 pgConn.frontend.SendQuery(&pgproto3.Query{String: sql})
1456 err := pgConn.flushWithPotentialWriteReadDeadlock()
1457 if err != nil {
1458 pgConn.asyncClose()
1459 return CommandTag{}, err
1460 }
1461
1462 // Send copy data
1463 abortCopyChan := make(chan struct{})
1464 copyErrChan := make(chan error, 1)
1465 signalMessageChan := pgConn.signalMessage()
1466 var wg sync.WaitGroup
1467 wg.Go(func() {
1468 buf := iobufpool.Get(65536)
1469 defer iobufpool.Put(buf)
1470 (*buf)[0] = 'd'
1471
1472 for {
1473 n, readErr := r.Read((*buf)[5:cap(*buf)])
1474 if n > 0 {
1475 *buf = (*buf)[0 : n+5]
1476 pgio.SetInt32((*buf)[1:], int32(n+4))
1477
1478 writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf)
1479 if writeErr != nil {
1480 // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not
1481 // setting pgConn.status or closing pgConn.cleanupDone for the same reason.
1482 pgConn.conn.Close()
1483
1484 copyErrChan <- writeErr
1485 return
1486 }
1487 }
1488 if readErr != nil {
1489 copyErrChan <- readErr
1490 return
1491 }
1492
1493 select {
1494 case <-abortCopyChan:
1495 return

Callers

nothing calls this directly

Calls 15

lockMethod · 0.95
unlockMethod · 0.95
asyncCloseMethod · 0.95
signalMessageMethod · 0.95
receiveMessageMethod · 0.95
makeCommandTagMethod · 0.95
GetFunction · 0.92
PutFunction · 0.92
SetInt32Function · 0.92
normalizeTimeoutErrorFunction · 0.85

Tested by

no test coverage detected