CopyFrom executes the copy command sql and copies all of r to the PostgreSQL server. Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r could still block.
(ctx context.Context, r io.Reader, sql string)
| 1436 | // Note: context cancellation will only interrupt operations on the underlying PostgreSQL network connection. Reads on r |
| 1437 | // could still block. |
| 1438 | func (pgConn *PgConn) CopyFrom(ctx context.Context, r io.Reader, sql string) (CommandTag, error) { |
| 1439 | if err := pgConn.lock(); err != nil { |
| 1440 | return CommandTag{}, err |
| 1441 | } |
| 1442 | defer pgConn.unlock() |
| 1443 | |
| 1444 | if ctx != context.Background() { |
| 1445 | select { |
| 1446 | case <-ctx.Done(): |
| 1447 | return CommandTag{}, newContextAlreadyDoneError(ctx) |
| 1448 | default: |
| 1449 | } |
| 1450 | pgConn.contextWatcher.Watch(ctx) |
| 1451 | defer pgConn.contextWatcher.Unwatch() |
| 1452 | } |
| 1453 | |
| 1454 | // Send copy from query |
| 1455 | pgConn.frontend.SendQuery(&pgproto3.Query{String: sql}) |
| 1456 | err := pgConn.flushWithPotentialWriteReadDeadlock() |
| 1457 | if err != nil { |
| 1458 | pgConn.asyncClose() |
| 1459 | return CommandTag{}, err |
| 1460 | } |
| 1461 | |
| 1462 | // Send copy data |
| 1463 | abortCopyChan := make(chan struct{}) |
| 1464 | copyErrChan := make(chan error, 1) |
| 1465 | signalMessageChan := pgConn.signalMessage() |
| 1466 | var wg sync.WaitGroup |
| 1467 | wg.Go(func() { |
| 1468 | buf := iobufpool.Get(65536) |
| 1469 | defer iobufpool.Put(buf) |
| 1470 | (*buf)[0] = 'd' |
| 1471 | |
| 1472 | for { |
| 1473 | n, readErr := r.Read((*buf)[5:cap(*buf)]) |
| 1474 | if n > 0 { |
| 1475 | *buf = (*buf)[0 : n+5] |
| 1476 | pgio.SetInt32((*buf)[1:], int32(n+4)) |
| 1477 | |
| 1478 | writeErr := pgConn.frontend.SendUnbufferedEncodedCopyData(*buf) |
| 1479 | if writeErr != nil { |
| 1480 | // Write errors are always fatal, but we can't use asyncClose because we are in a different goroutine. Not |
| 1481 | // setting pgConn.status or closing pgConn.cleanupDone for the same reason. |
| 1482 | pgConn.conn.Close() |
| 1483 | |
| 1484 | copyErrChan <- writeErr |
| 1485 | return |
| 1486 | } |
| 1487 | } |
| 1488 | if readErr != nil { |
| 1489 | copyErrChan <- readErr |
| 1490 | return |
| 1491 | } |
| 1492 | |
| 1493 | select { |
| 1494 | case <-abortCopyChan: |
| 1495 | return |
nothing calls this directly
no test coverage detected