Ark-kun committed on
Commit
5452051
·
1 Parent(s): fc6254d

chore: Updated the backend to latest dev

Browse files
backend CHANGED
@@ -1 +1 @@
1
- Subproject commit 868a3f7c85cdc522b43315d946b0c32686cf3712
 
1
+ Subproject commit 7fce49dce66090b9313701f82921dda2c21194e6
huggingface_overlay/cloud_pipelines_backend/launchers/huggingface_launchers.py DELETED
@@ -1,470 +0,0 @@
1
- import copy
2
- import dataclasses
3
- import datetime
4
- import logging
5
- import pathlib
6
- import typing
7
- from typing import Any, Optional
8
-
9
- import huggingface_hub
10
-
11
- from cloud_pipelines.orchestration.launchers import naming_utils
12
- from ..storage_providers import huggingface_repo_storage
13
- from .. import component_structures as structures
14
- from . import container_component_utils
15
- from . import interfaces
16
-
17
-
18
- _logger = logging.getLogger(__name__)
19
-
20
- _MAX_INPUT_VALUE_SIZE = 10000
21
-
22
- _CONTAINER_FILE_NAME = "data"
23
-
24
-
25
- class HuggingFaceJobsContainerLauncher(
26
- interfaces.ContainerTaskLauncher["LaunchedHuggingFaceJobContainer"]
27
- ):
28
- """Launcher that uses HuggingFace Jobs installed locally"""
29
-
30
- def __init__(
31
- self,
32
- *,
33
- client: Optional[huggingface_hub.HfApi] = None,
34
- namespace: Optional[str] = None,
35
- hf_token: Optional[str] = None,
36
- hf_job_token: Optional[str] = None,
37
- job_timeout: Optional[int | float | str] = None,
38
- ):
39
- # The HF Jobs that we launch need token to write the output artifacts and logs
40
- hf_token = hf_token or huggingface_hub.get_token()
41
- hf_job_token = hf_job_token or hf_token
42
- self._api_client = client or huggingface_hub.HfApi(token=hf_token)
43
- self._namespace: str = namespace or self._api_client.whoami()["name"]
44
- self._storage_provider = (
45
- huggingface_repo_storage.HuggingFaceRepoStorageProvider()
46
- )
47
- self._job_timeout = job_timeout
48
- self._hf_job_token = hf_job_token
49
- # Test the access
50
- # _ = self._api_client.list_models(filter="non-existing")
51
- _ = self._api_client.list_jobs(namespace=self._namespace)
52
-
53
- def launch_container_task(
54
- self,
55
- *,
56
- component_spec: structures.ComponentSpec,
57
- # Input arguments may be updated with new downloaded values and new URIs of uploaded values.
58
- input_arguments: dict[str, interfaces.InputArgument],
59
- output_uris: dict[str, str],
60
- log_uri: str,
61
- annotations: dict[str, Any] | None = None,
62
- ) -> "LaunchedHuggingFaceJobContainer":
63
- if not isinstance(
64
- component_spec.implementation, structures.ContainerImplementation
65
- ):
66
- raise TypeError(
67
- f"Container launchers only support container implementations. Got {component_spec=}"
68
- )
69
- container_spec = component_spec.implementation.container
70
-
71
- # TODO: Validate the input/output URIs.
72
- container_inputs_root = pathlib.PurePosixPath("/tmp/component/inputs")
73
- container_outputs_root = pathlib.PurePosixPath("/tmp/component/outputs")
74
-
75
- # download_input_uris: dict[huggingface_repo_storage.HuggingFaceRepoUri, str] = {}
76
- # upload_output_uris: dict[str, huggingface_repo_storage.HuggingFaceRepoUri] = {}
77
- download_input_uris: dict[str, str] = {}
78
- upload_output_uris: dict[str, str] = {}
79
- # TODO: Derive common prefix for the upload_output_uris (also log_uri) and upload everything at once
80
-
81
- # Callbacks for the command-line resolving
82
- # Their main purpose is to return input/output path or value.
83
- # They add volumes and volume mounts when needed.
84
- # They also upload/download artifact data when needed.
85
- def get_input_value(input_name: str) -> str:
86
- input_argument = input_arguments[input_name]
87
- if input_argument.is_dir:
88
- raise interfaces.LauncherError(
89
- f"Cannot consume directory as value. {input_name=}, {input_argument=}"
90
- )
91
- if input_argument.total_size > _MAX_INPUT_VALUE_SIZE:
92
- raise interfaces.LauncherError(
93
- f"Artifact is too big to consume as value. Consume it as file instead. {input_name=}, {input_argument=}"
94
- )
95
- value = input_argument.value
96
- if value is None:
97
- # Download artifact data
98
- if not input_argument.uri:
99
- raise interfaces.LauncherError(
100
- f"Artifact data has no value and no uri. This cannot happen. {input_name=}, {input_argument=}"
101
- )
102
- uri_reader = self._storage_provider.make_uri(
103
- input_argument.uri
104
- ).get_reader()
105
- try:
106
- data = uri_reader.download_as_bytes()
107
- except Exception as ex:
108
- raise interfaces.LauncherError(
109
- f"Error downloading artifact data. {input_name=}, {input_argument.uri=}"
110
- ) from ex
111
- try:
112
- value = data.decode("utf-8")
113
- except Exception as ex:
114
- raise interfaces.LauncherError(
115
- f"Error converting artifact data to text. {input_name=}, {input_argument.uri=}"
116
- ) from ex
117
- # Updating the input_arguments with the downloaded value
118
- input_argument.value = value
119
- return value
120
-
121
- def get_input_path(input_name: str) -> str:
122
- input_argument = input_arguments[input_name]
123
- uri = input_argument.uri
124
- if not uri:
125
- if input_argument.value is None:
126
- raise interfaces.LauncherError(
127
- f"Artifact data has no value and no uri. This cannot happen. {input_name=}, {input_argument=}"
128
- )
129
- uri_writer = self._storage_provider.make_uri(
130
- input_argument.staging_uri
131
- ).get_writer()
132
- try:
133
- uri_writer.upload_from_text(input_argument.value)
134
- except Exception as ex:
135
- raise interfaces.LauncherError(
136
- f"Error uploading argument value. {input_name=}, {input_argument=}"
137
- ) from ex
138
- uri = input_argument.staging_uri
139
- # Updating the input_arguments with the URI of the uploaded value
140
- input_argument.uri = uri
141
-
142
- container_path = (
143
- container_inputs_root
144
- / naming_utils.sanitize_file_name(input_name)
145
- / _CONTAINER_FILE_NAME
146
- ).as_posix()
147
- # hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(uri)
148
- download_input_uris[uri] = container_path
149
- return container_path
150
-
151
- def get_output_path(output_name: str) -> str:
152
- uri = output_uris[output_name]
153
- # container_path = (
154
- # container_outputs_root
155
- # / naming_utils.sanitize_file_name(output_name)
156
- # / _CONTAINER_FILE_NAME
157
- # ).as_posix()
158
- hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(uri)
159
- uri_path_in_repo = hf_uri.path
160
- container_path = str(container_outputs_root / uri_path_in_repo)
161
- upload_output_uris[container_path] = uri
162
- return container_path
163
-
164
- def get_log_path() -> str:
165
- # TODO: Use common URI here
166
- hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
167
- uri_path_in_repo = hf_uri.path
168
- container_path = str(container_outputs_root / uri_path_in_repo)
169
- return container_path
170
-
171
- def get_exit_code_path() -> str:
172
- # TODO: Use common URI here
173
- hf_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
174
- uri_path_in_repo = hf_uri.path
175
- container_path = str(
176
- (container_outputs_root / uri_path_in_repo).with_name("exit_code.txt")
177
- )
178
- return container_path
179
-
180
- container_log_path = get_log_path()
181
- exit_code_path = get_exit_code_path()
182
-
183
- # Resolving the command line.
184
- # Also indirectly populates volumes and volume_mounts.
185
- resolved_cmd = container_component_utils.resolve_container_command_line(
186
- component_spec=component_spec,
187
- provided_input_names=set(input_arguments.keys()),
188
- get_input_value=get_input_value,
189
- get_input_path=get_input_path,
190
- get_output_path=get_output_path,
191
- )
192
-
193
- # Preparing the artifact uploader wrapper
194
- # TODO: Use common URI here
195
- # TODO: Add: --commit-message '{path_in_repo}' once path_in_repo becomes non-empty
196
- path_in_repo = ""
197
- # commit_message = path_in_repo
198
-
199
- hf_repo_uri = huggingface_repo_storage.HuggingFaceRepoUri.parse(log_uri)
200
-
201
- # It's hard to download data from HuggingFace.
202
- # First, there is no way to download a directory:
203
- # 1. The CLI only downloads to cache. So we have to correctly find the data location in the cache and copy the data out.
204
- # 2. We cannot specify which directory to download, so we have to use the --include filter.
205
- # But `hf download --include path` has an issue that it cannot download a directory unless path ends with slash (and for files, there should be no slash).
206
- # Adding "*" works for both files and directories. It's imperfect, but fine.
207
- # Another problem: The files in the snapshot are actually symlinks.
208
- # cp options:
209
- # -H follow command-line symbolic links in SOURCE
210
- # -l, --link hard link files instead of copying
211
- # -L, --dereference always follow symbolic links in SOURCE
212
- input_download_lines = [
213
- f'mkdir -p "$(dirname "{container_path}")"'
214
- f' && snapshot_dir=`uv run --with huggingface_hub[cli] hf download --repo-type "{hf_uri.repo_type}" "{hf_uri.repo_id}" --include "{hf_uri.path}*"`'
215
- f' && cp -r -L "$snapshot_dir/{hf_uri.path}" "{container_path}"'
216
- for hf_uri, container_path in (
217
- (huggingface_repo_storage.HuggingFaceRepoUri.parse(uri), container_path)
218
- for uri, container_path in download_input_uris.items()
219
- )
220
- ]
221
- input_download_code = "\n".join(input_download_lines)
222
-
223
- artifact_uploader_script = f"""
224
- set -e -x -o pipefail
225
- # Installing uv
226
- url="https://astral.sh/uv/install.sh"
227
- if command -v curl &>/dev/null; then
228
- # -s: silent, -L: follow redirects
229
- curl -s -L "$url" | sh
230
- elif command -v wget &>/dev/null; then
231
- wget -q -O - "$url" | sh
232
- else
233
- echo "Error: Neither curl nor wget found." >&2
234
- exit 1
235
- fi
236
-
237
- export PATH="$HOME/.local/bin:$PATH"
238
-
239
- uv run --with huggingface_hub[cli] hf version
240
-
241
- # Downloading the input data
242
- {input_download_code}
243
-
244
- # Running the program
245
- log_path='{container_log_path}'
246
- exit_code_path='{exit_code_path}'
247
- mkdir -p "$(dirname "$log_path")"
248
- mkdir -p "$(dirname "$exit_code_path")"
249
- # We need to capture the exit code while piping the stderr and stdout to a log file. Not all shells support `${{PIPEFAIL[0]}}`
250
- set +e +x
251
- {{ "$0" "$@"; echo $? >"$exit_code_path";}} 2>&1 | tee "$log_path"
252
- set -e +x
253
-
254
- exit_code=`cat "$exit_code_path"`
255
-
256
- uv run --with huggingface_hub[cli] hf upload --repo-type '{hf_repo_uri.repo_type}' '{hf_repo_uri.repo_id}' '{container_outputs_root}' '{path_in_repo}'
257
- exit "$exit_code"
258
- """
259
-
260
- container_env = container_spec.env or {}
261
-
262
- # Passing HF token to the Job
263
- secrets: dict[str, str] = {}
264
- if self._hf_job_token:
265
- secrets["HF_TOKEN"] = self._hf_job_token
266
-
267
- command_line = list(resolved_cmd.command or []) + list(resolved_cmd.args or [])
268
- command_line = ["sh", "-c", artifact_uploader_script] + command_line
269
- job = self._api_client.run_job(
270
- image=container_spec.image,
271
- command=command_line,
272
- env=dict(container_env),
273
- timeout=self._job_timeout,
274
- namespace=self._namespace,
275
- secrets=secrets,
276
- # flavor=...,
277
- )
278
-
279
- _logger.info(f"Launched HF Job {job.id=}, {job.url=}")
280
- launched_container = LaunchedHuggingFaceJobContainer(
281
- id=job.id,
282
- namespace=self._namespace,
283
- job=job,
284
- output_uris=output_uris,
285
- log_uri=log_uri,
286
- )
287
- return launched_container
288
-
289
- def deserialize_launched_container_from_dict(
290
- self, launched_container_dict: dict[str, Any]
291
- ) -> "LaunchedHuggingFaceJobContainer":
292
- launched_container = LaunchedHuggingFaceJobContainer.from_dict(
293
- launched_container_dict, api_client=self._api_client
294
- )
295
- return launched_container
296
-
297
- def get_refreshed_launched_container_from_dict(
298
- self, launched_container_dict: dict[str, Any]
299
- ) -> "LaunchedHuggingFaceJobContainer":
300
- launched_container = LaunchedHuggingFaceJobContainer.from_dict(
301
- launched_container_dict, api_client=self._api_client
302
- )
303
- job = self._api_client.inspect_job(
304
- job_id=launched_container.id,
305
- namespace=launched_container._namespace,
306
- )
307
- new_launched_container = copy.copy(launched_container)
308
- new_launched_container._job = job
309
- return new_launched_container
310
-
311
-
312
- class LaunchedHuggingFaceJobContainer(interfaces.LaunchedContainer):
313
- def __init__(
314
- self,
315
- id: str,
316
- namespace: str,
317
- job: huggingface_hub.JobInfo,
318
- output_uris: dict[str, str],
319
- log_uri: str,
320
- api_client: huggingface_hub.HfApi | None = None,
321
- ):
322
- self._id: str = id
323
- self._namespace: str = namespace
324
- self._job = job
325
- self._output_uris: dict[str, str] = output_uris
326
- self._log_uri: str = log_uri
327
- self._api_client: huggingface_hub.HfApi | None = api_client
328
-
329
- def _get_api_client(self):
330
- if not self._api_client:
331
- raise interfaces.LauncherError(
332
- "This action requires an API client, but this instance was constructed without one."
333
- )
334
- return self._api_client
335
-
336
- @property
337
- def id(self) -> str:
338
- return self._id
339
-
340
- @property
341
- def status(self) -> interfaces.ContainerStatus:
342
- status_str = self._job.status.stage
343
- # status_message = self._job.status.message
344
- if status_str == huggingface_hub.JobStage.RUNNING:
345
- return interfaces.ContainerStatus.RUNNING
346
- elif status_str == huggingface_hub.JobStage.COMPLETED:
347
- return interfaces.ContainerStatus.SUCCEEDED
348
- elif status_str == huggingface_hub.JobStage.ERROR:
349
- return interfaces.ContainerStatus.FAILED
350
- elif status_str == huggingface_hub.JobStage.CANCELED:
351
- return interfaces.ContainerStatus.FAILED
352
- else: # "DELETED"
353
- return interfaces.ContainerStatus.ERROR
354
-
355
- @property
356
- def exit_code(self) -> Optional[int]:
357
- # HF Jobs do not provide exit code
358
- if not self.has_ended:
359
- return None
360
- return None
361
-
362
- @property
363
- def has_ended(self) -> bool:
364
- return self.status in (
365
- interfaces.ContainerStatus.SUCCEEDED,
366
- interfaces.ContainerStatus.FAILED,
367
- interfaces.ContainerStatus.ERROR,
368
- )
369
-
370
- @property
371
- def has_succeeded(self) -> bool:
372
- return self.status == interfaces.ContainerStatus.SUCCEEDED
373
-
374
- @property
375
- def has_failed(self) -> bool:
376
- return self.status == interfaces.ContainerStatus.FAILED
377
-
378
- @property
379
- def started_at(self) -> datetime.datetime | None:
380
- # HF Jobs do not provide started_at, so using created_at
381
- return self._job.created_at
382
-
383
- @property
384
- def ended_at(self) -> datetime.datetime | None:
385
- # HF Jobs do not provide ended_at
386
- # Fudging the value by returning the current time.
387
- if self.has_ended:
388
- return datetime.datetime.now(datetime.timezone.utc)
389
- return None
390
-
391
- @property
392
- def launcher_error_message(self) -> str | None:
393
- if self._job.status.message:
394
- # TODO: Check what kind of messages this returns and when.
395
- _logger.info(
396
- f"launcher_error_message: {self._id=}: {self._job.status.message=}"
397
- )
398
- return self._job.status.message
399
- return None
400
-
401
- def get_log(self) -> str:
402
- if self.has_ended:
403
- try:
404
- return (
405
- huggingface_repo_storage.HuggingFaceRepoStorageProvider()
406
- .make_uri(self._log_uri)
407
- .get_reader()
408
- .download_as_text()
409
- )
410
- except Exception as ex:
411
- _logger.warning(
412
- f"get_log: {self._id=}: Error getting log from URI: {self._log_uri}",
413
- ex,
414
- )
415
- return "\n".join(
416
- self._get_api_client().fetch_job_logs(
417
- job_id=self._id,
418
- )
419
- )
420
-
421
- def upload_log(self):
422
- # Logs should be uploaded automatically by the modified command-line wrapper
423
- pass
424
-
425
- def stream_log_lines(self) -> typing.Iterator[str]:
426
- return (
427
- self._get_api_client()
428
- .fetch_job_logs(
429
- job_id=self._id,
430
- namespace=self._namespace,
431
- )
432
- .__iter__()
433
- )
434
-
435
- def terminate(self):
436
- self._get_api_client().cancel_job(job_id=self._id, namespace=self._namespace)
437
-
438
- def to_dict(self) -> dict[str, Any]:
439
- debug_job_info = dataclasses.asdict(self._job)
440
- # Fix JSON serialization of datetime
441
- del debug_job_info["created_at"]
442
- return dict(
443
- huggingface_job=dict(
444
- id=self.id,
445
- namespace=self._namespace,
446
- output_uris=self._output_uris,
447
- log_uri=self._log_uri,
448
- # For debugging purposes, not needed otherwise
449
- debug_job_info=debug_job_info,
450
- )
451
- )
452
-
453
- @classmethod
454
- def from_dict(
455
- cls,
456
- d: dict[str, Any],
457
- api_client: huggingface_hub.HfApi | None = None,
458
- ) -> "LaunchedHuggingFaceJobContainer":
459
- container_dict = d["huggingface_job"]
460
- job_info = huggingface_hub.JobInfo(
461
- **container_dict["debug_job_info"],
462
- )
463
- return LaunchedHuggingFaceJobContainer(
464
- id=container_dict["id"],
465
- namespace=container_dict["namespace"],
466
- job=job_info,
467
- output_uris=container_dict["output_uris"],
468
- log_uri=container_dict["log_uri"],
469
- api_client=api_client,
470
- )