User history V2

#101
by Wauplin HF staff - opened
Files changed (3) hide show
  1. app.py +32 -10
  2. gallery_history.py +0 -134
  3. user_history.py +524 -0
app.py CHANGED
@@ -16,7 +16,7 @@ from diffusers import (
16
  )
17
  import time
18
  from share_btn import community_icon_html, loading_icon_html, share_js
19
- from gallery_history import fetch_gallery_history, show_gallery_history
20
  from illusion_style import css
21
 
22
  BASE_MODEL = "SG161222/Realistic_Vision_V5.1_noVAE"
@@ -117,7 +117,8 @@ def inference(
117
  upscaler_strength: float = 0.5,
118
  seed: int = -1,
119
  sampler = "DPM++ Karras SDE",
120
- progress = gr.Progress(track_tqdm=True)
 
121
  ):
122
  start_time = time.time()
123
  start_time_struct = time.localtime(start_time)
@@ -165,9 +166,28 @@ def inference(
165
  end_time_struct = time.localtime(end_time)
166
  end_time_formatted = time.strftime("%H:%M:%S", end_time_struct)
167
  print(f"Inference ended at {end_time_formatted}, taking {end_time-start_time}s")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  return out_image["images"][0], gr.update(visible=True), gr.update(visible=True), my_seed
169
 
170
- with gr.Blocks(css=css) as app:
171
  gr.Markdown(
172
  '''
173
  <center><h1>Illusion Diffusion HQ 🌀</h1></span>
@@ -205,7 +225,6 @@ with gr.Blocks(css=css) as app:
205
  loading_icon = gr.HTML(loading_icon_html)
206
  share_button = gr.Button("Share to community", elem_id="share-btn")
207
 
208
- history = show_gallery_history()
209
  prompt.submit(
210
  check_inputs,
211
  inputs=[prompt, control_image],
@@ -226,8 +245,6 @@ with gr.Blocks(css=css) as app:
226
  outputs=[result_image],
227
  queue=False,
228
  postprocess=False
229
- ).success(
230
- fn=fetch_gallery_history, inputs=[prompt, result_image], outputs=history, queue=False
231
  )
232
  run_btn.click(
233
  check_inputs,
@@ -249,11 +266,16 @@ with gr.Blocks(css=css) as app:
249
  outputs=[result_image],
250
  queue=False,
251
  postprocess=False
252
- ).success(
253
- fn=fetch_gallery_history, inputs=[prompt, result_image], outputs=history, queue=False
254
  )
255
  share_button.click(None, [], [], _js=share_js)
256
- app.queue(max_size=20)
 
 
 
 
 
 
 
257
 
258
  if __name__ == "__main__":
259
- app.launch(max_threads=400)
 
16
  )
17
  import time
18
  from share_btn import community_icon_html, loading_icon_html, share_js
19
+ import user_history
20
  from illusion_style import css
21
 
22
  BASE_MODEL = "SG161222/Realistic_Vision_V5.1_noVAE"
 
117
  upscaler_strength: float = 0.5,
118
  seed: int = -1,
119
  sampler = "DPM++ Karras SDE",
120
+ progress = gr.Progress(track_tqdm=True),
121
+ profile: gr.OAuthProfile | None = None,
122
  ):
123
  start_time = time.time()
124
  start_time_struct = time.localtime(start_time)
 
166
  end_time_struct = time.localtime(end_time)
167
  end_time_formatted = time.strftime("%H:%M:%S", end_time_struct)
168
  print(f"Inference ended at {end_time_formatted}, taking {end_time-start_time}s")
169
+
170
+ # Save image + metadata
171
+ user_history.save_image(
172
+ label=prompt,
173
+ image=out_image["images"][0],
174
+ profile=profile,
175
+ metadata={
176
+ "prompt": prompt,
177
+ "negative_prompt": negative_prompt,
178
+ "guidance_scale": guidance_scale,
179
+ "controlnet_conditioning_scale": controlnet_conditioning_scale,
180
+ "control_guidance_start": control_guidance_start,
181
+ "control_guidance_end": control_guidance_end,
182
+ "upscaler_strength": upscaler_strength,
183
+ "seed": seed,
184
+ "sampler": sampler,
185
+ },
186
+ )
187
+
188
  return out_image["images"][0], gr.update(visible=True), gr.update(visible=True), my_seed
189
 
190
+ with gr.Blocks() as app:
191
  gr.Markdown(
192
  '''
193
  <center><h1>Illusion Diffusion HQ 🌀</h1></span>
 
225
  loading_icon = gr.HTML(loading_icon_html)
226
  share_button = gr.Button("Share to community", elem_id="share-btn")
227
 
 
228
  prompt.submit(
229
  check_inputs,
230
  inputs=[prompt, control_image],
 
245
  outputs=[result_image],
246
  queue=False,
247
  postprocess=False
 
 
248
  )
249
  run_btn.click(
250
  check_inputs,
 
266
  outputs=[result_image],
267
  queue=False,
268
  postprocess=False
 
 
269
  )
270
  share_button.click(None, [], [], _js=share_js)
271
+
272
+ with gr.Blocks(css=css) as app_with_history:
273
+ with gr.Tab("Demo"):
274
+ app.render()
275
+ with gr.Tab("Past generations"):
276
+ user_history.render()
277
+
278
+ app_with_history.queue(max_size=20)
279
 
280
  if __name__ == "__main__":
281
+ app_with_history.launch(max_threads=400)
gallery_history.py DELETED
@@ -1,134 +0,0 @@
1
- """
2
- How to use:
3
- 1. Create a Space with a Persistent Storage attached. Filesystem will be available under `/data`.
4
- 2. Add `hf_oauth: true` to the Space metadata (README.md). Make sure to have Gradio>=3.41.0 configured.
5
- 3. Add `HISTORY_FOLDER` as a Space variable (example. `"/data/history"`).
6
- 4. Add `filelock` as dependency in `requirements.txt`.
7
- 5. Add history gallery to your Gradio app:
8
- a. Add imports: `from gallery_history import fetch_gallery_history, show_gallery_history`
9
- a. Add `history = show_gallery_history()` within `gr.Blocks` context.
10
- b. Add `.then(fn=fetch_gallery_history, inputs=[prompt, result], outputs=history)` on the generate event.
11
- """
12
- import json
13
- import os
14
- import numpy as np
15
- import shutil
16
- from pathlib import Path
17
- from PIL import Image
18
- from typing import Dict, List, Optional, Tuple
19
- from uuid import uuid4
20
-
21
- import gradio as gr
22
- from filelock import FileLock
23
-
24
- _folder = os.environ.get("HISTORY_FOLDER")
25
- if _folder is None:
26
- print(
27
- "'HISTORY_FOLDER' environment variable not set. User history will be saved "
28
- "locally and will be lost when the Space instance is restarted."
29
- )
30
- _folder = Path(__file__).parent / "history"
31
- if _folder.startswith("/data") and not os.path.exists("/data"):
32
- print(
33
- f"'HISTORY_FOLDER' environment variable is set to '{_folder}' which doesn't exist. User history will be saved "
34
- "locally and will be lost when the Space instance is restarted."
35
- )
36
- _folder = Path(__file__).parent / "history"
37
- HISTORY_FOLDER_PATH = Path(_folder)
38
-
39
- IMAGES_FOLDER_PATH = HISTORY_FOLDER_PATH / "images"
40
- IMAGES_FOLDER_PATH.mkdir(parents=True, exist_ok=True)
41
-
42
-
43
- def show_gallery_history():
44
- gr.Markdown(
45
- "## Your past generations\n\n(Log in to keep a gallery of your previous generations."
46
- " Your history will be saved and available on your next visit.)"
47
- )
48
- with gr.Column():
49
- with gr.Row():
50
- gr.LoginButton(min_width=250)
51
- gr.LogoutButton(min_width=250)
52
- gallery = gr.Gallery(
53
- label="Past images",
54
- show_label=True,
55
- elem_id="gallery",
56
- object_fit="contain",
57
- columns=4,
58
- height=512,
59
- preview=False,
60
- show_share_button=False,
61
- show_download_button=False,
62
- )
63
- gr.Markdown(
64
- "Make sure to save your images from time to time, this gallery may be deleted in the future."
65
- )
66
- gallery.attach_load_event(fetch_gallery_history, every=None)
67
- return gallery
68
-
69
-
70
- def fetch_gallery_history(
71
- prompt: Optional[str] = None,
72
- result: Optional[np.ndarray] = None,
73
- user: Optional[gr.OAuthProfile] = None,
74
- ):
75
- if user is None:
76
- return []
77
- try:
78
- if prompt is not None and result is not None: # None values means no new images
79
- new_image = Image.fromarray(result, 'RGB')
80
- return _update_user_history(user["preferred_username"], new_image, prompt)
81
- else:
82
- return _read_user_history(user["preferred_username"])
83
- except Exception as e:
84
- raise gr.Error(f"Error while fetching history: {e}") from e
85
-
86
-
87
- ####################
88
- # Internal helpers #
89
- ####################
90
-
91
-
92
- def _read_user_history(username: str) -> List[Tuple[str, str]]:
93
- """Return saved history for that user."""
94
- with _user_lock(username):
95
- path = _user_history_path(username)
96
- if path.exists():
97
- return json.loads(path.read_text())
98
- return [] # No history yet
99
-
100
-
101
- def _update_user_history(
102
- username: str, new_image: Image.Image, prompt: str
103
- ) -> List[Tuple[str, str]]:
104
- """Update history for that user and return it."""
105
- with _user_lock(username):
106
- # Read existing
107
- path = _user_history_path(username)
108
- if path.exists():
109
- images = json.loads(path.read_text())
110
- else:
111
- images = [] # No history yet
112
-
113
- # Copy image to persistent folder
114
- images = [(_copy_image(new_image), prompt)] + images
115
-
116
- # Save and return
117
- path.write_text(json.dumps(images))
118
- return images
119
-
120
-
121
- def _user_history_path(username: str) -> Path:
122
- return HISTORY_FOLDER_PATH / f"{username}.json"
123
-
124
-
125
- def _user_lock(username: str) -> FileLock:
126
- """Ensure history is not corrupted if concurrent calls."""
127
- return FileLock(f"{_user_history_path(username)}.lock")
128
-
129
-
130
- def _copy_image(new_image: Image.Image) -> str:
131
- """Copy image to the persistent storage."""
132
- dst = str(IMAGES_FOLDER_PATH / f"{uuid4().hex}.png")
133
- new_image.save(dst)
134
- return dst
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
user_history.py ADDED
@@ -0,0 +1,524 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ User History is a plugin that you can add to your Spaces to cache generated images for your users.
3
+
4
+ Key features:
5
+ - 🤗 Sign in with Hugging Face
6
+ - Save generated images with their metadata: prompts, timestamp, hyper-parameters, etc.
7
+ - Export your history as zip.
8
+ - Delete your history to respect privacy.
9
+ - Compatible with Persistent Storage for long-term storage.
10
+ - Admin panel to check configuration and disk usage .
11
+
12
+ Useful links:
13
+ - Demo: https://huggingface.co/spaces/Wauplin/gradio-user-history
14
+ - README: https://huggingface.co/spaces/Wauplin/gradio-user-history/blob/main/README.md
15
+ - Source file: https://huggingface.co/spaces/Wauplin/gradio-user-history/blob/main/user_history.py
16
+ - Discussions: https://huggingface.co/spaces/Wauplin/gradio-user-history/discussions
17
+ """
18
+ import json
19
+ import os
20
+ import shutil
21
+ import warnings
22
+ from datetime import datetime
23
+ from functools import cache
24
+ from pathlib import Path
25
+ from typing import Callable, Dict, List, Tuple
26
+ from uuid import uuid4
27
+
28
+ import gradio as gr
29
+ import numpy as np
30
+ import requests
31
+ from filelock import FileLock
32
+ from PIL.Image import Image
33
+
34
+
35
+ def setup(folder_path: str | Path | None = None) -> None:
36
+ user_history = _UserHistory()
37
+ user_history.folder_path = _resolve_folder_path(folder_path)
38
+ user_history.initialized = True
39
+
40
+ # TODO: remove this section once all Spaces have migrated
41
+ _migrate_history()
42
+
43
+
44
+ def render() -> None:
45
+ user_history = _UserHistory()
46
+
47
+ # initialize with default config
48
+ if not user_history.initialized:
49
+ print(
50
+ "Initializing user history with default config. Use `user_history.setup(...)` to customize folder_path."
51
+ )
52
+ setup()
53
+
54
+ # Render user history tab
55
+ gr.Markdown(
56
+ "## Your past generations\n\nLog in to keep a gallery of your previous generations. Your history will be saved"
57
+ " and available on your next visit. Make sure to export your images from time to time as this gallery may be"
58
+ " deleted in the future."
59
+ )
60
+
61
+ if os.getenv("SYSTEM") == "spaces" and not os.path.exists("/data"):
62
+ gr.Markdown(
63
+ "**⚠️ Persistent storage is disabled, meaning your history will be lost if the Space gets restarted."
64
+ " Only the Space owner can setup a Persistent Storage. If you are not the Space owner, consider"
65
+ " duplicating this Space to set your own storage.⚠️**"
66
+ )
67
+
68
+ with gr.Row():
69
+ gr.LoginButton(min_width=250)
70
+ gr.LogoutButton(min_width=250)
71
+ refresh_button = gr.Button(
72
+ "Refresh",
73
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_refresh.png",
74
+ )
75
+ export_button = gr.Button(
76
+ "Export",
77
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_download.png",
78
+ )
79
+ delete_button = gr.Button(
80
+ "Delete history",
81
+ icon="https://huggingface.co/spaces/Wauplin/gradio-user-history/resolve/main/assets/icon_delete.png",
82
+ )
83
+
84
+ # "Export zip" row (hidden by default)
85
+ with gr.Row():
86
+ export_file = gr.File(
87
+ file_count="single",
88
+ file_types=[".zip"],
89
+ label="Exported history",
90
+ visible=False,
91
+ )
92
+
93
+ # "Config deletion" row (hidden by default)
94
+ with gr.Row():
95
+ confirm_button = gr.Button(
96
+ "Confirm delete all history", variant="stop", visible=False
97
+ )
98
+ cancel_button = gr.Button("Cancel", visible=False)
99
+
100
+ # Gallery
101
+ gallery = gr.Gallery(
102
+ label="Past images",
103
+ show_label=True,
104
+ elem_id="gallery",
105
+ object_fit="contain",
106
+ columns=5,
107
+ height=600,
108
+ preview=False,
109
+ show_share_button=False,
110
+ show_download_button=False,
111
+ )
112
+ gr.Markdown(
113
+ "User history is powered by"
114
+ " [Wauplin/gradio-user-history](https://huggingface.co/spaces/Wauplin/gradio-user-history). Integrate it to"
115
+ " your own Space in just a few lines of code!"
116
+ )
117
+ gallery.attach_load_event(_fetch_user_history, every=None)
118
+
119
+ # Interactions
120
+ refresh_button.click(
121
+ fn=_fetch_user_history, inputs=[], outputs=[gallery], queue=False
122
+ )
123
+ export_button.click(
124
+ fn=_export_user_history, inputs=[], outputs=[export_file], queue=False
125
+ )
126
+
127
+ # Taken from https://github.com/gradio-app/gradio/issues/3324#issuecomment-1446382045
128
+ delete_button.click(
129
+ lambda: [gr.update(visible=True), gr.update(visible=True)],
130
+ outputs=[confirm_button, cancel_button],
131
+ queue=False,
132
+ )
133
+ cancel_button.click(
134
+ lambda: [gr.update(visible=False), gr.update(visible=False)],
135
+ outputs=[confirm_button, cancel_button],
136
+ queue=False,
137
+ )
138
+ confirm_button.click(_delete_user_history).then(
139
+ lambda: [gr.update(visible=False), gr.update(visible=False)],
140
+ outputs=[confirm_button, cancel_button],
141
+ queue=False,
142
+ )
143
+
144
+ # Admin section (only shown locally or when logged in as Space owner)
145
+ _admin_section()
146
+
147
+
148
+ def save_image(
149
+ profile: gr.OAuthProfile | None,
150
+ image: Image | np.ndarray | str | Path,
151
+ label: str | None = None,
152
+ metadata: Dict | None = None,
153
+ ):
154
+ # Ignore images from logged out users
155
+ if profile is None:
156
+ return
157
+ username = profile["preferred_username"]
158
+
159
+ # Ignore images if user history not used
160
+ user_history = _UserHistory()
161
+ if not user_history.initialized:
162
+ warnings.warn(
163
+ "User history is not set in Gradio demo. Saving image is ignored. You must use `user_history.render(...)`"
164
+ " first."
165
+ )
166
+ return
167
+
168
+ # Copy image to storage
169
+ image_path = _copy_image(image, dst_folder=user_history._user_images_path(username))
170
+
171
+ # Save new image + metadata
172
+ if metadata is None:
173
+ metadata = {}
174
+ if "datetime" not in metadata:
175
+ metadata["datetime"] = str(datetime.now())
176
+ data = {"path": str(image_path), "label": label, "metadata": metadata}
177
+ with user_history._user_lock(username):
178
+ with user_history._user_jsonl_path(username).open("a") as f:
179
+ f.write(json.dumps(data) + "\n")
180
+
181
+
182
+ #############
183
+ # Internals #
184
+ #############
185
+
186
+
187
+ class _UserHistory(object):
188
+ _instance = None
189
+ initialized: bool = False
190
+ folder_path: Path
191
+
192
+ def __new__(cls):
193
+ # Using singleton pattern => we don't want to expose an object (more complex to use) but still want to keep
194
+ # state between `render` and `save_image` calls.
195
+ if cls._instance is None:
196
+ cls._instance = super(_UserHistory, cls).__new__(cls)
197
+ return cls._instance
198
+
199
+ def _user_path(self, username: str) -> Path:
200
+ path = self.folder_path / username
201
+ path.mkdir(parents=True, exist_ok=True)
202
+ return path
203
+
204
+ def _user_lock(self, username: str) -> FileLock:
205
+ """Ensure history is not corrupted if concurrent calls."""
206
+ return FileLock(
207
+ self.folder_path / f"{username}.lock"
208
+ ) # lock outside of folder => better when exporting ZIP
209
+
210
+ def _user_jsonl_path(self, username: str) -> Path:
211
+ return self._user_path(username) / "history.jsonl"
212
+
213
+ def _user_images_path(self, username: str) -> Path:
214
+ path = self._user_path(username) / "images"
215
+ path.mkdir(parents=True, exist_ok=True)
216
+ return path
217
+
218
+
219
+ def _fetch_user_history(profile: gr.OAuthProfile | None) -> List[Tuple[str, str]]:
220
+ """Return saved history for that user, if it exists."""
221
+ # Cannot load history for logged out users
222
+ if profile is None:
223
+ return []
224
+ username = profile["preferred_username"]
225
+
226
+ user_history = _UserHistory()
227
+ if not user_history.initialized:
228
+ warnings.warn(
229
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
230
+ )
231
+ return []
232
+
233
+ with user_history._user_lock(username):
234
+ # No file => no history saved yet
235
+ jsonl_path = user_history._user_jsonl_path(username)
236
+ if not jsonl_path.is_file():
237
+ return []
238
+
239
+ # Read history
240
+ images = []
241
+ for line in jsonl_path.read_text().splitlines():
242
+ data = json.loads(line)
243
+ images.append((data["path"], data["label"] or ""))
244
+ return list(reversed(images))
245
+
246
+
247
+ def _export_user_history(profile: gr.OAuthProfile | None) -> Dict | None:
248
+ """Zip all history for that user, if it exists and return it as a downloadable file."""
249
+ # Cannot load history for logged out users
250
+ if profile is None:
251
+ return None
252
+ username = profile["preferred_username"]
253
+
254
+ user_history = _UserHistory()
255
+ if not user_history.initialized:
256
+ warnings.warn(
257
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
258
+ )
259
+ return None
260
+
261
+ # Zip history
262
+ with user_history._user_lock(username):
263
+ path = shutil.make_archive(
264
+ str(_archives_path() / f"history_{username}"),
265
+ "zip",
266
+ user_history._user_path(username),
267
+ )
268
+
269
+ return gr.update(visible=True, value=path)
270
+
271
+
272
+ def _delete_user_history(profile: gr.OAuthProfile | None) -> None:
273
+ """Delete all history for that user."""
274
+ # Cannot load history for logged out users
275
+ if profile is None:
276
+ return
277
+ username = profile["preferred_username"]
278
+
279
+ user_history = _UserHistory()
280
+ if not user_history.initialized:
281
+ warnings.warn(
282
+ "User history is not set in Gradio demo. You must use `user_history.render(...)` first."
283
+ )
284
+ return
285
+
286
+ with user_history._user_lock(username):
287
+ shutil.rmtree(user_history._user_path(username))
288
+
289
+
290
+ ####################
291
+ # Internal helpers #
292
+ ####################
293
+
294
+
295
+ def _copy_image(image: Image | np.ndarray | str | Path, dst_folder: Path) -> Path:
296
+ """Copy image to the images folder."""
297
+ # Already a path => copy it
298
+ if isinstance(image, str):
299
+ image = Path(image)
300
+ if isinstance(image, Path):
301
+ dst = dst_folder / f"{uuid4().hex}_{Path(image).name}" # keep file ext
302
+ shutil.copyfile(image, dst)
303
+ return dst
304
+
305
+ # Still a Python object => serialize it
306
+ if isinstance(image, np.ndarray):
307
+ image = Image.fromarray(image)
308
+ if isinstance(image, Image):
309
+ dst = dst_folder / f"{uuid4().hex}.png"
310
+ image.save(dst)
311
+ return dst
312
+
313
+ raise ValueError(f"Unsupported image type: {type(image)}")
314
+
315
+
316
+ def _resolve_folder_path(folder_path: str | Path | None) -> Path:
317
+ if folder_path is not None:
318
+ return Path(folder_path).expanduser().resolve()
319
+
320
+ if os.getenv("SYSTEM") == "spaces" and os.path.exists(
321
+ "/data"
322
+ ): # Persistent storage is enabled!
323
+ return Path("/data") / "_user_history"
324
+
325
+ # Not in a Space or Persistent storage not enabled => local folder
326
+ return Path(__file__).parent / "_user_history"
327
+
328
+
329
+ def _archives_path() -> Path:
330
+ # Doesn't have to be on persistent storage as it's only used for download
331
+ path = Path(__file__).parent / "_user_history_exports"
332
+ path.mkdir(parents=True, exist_ok=True)
333
+ return path
334
+
335
+
336
+ #################
337
+ # Admin section #
338
+ #################
339
+
340
+
341
+ def _admin_section() -> None:
342
+ title = gr.Markdown()
343
+ title.attach_load_event(_display_if_admin(), every=None)
344
+
345
+
346
+ def _display_if_admin() -> Callable:
347
+ def _inner(profile: gr.OAuthProfile | None) -> str:
348
+ if profile is None:
349
+ return ""
350
+ if profile["preferred_username"] in _fetch_admins():
351
+ return _admin_content()
352
+ return ""
353
+
354
+ return _inner
355
+
356
+
357
+ def _admin_content() -> str:
358
+ return f"""
359
+ ## Admin section
360
+
361
+ Running on **{os.getenv("SYSTEM", "local")}** (id: {os.getenv("SPACE_ID")}). {_get_msg_is_persistent_storage_enabled()}
362
+
363
+ Admins: {', '.join(_fetch_admins())}
364
+
365
+ {_get_nb_users()} user(s), {_get_nb_images()} image(s)
366
+
367
+ ### Configuration
368
+
369
+ History folder: *{_UserHistory().folder_path}*
370
+
371
+ Exports folder: *{_archives_path()}*
372
+
373
+ ### Disk usage
374
+
375
+ {_disk_space_warning_message()}
376
+ """
377
+
378
+
379
+ def _get_nb_users() -> int:
380
+ user_history = _UserHistory()
381
+ if not user_history.initialized:
382
+ return 0
383
+ if user_history.folder_path is not None:
384
+ return len(
385
+ [path for path in user_history.folder_path.iterdir() if path.is_dir()]
386
+ )
387
+ return 0
388
+
389
+
390
+ def _get_nb_images() -> int:
391
+ user_history = _UserHistory()
392
+ if not user_history.initialized:
393
+ return 0
394
+ if user_history.folder_path is not None:
395
+ return len([path for path in user_history.folder_path.glob("*/images/*")])
396
+ return 0
397
+
398
+
399
+ def _get_msg_is_persistent_storage_enabled() -> str:
400
+ if os.getenv("SYSTEM") == "spaces":
401
+ if os.path.exists("/data"):
402
+ return "Persistent storage is enabled."
403
+ else:
404
+ return (
405
+ "Persistent storage is not enabled. This means that user histories will be deleted when the Space is"
406
+ " restarted. Consider adding a Persistent Storage in your Space settings."
407
+ )
408
+ return ""
409
+
410
+
411
+ def _disk_space_warning_message() -> str:
412
+ user_history = _UserHistory()
413
+ if not user_history.initialized:
414
+ return ""
415
+
416
+ message = ""
417
+ if user_history.folder_path is not None:
418
+ total, used, _ = _get_disk_usage(user_history.folder_path)
419
+ message += f"History folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
420
+
421
+ total, used, _ = _get_disk_usage(_archives_path())
422
+ message += f"\n\nExports folder: **{used / 1e9 :.0f}/{total / 1e9 :.0f}GB** used ({100*used/total :.0f}%)."
423
+
424
+ return f"{message.strip()}"
425
+
426
+
427
+ def _get_disk_usage(path: Path) -> Tuple[int, int, int]:
428
+ for path in [path] + list(
429
+ path.parents
430
+ ): # first check target_dir, then each parents one by one
431
+ try:
432
+ return shutil.disk_usage(path)
433
+ except (
434
+ OSError
435
+ ): # if doesn't exist or can't read => fail silently and try parent one
436
+ pass
437
+ return 0, 0, 0
438
+
439
+
440
+ @cache
441
+ def _fetch_admins() -> List[str]:
442
+ # Running locally => fake user is admin
443
+ if os.getenv("SYSTEM") != "spaces":
444
+ return ["FakeGradioUser"]
445
+
446
+ # Running in Space but no space_id => ???
447
+ space_id = os.getenv("SPACE_ID")
448
+ if space_id is None:
449
+ return ["Unknown"]
450
+
451
+ # Running in Space => try to fetch organization members
452
+ # Otherwise, it's not an organization => namespace is the user
453
+ namespace = space_id.split("/")[0]
454
+ response = requests.get(
455
+ f"https://huggingface.co/api/organizations/{namespace}/members"
456
+ )
457
+ if response.status_code == 200:
458
+ return sorted(
459
+ (member["user"] for member in response.json()), key=lambda x: x.lower()
460
+ )
461
+ return [namespace]
462
+
463
+
464
+ ################################################################
465
+ # Legacy helpers to migrate image structure to new data format #
466
+ ################################################################
467
+ # TODO: remove this section once all Spaces have migrated
468
+
469
+
470
+ def _migrate_history():
471
+ """Script to migrate user history from v0 to v1."""
472
+ legacy_history_path = _legacy_get_history_folder_path()
473
+ if not legacy_history_path.exists():
474
+ return
475
+
476
+ error_count = 0
477
+ for json_path in legacy_history_path.glob("*.json"):
478
+ username = json_path.stem
479
+ print(f"Migrating history for user {username}...")
480
+ error_count += _legacy_move_user_history(username)
481
+ print("Done.")
482
+ print(f"Migration complete. {error_count} error(s) happened.")
483
+
484
+ if error_count == 0:
485
+ shutil.rmtree(legacy_history_path, ignore_errors=True)
486
+
487
+
488
+ def _legacy_move_user_history(username: str) -> int:
489
+ history = _legacy_read_user_history(username)
490
+ error_count = 0
491
+ for image, prompt in reversed(history):
492
+ try:
493
+ save_image(
494
+ label=prompt, image=image, profile={"preferred_username": username}
495
+ )
496
+ except Exception as e:
497
+ print("Issue while migrating image:", e)
498
+ error_count += 1
499
+ return error_count
500
+
501
+
502
+ def _legacy_get_history_folder_path() -> Path:
503
+ _folder = os.environ.get("HISTORY_FOLDER")
504
+ if _folder is None:
505
+ _folder = Path(__file__).parent / "history"
506
+ return Path(_folder)
507
+
508
+
509
+ def _legacy_read_user_history(username: str) -> List[Tuple[str, str]]:
510
+ """Return saved history for that user."""
511
+ with _legacy_user_lock(username):
512
+ path = _legacy_user_history_path(username)
513
+ if path.exists():
514
+ return json.loads(path.read_text())
515
+ return [] # No history yet
516
+
517
+
518
+ def _legacy_user_history_path(username: str) -> Path:
519
+ return _legacy_get_history_folder_path() / f"{username}.json"
520
+
521
+
522
+ def _legacy_user_lock(username: str) -> FileLock:
523
+ """Ensure history is not corrupted if concurrent calls."""
524
+ return FileLock(f"{_legacy_user_history_path(username)}.lock")