Spaces:
Sleeping
Sleeping
import os | |
import subprocess | |
import time | |
from multiprocessing import Process | |
import pytest | |
try: | |
pytest.importorskip("fuse") # noqa: E402 | |
except OSError: | |
# can succeed in importing fuse, but fail to load so | |
pytest.importorskip("nonexistent") # noqa: E402 | |
from fsspec.fuse import main, run | |
from fsspec.implementations.memory import MemoryFileSystem | |
def host_fuse(mountdir): | |
fs = MemoryFileSystem() | |
fs.touch("/mounted/testfile") | |
run(fs, "/mounted/", mountdir) | |
def test_basic(tmpdir, capfd): | |
mountdir = str(tmpdir.mkdir("mount")) | |
fuse_process = Process(target=host_fuse, args=(str(mountdir),)) | |
fuse_process.start() | |
try: | |
timeout = 10 | |
while True: | |
try: | |
# can fail with device not ready while waiting for fuse | |
if "testfile" in os.listdir(mountdir): | |
break | |
except Exception: | |
pass | |
timeout -= 1 | |
time.sleep(1) | |
if not timeout > 0: | |
import pdb | |
pdb.set_trace() | |
pytest.skip(msg="fuse didn't come live") | |
fn = os.path.join(mountdir, "test") | |
with open(fn, "wb") as f: | |
f.write(b"data") | |
with open(fn) as f: | |
assert f.read() == "data" | |
os.remove(fn) | |
os.mkdir(fn) | |
assert os.listdir(fn) == [] | |
os.mkdir(fn + "/inner") | |
with pytest.raises(OSError): | |
os.rmdir(fn) | |
captured = capfd.readouterr() | |
assert "Traceback" not in captured.out | |
assert "Traceback" not in captured.err | |
os.rmdir(fn + "/inner") | |
os.rmdir(fn) | |
finally: | |
fuse_process.terminate() | |
fuse_process.join(timeout=10) | |
if fuse_process.is_alive(): | |
fuse_process.kill() | |
fuse_process.join() | |
def host_mount_local(source_dir, mount_dir, debug_log): | |
main(["local", source_dir, mount_dir, "-l", debug_log, "--ready-file"]) | |
def mount_local(tmpdir): | |
source_dir = tmpdir.mkdir("source") | |
mount_dir = tmpdir.mkdir("local") | |
debug_log = tmpdir / "debug.log" | |
fuse_process = Process( | |
target=host_mount_local, args=(str(source_dir), str(mount_dir), str(debug_log)) | |
) | |
fuse_process.start() | |
ready_file = mount_dir / ".fuse_ready" | |
for _ in range(20): | |
if ready_file.exists() and open(ready_file).read() == b"ready": | |
break | |
time.sleep(0.1) | |
try: | |
yield (source_dir, mount_dir) | |
finally: | |
fuse_process.terminate() | |
fuse_process.join(timeout=10) | |
if fuse_process.is_alive(): | |
fuse_process.kill() | |
fuse_process.join() | |
def test_mount(mount_local): | |
source_dir, mount_dir = mount_local | |
assert os.listdir(mount_dir) == [] | |
assert os.listdir(source_dir) == [] | |
mount_dir.mkdir("a") | |
assert os.listdir(mount_dir) == ["a"] | |
assert os.listdir(source_dir) == ["a"] | |
def test_chmod(mount_local): | |
source_dir, mount_dir = mount_local | |
open(mount_dir / "text", "w").write("test") | |
assert os.listdir(source_dir) == ["text"] | |
cp = subprocess.run( | |
["cp", str(mount_dir / "text"), str(mount_dir / "new")], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.PIPE, | |
check=False, | |
) | |
assert cp.stderr == b"" | |
assert cp.stdout == b"" | |
assert set(os.listdir(source_dir)) == {"text", "new"} | |
assert open(mount_dir / "new").read() == "test" | |
def test_seek_rw(mount_local): | |
source_dir, mount_dir = mount_local | |
fh = open(mount_dir / "text", "w") | |
fh.write("teST") | |
fh.seek(2) | |
fh.write("st") | |
fh.close() | |
fh = open(mount_dir / "text", "r") | |
assert fh.read() == "test" | |
fh.seek(2) | |
assert fh.read() == "st" | |
fh.close() | |