(
path: str,
host: str,
port: int,
username: str,
password: str,
use_active_mode: bool = False,
)
| 56 | |
| 57 | |
| 58 | def get_ftp_content_and_delete( |
| 59 | path: str, |
| 60 | host: str, |
| 61 | port: int, |
| 62 | username: str, |
| 63 | password: str, |
| 64 | use_active_mode: bool = False, |
| 65 | ) -> bytes: |
| 66 | with FTP() as ftp: |
| 67 | ftp.connect(host, port) |
| 68 | ftp.login(username, password) |
| 69 | if use_active_mode: |
| 70 | ftp.set_pasv(False) |
| 71 | ftp_data: list[bytes] = [] |
| 72 | |
| 73 | def buffer_data(data: bytes) -> None: |
| 74 | ftp_data.append(data) |
| 75 | |
| 76 | ftp.retrbinary(f"RETR {path}", buffer_data) |
| 77 | dirname, filename = split(path) |
| 78 | ftp.cwd(dirname) |
| 79 | ftp.delete(filename) |
| 80 | return b"".join(ftp_data) |
| 81 | |
| 82 | |
| 83 | class DeferredFSFilesStore(FSFilesStore): |
no test coverage detected