User history V2

#15
by Wauplin HF staff - opened
Files changed (4) hide show
  1. app.py +29 -11
  2. gallery_history.py +0 -129
  3. style.css +2 -3
  4. user_history.py +542 -0
app.py CHANGED
@@ -10,7 +10,7 @@ from diffusers import WuerstchenDecoderPipeline, WuerstchenPriorPipeline
10
  from diffusers.pipelines.wuerstchen import DEFAULT_STAGE_C_TIMESTEPS
11
  from previewer.modules import Previewer
12
 
13
- from gallery_history import fetch_gallery_history, show_gallery_history
14
 
15
  os.environ['TOKENIZERS_PARALLELISM'] = 'false'
16
 
@@ -79,6 +79,7 @@ def generate(
79
  # decoder_timesteps: List[float] = None,
80
  decoder_guidance_scale: float = 0.0,
81
  num_images_per_prompt: int = 2,
 
82
  ) -> PIL.Image.Image:
83
  generator = torch.Generator().manual_seed(seed)
84
 
@@ -111,6 +112,25 @@ def generate(
111
  generator=generator,
112
  output_type="pil",
113
  ).images
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  yield decoder_output
115
 
116
 
@@ -119,7 +139,7 @@ examples = [
119
  "An astronaut riding a green horse",
120
  ]
121
 
122
- with gr.Blocks(css="style.css") as demo:
123
  gr.Markdown(DESCRIPTION)
124
  gr.DuplicateButton(
125
  value="Duplicate Space for private use",
@@ -213,8 +233,6 @@ with gr.Blocks(css="style.css") as demo:
213
  cache_examples=CACHE_EXAMPLES,
214
  )
215
 
216
- history = show_gallery_history()
217
-
218
  inputs = [
219
  prompt,
220
  negative_prompt,
@@ -240,8 +258,6 @@ with gr.Blocks(css="style.css") as demo:
240
  inputs=inputs,
241
  outputs=result,
242
  api_name="run",
243
- ).then(
244
- fn=fetch_gallery_history, inputs=[prompt, result], outputs=history, queue=False
245
  )
246
  negative_prompt.submit(
247
  fn=randomize_seed_fn,
@@ -254,8 +270,6 @@ with gr.Blocks(css="style.css") as demo:
254
  inputs=inputs,
255
  outputs=result,
256
  api_name=False,
257
- ).then(
258
- fn=fetch_gallery_history, inputs=[prompt, result], outputs=history, queue=False
259
  )
260
  run_button.click(
261
  fn=randomize_seed_fn,
@@ -268,9 +282,13 @@ with gr.Blocks(css="style.css") as demo:
268
  inputs=inputs,
269
  outputs=result,
270
  api_name=False,
271
- ).then(
272
- fn=fetch_gallery_history, inputs=[prompt, result], outputs=history, queue=False
273
  )
274
 
 
 
 
 
 
 
275
  if __name__ == "__main__":
276
- demo.queue(max_size=20).launch()
 
10
  from diffusers.pipelines.wuerstchen import DEFAULT_STAGE_C_TIMESTEPS
11
  from previewer.modules import Previewer
12
 
13
+ import user_history
14
 
15
  os.environ['TOKENIZERS_PARALLELISM'] = 'false'
16
 
 
79
  # decoder_timesteps: List[float] = None,
80
  decoder_guidance_scale: float = 0.0,
81
  num_images_per_prompt: int = 2,
82
+ profile: gr.OAuthProfile | None = None,
83
  ) -> PIL.Image.Image:
84
  generator = torch.Generator().manual_seed(seed)
85
 
 
112
  generator=generator,
113
  output_type="pil",
114
  ).images
115
+
116
+ # Save images
117
+ for image in decoder_output:
118
+ user_history.save_image(
119
+ profile=profile,
120
+ image=image,
121
+ label=prompt,
122
+ metadata={
123
+ "negative_prompt": negative_prompt,
124
+ "seed": seed,
125
+ "width": width,
126
+ "height": height,
127
+ "prior_guidance_scale": prior_guidance_scale,
128
+ "decoder_num_inference_steps": decoder_num_inference_steps,
129
+ "decoder_guidance_scale": decoder_guidance_scale,
130
+ "num_images_per_prompt": num_images_per_prompt,
131
+ },
132
+ )
133
+
134
  yield decoder_output
135
 
136
 
 
139
  "An astronaut riding a green horse",
140
  ]
141
 
142
+ with gr.Blocks() as demo:
143
  gr.Markdown(DESCRIPTION)
144
  gr.DuplicateButton(
145
  value="Duplicate Space for private use",
 
233
  cache_examples=CACHE_EXAMPLES,
234
  )
235
 
 
 
236
  inputs = [
237
  prompt,
238
  negative_prompt,
 
258
  inputs=inputs,
259
  outputs=result,
260
  api_name="run",
 
 
261
  )
262
  negative_prompt.submit(
263
  fn=randomize_seed_fn,
 
270
  inputs=inputs,
271
  outputs=result,
272
  api_name=False,
 
 
273
  )
274
  run_button.click(
275
  fn=randomize_seed_fn,
 
282
  inputs=inputs,
283
  outputs=result,
284
  api_name=False,
 
 
285
  )
286
 
287
+ with gr.Blocks(css="style.css") as demo_with_history:
288
+ with gr.Tab("App"):
289
+ demo.render()
290
+ with gr.Tab("Past generations"):
291
+ user_history.render()
292
+
293
  if __name__ == "__main__":
294
+ demo_with_history.queue(max_size=20).launch()
gallery_history.py DELETED
@@ -1,129 +0,0 @@
1
- """
2
- How to use:
3
- 1. Create a Space with a Persistent Storage attached. Filesystem will be available under `/data`.
4
- 2. Add `hf_oauth: true` to the Space metadata (README.md). Make sure to have Gradio>=3.41.0 configured.
5
- 3. Add `HISTORY_FOLDER` as a Space variable (example. `"/data/history"`).
6
- 4. Add `filelock` as dependency in `requirements.txt`.
7
- 5. Add history gallery to your Gradio app:
8
- a. Add imports: `from gallery_history import fetch_gallery_history, show_gallery_history`
9
- a. Add `history = show_gallery_history()` within `gr.Blocks` context.
10
- b. Add `.then(fn=fetch_gallery_history, inputs=[prompt, result], outputs=history)` on the generate event.
11
- """
12
- import json
13
- import os
14
- import shutil
15
- from pathlib import Path
16
- from typing import Dict, List, Optional, Tuple
17
- from uuid import uuid4
18
-
19
- import gradio as gr
20
- from filelock import FileLock
21
-
22
- _folder = os.environ.get("HISTORY_FOLDER")
23
- if _folder is None:
24
- print(
25
- "'HISTORY_FOLDER' environment variable not set. User history will be saved "
26
- "locally and will be lost when the Space instance is restarted."
27
- )
28
- _folder = Path(__file__).parent / "history"
29
- HISTORY_FOLDER_PATH = Path(_folder)
30
-
31
- IMAGES_FOLDER_PATH = HISTORY_FOLDER_PATH / "images"
32
- IMAGES_FOLDER_PATH.mkdir(parents=True, exist_ok=True)
33
-
34
-
35
- def show_gallery_history():
36
- gr.Markdown(
37
- "## Your past generations\n\n(Log in to keep a gallery of your previous generations."
38
- " Your history will be saved and available on your next visit.)"
39
- )
40
- with gr.Column():
41
- with gr.Row():
42
- gr.LoginButton(min_width=250)
43
- gr.LogoutButton(min_width=250)
44
- gallery = gr.Gallery(
45
- label="Past images",
46
- show_label=True,
47
- elem_id="gallery",
48
- object_fit="contain",
49
- columns=3,
50
- height=300,
51
- preview=False,
52
- show_share_button=False,
53
- show_download_button=False,
54
- )
55
- gr.Markdown(
56
- "Make sure to save your images from time to time, this gallery may be deleted in the future."
57
- )
58
- gallery.attach_load_event(fetch_gallery_history, every=None)
59
- return gallery
60
-
61
-
62
- def fetch_gallery_history(
63
- prompt: Optional[str] = None,
64
- result: Optional[Dict] = None,
65
- user: Optional[gr.OAuthProfile] = None,
66
- ):
67
- if user is None:
68
- return []
69
- try:
70
- if prompt is not None and result is not None: # None values means no new images
71
- return _update_user_history(
72
- user["preferred_username"], [(item["name"], prompt) for item in result]
73
- )
74
- else:
75
- return _read_user_history(user["preferred_username"])
76
- except Exception as e:
77
- raise gr.Error(f"Error while fetching history: {e}") from e
78
-
79
-
80
- ####################
81
- # Internal helpers #
82
- ####################
83
-
84
-
85
- def _read_user_history(username: str) -> List[Tuple[str, str]]:
86
- """Return saved history for that user."""
87
- with _user_lock(username):
88
- path = _user_history_path(username)
89
- if path.exists():
90
- return json.loads(path.read_text())
91
- return [] # No history yet
92
-
93
-
94
- def _update_user_history(
95
- username: str, new_images: List[Tuple[str, str]]
96
- ) -> List[Tuple[str, str]]:
97
- """Update history for that user and return it."""
98
- with _user_lock(username):
99
- # Read existing
100
- path = _user_history_path(username)
101
- if path.exists():
102
- images = json.loads(path.read_text())
103
- else:
104
- images = [] # No history yet
105
-
106
- # Copy images to persistent folder
107
- images = [
108
- (_copy_image(src_path), prompt) for src_path, prompt in new_images
109
- ] + images
110
-
111
- # Save and return
112
- path.write_text(json.dumps(images))
113
- return images
114
-
115
-
116
- def _user_history_path(username: str) -> Path:
117
- return HISTORY_FOLDER_PATH / f"{username}.json"
118
-
119
-
120
- def _user_lock(username: str) -> FileLock:
121
- """Ensure history is not corrupted if concurrent calls."""
122
- return FileLock(f"{_user_history_path(username)}.lock")
123
-
124
-
125
- def _copy_image(src: str) -> str:
126
- """Copy image to the persistent storage."""
127
- dst = IMAGES_FOLDER_PATH / f"{uuid4().hex}_{Path(src).name}" # keep file ext
128
- shutil.copyfile(src, dst)
129
- return str(dst)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
style.css CHANGED
@@ -9,9 +9,8 @@ h1 {
9
  border-radius: 100vh;
10
  }
11
 
12
- #component-0 {
13
- max-width: 730px;
14
- margin: auto;
15
  }
16
 
17
  #share-btn-container{padding-left: 0.5rem !important; padding-right: 0.5rem !important; background-color: #000000; justify-content: center; align-items: center; border-radius: 9999px !important; max-width: 13rem; margin-left: auto;margin-top: 0.35em;}
 
9
  border-radius: 100vh;
10
  }
11
 
12
+ .gradio-container {
13
+ max-width: 730px! important;
 
14
  }
15
 
16
  #share-btn-container{padding-left: 0.5rem !important; padding-right: 0.5rem !important; background-color: #000000; justify-content: center; align-items: center; border-radius: 9999px !important; max-width: 13rem; margin-left: auto;margin-top: 0.35em;}
user_history.py ADDED
@@ -0,0 +1,542 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ User History is a plugin that you can add to your Spaces to cache generated images for your users.
3
+
4
+ Key features:
5
+ - 🤗 Sign in with Hugging Face
6
+ - Save generated images with their metadata: prompts, timestamp, hyper-parameters, etc.
7
+ - Export your history as zip.
8
+ - Delete your history to respect privacy.
9
+ - Compatible with Persistent Storage for long-term storage.
10
+ - Admin panel to check configuration and disk usage .
11
+
12
+ Useful links:
13
+ - Demo: https://huggingface.co/spaces/Wauplin/gradio-user-history
14
+ - README: https://huggingface.co/spaces/Wauplin/gradio-user-history/blob/main/README.md
15
+ - Source file: https://huggingface.co/spaces/Wauplin/gradio-user-history/blob/main/user_history.py
16
+ - Discussions: https://huggingface.co/spaces/Wauplin/gradio-user-history/discussions
17
+ """
18
+ import json
19
+ import os
20
+ import shutil
21
+ import warnings
22
+ from datetime import datetime
23
+ from functools import cache
24
+ from pathlib import Path
25
+ from typing import Callable, Dict, List, Tuple
26
+ from uuid import uuid4
27
+
28
+ import gradio as gr
29
+ import numpy as np
30
+ import requests
31
+ from filelock import FileLock
32
+ from PIL.Image import Image
33
+
34
+
35
+ def setup(folder_path: str | Path | None = None) -> None:
36
+ user_history = _UserHistory()
37
+ user_history.folder_path = _resolve_folder_path(folder_path)
38
+ user_history.initialized = True
39
+
40
+ # TODO: remove this section once all Spaces have migrated
41
+ _migrate_history()
42
+
43
+
44
+ def render() -> None:
45
+ user_history = _UserHistory()
46
+
47
+ # initialize with default config
48
+ if not user_history.initialized:
49
+ print(
50
+ "Initializing user history with default config. Use `user_history.setup(...)` to customize folder_path."
51
+ )
52
+ setup()
53
+
54
+ # Render user history tab
55
+ gr.Markdown(
56
+ "## Your past generations\n\nLog in to keep a gallery of your previous generations. Your history will be saved"
57
+ " and available on your next visit. Make sure to export your images from time to time as this gallery may be"
58
+ " deleted in the future."
59
+ )
60
+
61
+ if os.getenv("SYSTEM") == "spaces" and not os.path.exists("/data"):
62
+ gr.Markdown(
63
+ "**⚠️ Persistent storage is disabled, meaning your history will be lost if the Space gets restarted."
64
+ " Only the Space owner can setup a Persistent Storage. If you are not the Space owner, consider"
65
+ " duplicating this Space to set your own storage.⚠️**"
66
+ )
67
+
68
+ with gr.Row():
69
+ gr.LoginButton(min_width=250)
70
+ gr.LogoutButton(min_width=250)
71
+ refresh_button = gr.Button(
72
+ "Refresh",
73
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_refresh.png",
74
+ )
75
+ export_button = gr.Button(
76
+ "Export",
77
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_download.png",
78
+ )
79
+ delete_button = gr.Button(
80
+ "Delete history",
81
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_delete.png",
82
+ )
83
+
84
+ # "Export zip" row (hidden by default)
85
+ with gr.Row():
86
+ export_file = gr.File(
87
+ file_count="single",
88
+ file_types=[".zip"],
89
+ label="Exported history",
90
+ visible=False,
91
+ )
92
+
93
+ # "Config deletion" row (hidden by default)
94
+ with gr.Row():
95
+ confirm_button = gr.Button(
96
+ "Confirm delete all history", variant="stop", visible=False
97
+ )
98
+ cancel_button = gr.Button("Cancel", visible=False)
99
+
100
+ # Gallery
101
+ gallery = gr.Gallery(
102
+ label="Past images",
103
+ show_label=True,
104
+ elem_id="gallery",
105
+ object_fit="contain",
106
+ columns=5,
107
+ height=600,
108
+ preview=False,
109
+ show_share_button=False,
110
+ show_download_button=False,
111
+ )
112
+ gr.Markdown(
113
+ "User history is powered by"
114
+ " [Wauplin/gradio-user-history](https://huggingface.co/spaces/Wauplin/gradio-user-history). Integrate it to"
115
+ " your own Space in just a few lines of code!"
116
+ )
117
+ gallery.attach_load_event(_fetch_user_history, every=None)
118
+
119
+ # Interactions
120
+ refresh_button.click(
121
+ fn=_fetch_user_history, inputs=[], outputs=[gallery], queue=False
122
+ )
123
+ export_button.click(
124
+ fn=_export_user_history, inputs=[], outputs=[export_file], queue=False
125
+ )
126
+
127
+ # Taken from https://github.com/gradio-app/gradio/issues/3324#issuecomment-1446382045
128
+ delete_button.click(
129
+ lambda: [gr.update(visible=True), gr.update(visible=True)],
130
+ outputs=[confirm_button, cancel_button],
131
+ queue=False,
132
+ )
133
+ cancel_button.click(
134
+ lambda: [gr.update(visible=False), gr.update(visible=False)],
135
+ outputs=[confirm_button, cancel_button],
136
+ queue=False,
137
+ )
138
+ confirm_button.click(_delete_user_history).then(
139
+ lambda: [gr.update(visible=False), gr.update(visible=False)],
140
+ outputs=[confirm_button, cancel_button],
141
+ queue=False,
142
+ )
143
+
144
+ # Admin section (only shown locally or when logged in as Space owner)
145
+ _admin_section()
146
+
147
+
148
+ def save_image(
149
+ profile: gr.OAuthProfile | None,
150
+ image: Image | np.ndarray | str | Path,
151
+ label: str | None = None,
152
+ metadata: Dict | None = None,
153
+ ):
154
+ """Save an image in user history.
155
+
156
+ You are guaranteed that calling `save_image` will not interrupt the main process. However, it is possible that for
157
+ some reason the image is not saved correctly (wrong configuration, disk is full, image not valid,...). In that case
158
+ the exception is silently ignored and a log is printed for Space owners.
159
+ """
160
+ try:
161
+ _save_image(profile, image, label, metadata)
162
+ except Exception as e:
163
+ print("Error while saving image! ", e)
164
+
165
+
166
+ def _save_image(
167
+ profile: gr.OAuthProfile | None,
168
+ image: Image | np.ndarray | str | Path,
169
+ label: str | None = None,
170
+ metadata: Dict | None = None,
171
+ ):
172
+ # Ignore images from logged out users
173
+ if profile is None:
174
+ return
175
+ username = profile["preferred_username"]
176
+
177
+ # Ignore images if user history not used
178
+ user_history = _UserHistory()
179
+ if not user_history.initialized:
180
+ warnings.warn(
181
+ "User history is not set in Gradio demo. Saving image is ignored. You must use `user_history.render(...)`"
182
+ " first."
183
+ )
184
+ return
185
+
186
+ # Copy image to storage
187
+ image_path = _copy_image(image, dst_folder=user_history._user_images_path(username))
188
+
189
+ # Save new image + metadata
190
+ if metadata is None:
191
+ metadata = {}
192
+ if "datetime" not in metadata:
193
+ metadata["datetime"] = str(datetime.now())
194
+ data = {"path": str(image_path), "label": label, "metadata": metadata}
195
+ with user_history._user_lock(username):
196
+ with user_history._user_jsonl_path(username).open("a") as f:
197
+ f.write(json.dumps(data) + "\n")
198
+
199
+
200
+ #############
201
+ # Internals #
202
+ #############
203
+
204
+
205
+ class _UserHistory(object):
206
+ _instance = None
207
+ initialized: bool = False
208
+ folder_path: Path
209
+
210
+ def __new__(cls):
211
+ # Using singleton pattern => we don't want to expose an object (more complex to use) but still want to keep
212
+ # state between `render` and `save_image` calls.
213
+ if cls._instance is None:
214
+ cls._instance = super(_UserHistory, cls).__new__(cls)
215
+ return cls._instance
216
+
217
+ def _user_path(self, username: str) -> Path:
218
+ path = self.folder_path / username
219
+ path.mkdir(parents=True, exist_ok=True)
220
+ return path
221
+
222
+ def _user_lock(self, username: str) -> FileLock:
223
+ """Ensure history is not corrupted if concurrent calls."""
224
+ return FileLock(
225
+ self.folder_path / f"{username}.lock"
226
+ ) # lock outside of folder => better when exporting ZIP
227
+
228
+ def _user_jsonl_path(self, username: str) -> Path:
229
+ return self._user_path(username) / "history.jsonl"
230
+
231
+ def _user_images_path(self, username: str) -> Path:
232
+ path = self._user_path(username) / "images"
233
+ path.mkdir(parents=True, exist_ok=True)
234
+ return path
235
+
236
+
237
+ def _fetch_user_history(profile: gr.OAuthProfile | None) -> List[Tuple[str, str]]:
238
+ """Return saved history for that user, if it exists."""
239
+ # Cannot load history for logged out users
240
+ if profile is None:
241
+ return []
242
+ username = profile["preferred_username"]
243
+
244
+ user_history = _UserHistory()
245
+ if not user_history.initialized:
246
+ warnings.warn(
247
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
248
+ )
249
+ return []
250
+
251
+ with user_history._user_lock(username):
252
+ # No file => no history saved yet
253
+ jsonl_path = user_history._user_jsonl_path(username)
254
+ if not jsonl_path.is_file():
255
+ return []
256
+
257
+ # Read history
258
+ images = []
259
+ for line in jsonl_path.read_text().splitlines():
260
+ data = json.loads(line)
261
+ images.append((data["path"], data["label"] or ""))
262
+ return list(reversed(images))
263
+
264
+
265
+ def _export_user_history(profile: gr.OAuthProfile | None) -> Dict | None:
266
+ """Zip all history for that user, if it exists and return it as a downloadable file."""
267
+ # Cannot load history for logged out users
268
+ if profile is None:
269
+ return None
270
+ username = profile["preferred_username"]
271
+
272
+ user_history = _UserHistory()
273
+ if not user_history.initialized:
274
+ warnings.warn(
275
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
276
+ )
277
+ return None
278
+
279
+ # Zip history
280
+ with user_history._user_lock(username):
281
+ path = shutil.make_archive(
282
+ str(_archives_path() / f"history_{username}"),
283
+ "zip",
284
+ user_history._user_path(username),
285
+ )
286
+
287
+ return gr.update(visible=True, value=path)
288
+
289
+
290
+ def _delete_user_history(profile: gr.OAuthProfile | None) -> None:
291
+ """Delete all history for that user."""
292
+ # Cannot load history for logged out users
293
+ if profile is None:
294
+ return
295
+ username = profile["preferred_username"]
296
+
297
+ user_history = _UserHistory()
298
+ if not user_history.initialized:
299
+ warnings.warn(
300
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
301
+ )
302
+ return
303
+
304
+ with user_history._user_lock(username):
305
+ shutil.rmtree(user_history._user_path(username))
306
+
307
+
308
+ ####################
309
+ # Internal helpers #
310
+ ####################
311
+
312
+
313
+ def _copy_image(image: Image | np.ndarray | str | Path, dst_folder: Path) -> Path:
314
+ """Copy image to the images folder."""
315
+ # Already a path => copy it
316
+ if isinstance(image, str):
317
+ image = Path(image)
318
+ if isinstance(image, Path):
319
+ dst = dst_folder / f"{uuid4().hex}_{Path(image).name}" # keep file ext
320
+ shutil.copyfile(image, dst)
321
+ return dst
322
+
323
+ # Still a Python object => serialize it
324
+ if isinstance(image, np.ndarray):
325
+ image = Image.fromarray(image)
326
+ if isinstance(image, Image):
327
+ dst = dst_folder / f"{uuid4().hex}.png"
328
+ image.save(dst)
329
+ return dst
330
+
331
+ raise ValueError(f"Unsupported image type: {type(image)}")
332
+
333
+
334
+ def _resolve_folder_path(folder_path: str | Path | None) -> Path:
335
+ if folder_path is not None:
336
+ return Path(folder_path).expanduser().resolve()
337
+
338
+ if os.getenv("SYSTEM") == "spaces" and os.path.exists(
339
+ "/data"
340
+ ): # Persistent storage is enabled!
341
+ return Path("/data") / "_user_history"
342
+
343
+ # Not in a Space or Persistent storage not enabled => local folder
344
+ return Path(__file__).parent / "_user_history"
345
+
346
+
347
+ def _archives_path() -> Path:
348
+ # Doesn't have to be on persistent storage as it's only used for download
349
+ path = Path(__file__).parent / "_user_history_exports"
350
+ path.mkdir(parents=True, exist_ok=True)
351
+ return path
352
+
353
+
354
+ #################
355
+ # Admin section #
356
+ #################
357
+
358
+
359
+ def _admin_section() -> None:
360
+ title = gr.Markdown()
361
+ title.attach_load_event(_display_if_admin(), every=None)
362
+
363
+
364
+ def _display_if_admin() -> Callable:
365
+ def _inner(profile: gr.OAuthProfile | None) -> str:
366
+ if profile is None:
367
+ return ""
368
+ if profile["preferred_username"] in _fetch_admins():
369
+ return _admin_content()
370
+ return ""
371
+
372
+ return _inner
373
+
374
+
375
+ def _admin_content() -> str:
376
+ return f"""
377
+ ## Admin section
378
+
379
+ Running on **{os.getenv("SYSTEM", "local")}** (id: {os.getenv("SPACE_ID")}). {_get_msg_is_persistent_storage_enabled()}
380
+
381
+ Admins: {', '.join(_fetch_admins())}
382
+
383
+ {_get_nb_users()} user(s), {_get_nb_images()} image(s)
384
+
385
+ ### Configuration
386
+
387
+ History folder: *{_UserHistory().folder_path}*
388
+
389
+ Exports folder: *{_archives_path()}*
390
+
391
+ ### Disk usage
392
+
393
+ {_disk_space_warning_message()}
394
+ """
395
+
396
+
397
+ def _get_nb_users() -> int:
398
+ user_history = _UserHistory()
399
+ if not user_history.initialized:
400
+ return 0
401
+ if user_history.folder_path is not None:
402
+ return len(
403
+ [path for path in user_history.folder_path.iterdir() if path.is_dir()]
404
+ )
405
+ return 0
406
+
407
+
408
+ def _get_nb_images() -> int:
409
+ user_history = _UserHistory()
410
+ if not user_history.initialized:
411
+ return 0
412
+ if user_history.folder_path is not None:
413
+ return len([path for path in user_history.folder_path.glob("*/images/*")])
414
+ return 0
415
+
416
+
417
+ def _get_msg_is_persistent_storage_enabled() -> str:
418
+ if os.getenv("SYSTEM") == "spaces":
419
+ if os.path.exists("/data"):
420
+ return "Persistent storage is enabled."
421
+ else:
422
+ return (
423
+ "Persistent storage is not enabled. This means that user histories will be deleted when the Space is"
424
+ " restarted. Consider adding a Persistent Storage in your Space settings."
425
+ )
426
+ return ""
427
+
428
+
429
+ def _disk_space_warning_message() -> str:
430
+ user_history = _UserHistory()
431
+ if not user_history.initialized:
432
+ return ""
433
+
434
+ message = ""
435
+ if user_history.folder_path is not None:
436
+ total, used, _ = _get_disk_usage(user_history.folder_path)
437
+ message += f"History folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
438
+
439
+ total, used, _ = _get_disk_usage(_archives_path())
440
+ message += f"\n\nExports folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
441
+
442
+ return f"{message.strip()}"
443
+
444
+
445
+ def _get_disk_usage(path: Path) -> Tuple[int, int, int]:
446
+ for path in [path] + list(
447
+ path.parents
448
+ ): # first check target_dir, then each parents one by one
449
+ try:
450
+ return shutil.disk_usage(path)
451
+ except (
452
+ OSError
453
+ ): # if doesn't exist or can't read => fail silently and try parent one
454
+ pass
455
+ return 0, 0, 0
456
+
457
+
458
+ @cache
459
+ def _fetch_admins() -> List[str]:
460
+ # Running locally => fake user is admin
461
+ if os.getenv("SYSTEM") != "spaces":
462
+ return ["FakeGradioUser"]
463
+
464
+ # Running in Space but no space_id => ???
465
+ space_id = os.getenv("SPACE_ID")
466
+ if space_id is None:
467
+ return ["Unknown"]
468
+
469
+ # Running in Space => try to fetch organization members
470
+ # Otherwise, it's not an organization => namespace is the user
471
+ namespace = space_id.split("/")[0]
472
+ response = requests.get(
473
+ f"https://huggingface.co/api/organizations/{namespace}/members"
474
+ )
475
+ if response.status_code == 200:
476
+ return sorted(
477
+ (member["user"] for member in response.json()), key=lambda x: x.lower()
478
+ )
479
+ return [namespace]
480
+
481
+
482
+ ################################################################
483
+ # Legacy helpers to migrate image structure to new data format #
484
+ ################################################################
485
+ # TODO: remove this section once all Spaces have migrated
486
+
487
+
488
+ def _migrate_history():
489
+ """Script to migrate user history from v0 to v1."""
490
+ legacy_history_path = _legacy_get_history_folder_path()
491
+ if not legacy_history_path.exists():
492
+ return
493
+
494
+ error_count = 0
495
+ for json_path in legacy_history_path.glob("*.json"):
496
+ username = json_path.stem
497
+ print(f"Migrating history for user {username}...")
498
+ error_count += _legacy_move_user_history(username)
499
+ print("Done.")
500
+ print(f"Migration complete. {error_count} error(s) happened.")
501
+
502
+ if error_count == 0:
503
+ shutil.rmtree(legacy_history_path, ignore_errors=True)
504
+
505
+
506
+ def _legacy_move_user_history(username: str) -> int:
507
+ history = _legacy_read_user_history(username)
508
+ error_count = 0
509
+ for image, prompt in reversed(history):
510
+ try:
511
+ save_image(
512
+ label=prompt, image=image, profile={"preferred_username": username}
513
+ )
514
+ except Exception as e:
515
+ print("Issue while migrating image:", e)
516
+ error_count += 1
517
+ return error_count
518
+
519
+
520
+ def _legacy_get_history_folder_path() -> Path:
521
+ _folder = os.environ.get("HISTORY_FOLDER")
522
+ if _folder is None:
523
+ _folder = Path(__file__).parent / "history"
524
+ return Path(_folder)
525
+
526
+
527
+ def _legacy_read_user_history(username: str) -> List[Tuple[str, str]]:
528
+ """Return saved history for that user."""
529
+ with _legacy_user_lock(username):
530
+ path = _legacy_user_history_path(username)
531
+ if path.exists():
532
+ return json.loads(path.read_text())
533
+ return [] # No history yet
534
+
535
+
536
+ def _legacy_user_history_path(username: str) -> Path:
537
+ return _legacy_get_history_folder_path() / f"{username}.json"
538
+
539
+
540
+ def _legacy_user_lock(username: str) -> FileLock:
541
+ """Ensure history is not corrupted if concurrent calls."""
542
+ return FileLock(f"{_legacy_user_history_path(username)}.lock")