| | import os |
| | import shutil |
| | import subprocess |
| | import sys |
| | import time |
| |
|
| | import pytest |
| |
|
| | import fsspec |
| | from fsspec.implementations.cached import CachingFileSystem |
| |
|
| |
|
@pytest.fixture()
def m():
    """
    Fixture providing a memory filesystem.

    The filesystem's ``store`` is emptied and its ``pseudo_dirs`` list is
    reset to ``[""]`` both before the filesystem is handed out and again
    once the consumer is finished with it (even if the consumer raised).
    """
    memfs = fsspec.filesystem("memory")

    def _reset():
        # Drop every stored entry and leave only the root pseudo-directory.
        memfs.store.clear()
        memfs.pseudo_dirs.clear()
        memfs.pseudo_dirs.append("")

    _reset()
    try:
        yield memfs
    finally:
        _reset()
| |
|
| |
|
@pytest.fixture
def ftp_writable(tmpdir):
    """
    Fixture providing a writable FTP filesystem.

    Seeds ``tmpdir`` with a 50000-byte file named ``out``, then launches
    ``python -m pyftpdlib`` as a subprocess with ``-d <tmpdir> -u user
    -P pass -w``.  The test is skipped when ``pyftpdlib`` is not importable.

    Yields
    ------
    tuple
        ``("localhost", 2121, "user", "pass")`` -- host, port, username
        and password for connecting to the server.

    On teardown the server process is terminated and waited for, and
    ``tmpdir`` is removed on a best-effort basis.
    """
    pytest.importorskip("pyftpdlib")
    import socket

    from fsspec.implementations.ftp import FTPFileSystem

    # Start from empty instance caches for both filesystem classes.
    FTPFileSystem.clear_instance_cache()
    CachingFileSystem.clear_instance_cache()

    root = str(tmpdir)
    with open(os.path.join(root, "out"), "wb") as f:
        f.write(b"hello" * 10000)

    # No port option is passed to pyftpdlib, so the port below relies on its
    # command-line default -- NOTE(review): confirm against pyftpdlib docs.
    host, port = "localhost", 2121
    server = subprocess.Popen(
        [
            sys.executable,
            "-m",
            "pyftpdlib",
            "-d",
            root,
            "-u",
            "user",
            "-P",
            "pass",
            "-w",
        ]
    )
    try:
        # Wait until the server accepts connections instead of sleeping for a
        # fixed interval: a fixed sleep is too short on slow machines and
        # wasted time on fast ones.  Stop early if the process has exited or
        # the deadline passes; either way still yield, so a broken server
        # surfaces as a connection failure in the test itself.
        deadline = time.monotonic() + 10
        while server.poll() is None and time.monotonic() < deadline:
            try:
                socket.create_connection((host, port), timeout=0.5).close()
            except OSError:
                time.sleep(0.1)
            else:
                break
        yield host, port, "user", "pass"
    finally:
        server.terminate()
        server.wait()
        # Best-effort cleanup: a failure to delete must not fail the test.
        shutil.rmtree(root, ignore_errors=True)
| |
|