MCPcopy
hub / github.com/scrapy/scrapy / get_ftp_content_and_delete

Function get_ftp_content_and_delete

tests/test_pipeline_files.py:58–80  ·  view source on GitHub ↗
(
    path: str,
    host: str,
    port: int,
    username: str,
    password: str,
    use_active_mode: bool = False,
)

Source from the content-addressed store, hash-verified

56
57
58def get_ftp_content_and_delete(
59 path: str,
60 host: str,
61 port: int,
62 username: str,
63 password: str,
64 use_active_mode: bool = False,
65) -> bytes:
66 with FTP() as ftp:
67 ftp.connect(host, port)
68 ftp.login(username, password)
69 if use_active_mode:
70 ftp.set_pasv(False)
71 ftp_data: list[bytes] = []
72
73 def buffer_data(data: bytes) -> None:
74 ftp_data.append(data)
75
76 ftp.retrbinary(f"RETR {path}", buffer_data)
77 dirname, filename = split(path)
78 ftp.cwd(dirname)
79 ftp.delete(filename)
80 return b"".join(ftp_data)
81
82
83class DeferredFSFilesStore(FSFilesStore):

Callers 1

test_persistMethod · 0.85

Calls 4

loginMethod · 0.80
deleteMethod · 0.80
connectMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected