File size: 1,245 Bytes
fe41391 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
import os
import shutil
import subprocess
import sys
import time
import pytest
import fsspec
from fsspec.implementations.cached import CachingFileSystem
@pytest.fixture()
def m():
"""
Fixture providing a memory filesystem.
"""
m = fsspec.filesystem("memory")
m.store.clear()
m.pseudo_dirs.clear()
m.pseudo_dirs.append("")
try:
yield m
finally:
m.store.clear()
m.pseudo_dirs.clear()
m.pseudo_dirs.append("")
@pytest.fixture
def ftp_writable(tmpdir):
"""
Fixture providing a writable FTP filesystem.
"""
pytest.importorskip("pyftpdlib")
from fsspec.implementations.ftp import FTPFileSystem
FTPFileSystem.clear_instance_cache() # remove lingering connections
CachingFileSystem.clear_instance_cache()
d = str(tmpdir)
with open(os.path.join(d, "out"), "wb") as f:
f.write(b"hello" * 10000)
P = subprocess.Popen(
[sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"]
)
try:
time.sleep(1)
yield "localhost", 2121, "user", "pass"
finally:
P.terminate()
P.wait()
try:
shutil.rmtree(tmpdir)
except Exception:
pass
|