Spaces:
Running
Running
File size: 5,631 Bytes
b72ab63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
import datetime
import logging
import os
import types
import uuid
from stat import S_ISDIR, S_ISLNK
import paramiko
from .. import AbstractFileSystem
from ..utils import infer_storage_options
logger = logging.getLogger("fsspec.sftp")
class SFTPFileSystem(AbstractFileSystem):
"""Files over SFTP/SSH
Peer-to-peer filesystem over SSH using paramiko.
Note: if using this with the ``open`` or ``open_files``, with full URLs,
there is no way to tell if a path is relative, so all paths are assumed
to be absolute.
"""
protocol = "sftp", "ssh"
def __init__(self, host, **ssh_kwargs):
"""
Parameters
----------
host: str
Hostname or IP as a string
temppath: str
Location on the server to put files, when within a transaction
ssh_kwargs: dict
Parameters passed on to connection. See details in
https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
May include port, username, password...
"""
if self._cached:
return
super().__init__(**ssh_kwargs)
self.temppath = ssh_kwargs.pop("temppath", "/tmp") # remote temp directory
self.host = host
self.ssh_kwargs = ssh_kwargs
self._connect()
def _connect(self):
logger.debug("Connecting to SFTP server %s", self.host)
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(self.host, **self.ssh_kwargs)
self.ftp = self.client.open_sftp()
@classmethod
def _strip_protocol(cls, path):
return infer_storage_options(path)["path"]
@staticmethod
def _get_kwargs_from_urls(urlpath):
out = infer_storage_options(urlpath)
out.pop("path", None)
out.pop("protocol", None)
return out
def mkdir(self, path, create_parents=True, mode=511):
logger.debug("Creating folder %s", path)
if self.exists(path):
raise FileExistsError(f"File exists: {path}")
if create_parents:
self.makedirs(path)
else:
self.ftp.mkdir(path, mode)
def makedirs(self, path, exist_ok=False, mode=511):
if self.exists(path) and not exist_ok:
raise FileExistsError(f"File exists: {path}")
parts = path.split("/")
new_path = "/" if path[:1] == "/" else ""
for part in parts:
if part:
new_path = f"{new_path}/{part}" if new_path else part
if not self.exists(new_path):
self.ftp.mkdir(new_path, mode)
def rmdir(self, path):
logger.debug("Removing folder %s", path)
self.ftp.rmdir(path)
def info(self, path):
stat = self._decode_stat(self.ftp.stat(path))
stat["name"] = path
return stat
@staticmethod
def _decode_stat(stat, parent_path=None):
if S_ISDIR(stat.st_mode):
t = "directory"
elif S_ISLNK(stat.st_mode):
t = "link"
else:
t = "file"
out = {
"name": "",
"size": stat.st_size,
"type": t,
"uid": stat.st_uid,
"gid": stat.st_gid,
"time": datetime.datetime.fromtimestamp(
stat.st_atime, tz=datetime.timezone.utc
),
"mtime": datetime.datetime.fromtimestamp(
stat.st_mtime, tz=datetime.timezone.utc
),
}
if parent_path:
out["name"] = "/".join([parent_path.rstrip("/"), stat.filename])
return out
def ls(self, path, detail=False):
logger.debug("Listing folder %s", path)
stats = [self._decode_stat(stat, path) for stat in self.ftp.listdir_iter(path)]
if detail:
return stats
else:
paths = [stat["name"] for stat in stats]
return sorted(paths)
def put(self, lpath, rpath, callback=None, **kwargs):
logger.debug("Put file %s into %s", lpath, rpath)
self.ftp.put(lpath, rpath)
def get_file(self, rpath, lpath, **kwargs):
if self.isdir(rpath):
os.makedirs(lpath, exist_ok=True)
else:
self.ftp.get(self._strip_protocol(rpath), lpath)
def _open(self, path, mode="rb", block_size=None, **kwargs):
"""
block_size: int or None
If 0, no buffering, if 1, line buffering, if >1, buffer that many
bytes, if None use default from paramiko.
"""
logger.debug("Opening file %s", path)
if kwargs.get("autocommit", True) is False:
# writes to temporary file, move on commit
path2 = "/".join([self.temppath, str(uuid.uuid4())])
f = self.ftp.open(path2, mode, bufsize=block_size if block_size else -1)
f.temppath = path2
f.targetpath = path
f.fs = self
f.commit = types.MethodType(commit_a_file, f)
f.discard = types.MethodType(discard_a_file, f)
else:
f = self.ftp.open(path, mode, bufsize=block_size if block_size else -1)
return f
def _rm(self, path):
if self.isdir(path):
self.ftp.rmdir(path)
else:
self.ftp.remove(path)
def mv(self, old, new):
logger.debug("Renaming %s into %s", old, new)
self.ftp.posix_rename(old, new)
def commit_a_file(self):
self.fs.mv(self.temppath, self.targetpath)
def discard_a_file(self):
self.fs._rm(self.temppath)
|