Commit 65e637b committed by Wauplin (HF staff)
1 parent: 1a0a79f

leaner ParquetScheduler

Files changed (1)
  1. app_parquet.py +77 -40
app_parquet.py CHANGED
@@ -6,13 +6,14 @@ import shutil
 import tempfile
 import uuid
 from pathlib import Path
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional, Union

 import gradio as gr
 import pyarrow as pa
 import pyarrow.parquet as pq
 from gradio_client import Client
 from huggingface_hub import CommitScheduler
+from huggingface_hub.hf_api import HfApi

 #######################
 # Parquet scheduler #
@@ -21,54 +22,95 @@ from huggingface_hub import CommitScheduler


 class ParquetScheduler(CommitScheduler):
+    """
+    Usage: configure the scheduler with a repo id. Once started, you can add data to be uploaded to the Hub. 1 `.append`
+    call will result in 1 row in your final dataset.
+
+    ```py
+    # Start scheduler
+    >>> scheduler = ParquetScheduler(repo_id="my-parquet-dataset")
+
+    # Append some data to be uploaded
+    >>> scheduler.append({...})
+    >>> scheduler.append({...})
+    >>> scheduler.append({...})
+    ```
+
+    The scheduler will automatically infer the schema from the data it pushes.
+    Optionally, you can manually set the schema yourself:
+
+    ```py
+    >>> scheduler = ParquetScheduler(
+    ...     repo_id="my-parquet-dataset",
+    ...     schema={
+    ...         "prompt": {"_type": "Value", "dtype": "string"},
+    ...         "negative_prompt": {"_type": "Value", "dtype": "string"},
+    ...         "guidance_scale": {"_type": "Value", "dtype": "int64"},
+    ...         "image": {"_type": "Image"},
+    ...     },
+    ... )
+
+    See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value for the list of
+    possible values.
+    """
+
+    def __init__(
+        self,
+        *,
+        repo_id: str,
+        schema: Optional[Dict[str, Dict[str, str]]] = None,
+        every: Union[int, float] = 5,
+        path_in_repo: Optional[str] = "data",
+        repo_type: Optional[str] = "dataset",
+        revision: Optional[str] = None,
+        private: bool = False,
+        token: Optional[str] = None,
+        allow_patterns: Union[List[str], str, None] = None,
+        ignore_patterns: Union[List[str], str, None] = None,
+        hf_api: Optional[HfApi] = None,
+    ) -> None:
+        super().__init__(
+            repo_id=repo_id,
+            folder_path="dummy",  # not used by the scheduler
+            every=every,
+            path_in_repo=path_in_repo,
+            repo_type=repo_type,
+            revision=revision,
+            private=private,
+            token=token,
+            allow_patterns=allow_patterns,
+            ignore_patterns=ignore_patterns,
+            hf_api=hf_api,
+        )
+
+        self._rows: List[Dict[str, Any]] = []
+        self._schema = schema
+
     def append(self, row: Dict[str, Any]) -> None:
+        """Add a new item to be uploaded."""
         with self.lock:
-            if not hasattr(self, "rows") or self.rows is None:
-                self.rows = []
-            self.rows.append(row)
-
-    def set_schema(self, schema: Dict[str, Dict[str, str]]) -> None:
-        """
-        Define a schema to help `datasets` load the generated library.
-
-        This method is optional and can be called once just after the scheduler had been created. If it is not called,
-        the schema is automatically inferred before pushing the data to the Hub.
-
-        See https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.Value for the list of
-        possible values.
-
-        Example:
-        ```py
-        scheduler.set_schema({
-            "prompt": {"_type": "Value", "dtype": "string"},
-            "negative_prompt": {"_type": "Value", "dtype": "string"},
-            "guidance_scale": {"_type": "Value", "dtype": "int64"},
-            "image": {"_type": "Image"},
-        })
-        ```
-        """
-        self._schema = schema
+            self._rows.append(row)

     def push_to_hub(self):
         # Check for new rows to push
         with self.lock:
-            rows = getattr(self, "rows", None)
-            self.rows = None
+            rows = self._rows
+            self._rows = []
         if not rows:
             return
         print(f"Got {len(rows)} item(s) to commit.")

         # Load images + create 'features' config for datasets library
-        hf_features: Dict[str, Dict] = getattr(self, "_schema", None) or {}
+        schema: Dict[str, Dict] = self._schema or {}
         path_to_cleanup: List[Path] = []
         for row in rows:
             for key, value in row.items():
                 # Infer schema (for `datasets` library)
-                if key not in hf_features:
-                    hf_features[key] = _infer_schema(key, value)
+                if key not in schema:
+                    schema[key] = _infer_schema(key, value)

                 # Load binary files if necessary
-                if hf_features[key]["_type"] in ("Image", "Audio"):
+                if schema[key]["_type"] in ("Image", "Audio"):
                     # It's an image or audio: we load the bytes and remember to cleanup the file
                     file_path = Path(value)
                     if file_path.is_file():
@@ -80,7 +122,7 @@ class ParquetScheduler(CommitScheduler):

         # Complete rows if needed
         for row in rows:
-            for feature in hf_features:
+            for feature in schema:
                 if feature not in row:
                     row[feature] = None

@@ -89,7 +131,7 @@ class ParquetScheduler(CommitScheduler):

         # Add metadata (used by datasets library)
         table = table.replace_schema_metadata(
-            {"huggingface": json.dumps({"info": {"features": hf_features}})}
+            {"huggingface": json.dumps({"info": {"features": schema}})}
         )

         # Write to parquet file
@@ -142,12 +184,7 @@ def _infer_schema(key: str, value: Any) -> Dict[str, str]:
 PARQUET_DATASET_DIR = Path("parquet_dataset")
 PARQUET_DATASET_DIR.mkdir(parents=True, exist_ok=True)

-scheduler = ParquetScheduler(
-    repo_id="example-space-to-dataset-parquet",
-    repo_type="dataset",
-    folder_path=PARQUET_DATASET_DIR,
-    path_in_repo="data",
-)
+scheduler = ParquetScheduler(repo_id="example-space-to-dataset-parquet")

 client = Client("stabilityai/stable-diffusion")

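To make the effect of the leaner constructor concrete, here is a minimal sketch of how the scheduler could be wired into a Space after this commit. It relies only on what the diff shows (the `repo_id`-only constructor with an optional up-front `schema`, and `append()` adding one dataset row per call); the `generate` callback, the `client.predict` arguments, and the Gradio wiring are illustrative assumptions, not code from this commit.

```py
import gradio as gr
from gradio_client import Client

# ParquetScheduler is the class defined in app_parquet.py above.
# New-style construction from this commit: only repo_id is required;
# repo_type="dataset" and path_in_repo="data" are now defaults, and the
# schema can be declared up front instead of calling set_schema() later.
scheduler = ParquetScheduler(
    repo_id="example-space-to-dataset-parquet",
    schema={
        "prompt": {"_type": "Value", "dtype": "string"},
        "image": {"_type": "Image"},
    },
)

client = Client("stabilityai/stable-diffusion")


def generate(prompt: str) -> str:
    """Illustrative callback: generate an image, then log one dataset row."""
    # Assumption: the upstream Space call returns a path to a generated image;
    # the exact predict() arguments are not part of this commit.
    image_path = client.predict(prompt, api_name="/predict")

    # One append() call == one row in the final parquet dataset.
    # "Image" columns take a file path; push_to_hub() reads the bytes and
    # remembers to clean up the file, as shown in the diff above.
    scheduler.append({"prompt": prompt, "image": image_path})
    return image_path


demo = gr.Interface(fn=generate, inputs="text", outputs="image")
demo.launch()
```

Compared with the previous version, the dataset directory and the separate `set_schema()` call disappear from the application code: everything the scheduler needs is declared once in the constructor, and the in-memory row buffer is initialized there instead of being created lazily in `append()`.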