File size: 9,802 Bytes
235adef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
import json
import os
import shutil
import warnings
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple
from uuid import uuid4

import gradio as gr
import numpy as np
from filelock import FileLock
from PIL.Image import Image


def setup(
    folder_path: str | Path | None = None,
    delete_button: bool = True,
    export_button: bool = True,
) -> None:
    """Configure the shared user-history state.

    Args:
        folder_path: Folder in which history is stored. The value is passed
            through `_resolve_folder_path`, which also handles `None`.
        delete_button: Stored as-is on the shared state as `delete_button`.
        export_button: Stored as-is on the shared state as `export_button`.
    """
    state = _UserHistory()
    settings = {
        "folder_path": _resolve_folder_path(folder_path),
        "delete_button": delete_button,
        "export_button": export_button,
    }
    for attribute, value in settings.items():
        setattr(state, attribute, value)

    # Flag is flipped last, once every setting has been stored.
    state.initialized = True


def render() -> None:
    """Render the "past generations" UI: login buttons, action buttons and gallery.

    Must be called where Gradio components can be created (e.g. inside a
    `gr.Blocks` context). If `setup()` has not been called yet, it is called
    here with its default arguments. When no storage folder is configured
    (`folder_path` is `None`), only an explanatory message is rendered.

    The "Export" and "Delete history" buttons are shown or hidden according to
    the `export_button` / `delete_button` flags stored by `setup()`.
    """
    user_history = _UserHistory()

    # initialize with default config
    if not user_history.initialized:
        print("Initializing user history with default config. Use `user_history.setup(...)` to customize.")
        setup()

    # deactivate if no persistent storage
    if user_history.folder_path is None:
        gr.Markdown(
            "User history is deactivated as no Persistent Storage volume has been found. Please contact the Space"
            " owner to either assign a [Persistent Storage](https://huggingface.co/docs/hub/spaces-storage) or set"
            " `folder_path` to a temporary folder."
        )
        return

    # Render user history tab
    gr.Markdown(
        "## Your past generations\n\n(Log in to keep a gallery of your previous generations."
        " Your history will be saved and available on your next visit.)"
    )
    with gr.Row():
        gr.LoginButton(min_width=250)
        gr.LogoutButton(min_width=250)
        refresh_button = gr.Button("Refresh", icon="./assets/icon_refresh.png")
        # The buttons are always created (so the event wiring below stays valid) but are
        # only visible when enabled through `setup(export_button=..., delete_button=...)`.
        export_button = gr.Button(
            "Export",
            icon="./assets/icon_download.png",
            visible=user_history.export_button,
        )
        delete_button = gr.Button(
            "Delete history",
            icon="./assets/icon_delete.png",
            visible=user_history.delete_button,
        )

    # "Export zip" row (hidden by default)
    with gr.Row():
        export_file = gr.File(file_count="single", file_types=[".zip"], label="Exported history", visible=False)

    # "Confirm deletion" row (hidden by default)
    with gr.Row():
        confirm_button = gr.Button("Confirm delete all history", variant="stop", visible=False)
        cancel_button = gr.Button("Cancel", visible=False)

    # Gallery
    gallery = gr.Gallery(
        label="Past images",
        show_label=True,
        elem_id="gallery",
        object_fit="contain",
        columns=5,
        height=600,
        preview=False,
        show_share_button=False,
        show_download_button=False,
    )
    gr.Markdown("Make sure to save your images from time to time, this gallery may be deleted in the future.")
    gallery.attach_load_event(_fetch_user_history, every=None)

    # Interactions
    refresh_button.click(fn=_fetch_user_history, inputs=[], outputs=[gallery], queue=False)
    export_button.click(fn=_export_user_history, inputs=[], outputs=[export_file], queue=False)

    # Taken from https://github.com/gradio-app/gradio/issues/3324#issuecomment-1446382045
    delete_button.click(
        lambda: [gr.update(visible=True), gr.update(visible=True)],
        outputs=[confirm_button, cancel_button],
        queue=False,
    )
    cancel_button.click(
        lambda: [gr.update(visible=False), gr.update(visible=False)],
        outputs=[confirm_button, cancel_button],
        queue=False,
    )
    confirm_button.click(_delete_user_history).then(
        lambda: [gr.update(visible=False), gr.update(visible=False)],
        outputs=[confirm_button, cancel_button],
        queue=False,
    ).then(
        # Reload the gallery so it no longer lists the images that were just deleted.
        fn=_fetch_user_history,
        inputs=[],
        outputs=[gallery],
        queue=False,
    )


def save_image(
    profile: gr.OAuthProfile | None,
    image: Image | np.ndarray | str | Path,
    label: str | None = None,
    metadata: Dict | None = None,
):
    """Save an image to the logged-in user's history.

    The image is copied into the user's images folder and one JSON line
    (`path`, `label`, `metadata`) is appended to the user's `history.jsonl`.
    The call is a no-op when the user is logged out, when user history has not
    been initialized, or when it is deactivated (no storage folder).

    Args:
        profile: OAuth profile of the current user, or `None` if logged out.
        image: Image to store, in any form accepted by `_copy_image`.
        label: Optional label stored alongside the image.
        metadata: Optional JSON-serializable dict stored with the image. A
            `"datetime"` entry is added when missing. The caller's dict is
            never modified.
    """
    # Ignore images from logged out users
    if profile is None:
        return
    username = profile["preferred_username"]

    # Ignore images if user history not used
    user_history = _UserHistory()
    if not user_history.initialized:
        warnings.warn(
            "User history is not set in Gradio demo. Saving image is ignored. You must use `user_history.render(...)`"
            " first."
        )
        return

    # Ignore images if user history is deactivated (no storage folder). Without this guard the
    # path helpers raise, which would break the caller's generation function.
    if user_history.folder_path is None:
        return

    # Copy image to storage
    image_path = _copy_image(image, dst_folder=user_history._user_images_path(username))

    # Save new image + metadata (work on a copy so the caller's dict is left untouched)
    metadata = {} if metadata is None else dict(metadata)
    if "datetime" not in metadata:
        metadata["datetime"] = str(datetime.now())
    data = {"path": str(image_path), "label": label, "metadata": metadata}
    with user_history._user_lock(username):
        with user_history._user_jsonl_path(username).open("a") as f:
            f.write(json.dumps(data) + "\n")


#############
# Internals #
#############


class _UserHistory:
    """Singleton holding the user-history configuration and per-user path helpers.

    Every call to `_UserHistory()` returns the same instance, so state set in
    one place is visible to every other caller in the process.
    """

    _instance = None
    initialized: bool = False

    # Declared here, assigned on the instance later (no class-level default).
    folder_path: Path | None
    delete_button: bool
    export_button: bool

    def __new__(cls):
        # Singleton: callers never hold an object themselves, yet state is kept
        # between calls because the one instance is cached on the class.
        existing = cls._instance
        if existing is not None:
            return existing
        cls._instance = super().__new__(cls)
        return cls._instance

    def _user_path(self, username: str) -> Path:
        """Return the folder of `username`, creating it if needed.

        Raises:
            Exception: If `folder_path` is `None`.
        """
        root = self.folder_path
        if root is None:
            raise Exception("User history is deactivated.")
        user_dir = root / username
        user_dir.mkdir(parents=True, exist_ok=True)
        return user_dir

    def _user_lock(self, username: str) -> FileLock:
        """Return a file lock for `username`, used to avoid corrupting history on concurrent calls.

        Raises:
            Exception: If `folder_path` is `None`.
        """
        root = self.folder_path
        if root is None:
            raise Exception("User history is deactivated.")
        # The lock file sits next to the user folder, not inside it => better when exporting ZIP
        lock_file = root / f"{username}.lock"
        return FileLock(lock_file)

    def _user_jsonl_path(self, username: str) -> Path:
        """Return the path of the `history.jsonl` file inside the user's folder."""
        return self._user_path(username).joinpath("history.jsonl")

    def _user_images_path(self, username: str) -> Path:
        """Return the `images` folder inside the user's folder, creating it if needed."""
        images_dir = self._user_path(username).joinpath("images")
        images_dir.mkdir(parents=True, exist_ok=True)
        return images_dir


def _fetch_user_history(profile: gr.OAuthProfile | None) -> List[Tuple[str, str]]:
    """Return the saved `(path, label)` pairs of the user, most recently saved first.

    An empty list is returned when the user is logged out, when user history
    has not been initialized, or when nothing has been saved yet.
    """
    # Logged out => nothing to load
    if profile is None:
        return []
    username = profile["preferred_username"]

    state = _UserHistory()
    if not state.initialized:
        warnings.warn("User history is not set in Gradio demo. You must use `user_history.render(...)` first.")
        return []

    with state._user_lock(username):
        history_file = state._user_jsonl_path(username)

        # No file => no history saved yet
        if not history_file.is_file():
            return []

        # One JSON document per line; a missing label is shown as an empty string
        records = map(json.loads, history_file.read_text().splitlines())
        entries = [(record["path"], record["label"] or "") for record in records]

    # Lines are appended over time, so reverse to get newest first
    return entries[::-1]


def _export_user_history(profile: gr.OAuthProfile | None) -> Dict | None:
    """Zip the user's history folder and return an update exposing the archive.

    Returns `None` when the user is logged out or user history has not been
    initialized. Otherwise returns a `gr.update` that makes the target
    component visible and sets its value to the path of the zip file.
    """
    # Logged out => nothing to export
    if profile is None:
        return None
    username = profile["preferred_username"]

    state = _UserHistory()
    if not state.initialized:
        warnings.warn("User history is not set in Gradio demo. You must use `user_history.render(...)` first.")
        return None

    # Build the archive while holding the user's lock
    with state._user_lock(username):
        archive_base = _archives_path() / f"history_{username}"
        source_dir = state._user_path(username)
        archive_file = shutil.make_archive(str(archive_base), "zip", source_dir)

    return gr.update(visible=True, value=archive_file)


def _delete_user_history(profile: gr.OAuthProfile | None) -> None:
    """Remove the whole history folder of the user.

    Does nothing when the user is logged out or user history has not been
    initialized.
    """
    # Logged out => nothing to delete
    if profile is None:
        return
    username = profile["preferred_username"]

    state = _UserHistory()
    if not state.initialized:
        warnings.warn("User history is not set in Gradio demo. You must use `user_history.render(...)` first.")
        return

    # Delete under the user's lock
    with state._user_lock(username):
        user_dir = state._user_path(username)
        shutil.rmtree(user_dir)


####################
# Internal helpers #
####################


def _copy_image(image: Image | np.ndarray | str | Path, dst_folder: Path) -> Path:
    """Store `image` in `dst_folder` and return the path of the stored file.

    Args:
        image: Either a path (`str` or `Path`) to an existing file, which is
            copied, or an in-memory image (`np.ndarray` or PIL image), which
            is saved as PNG.
        dst_folder: Existing folder in which the file is written.

    Returns:
        Path of the new file. The file name is prefixed with (paths) or made
        of (in-memory images) a random hex identifier.

    Raises:
        ValueError: If `image` is of an unsupported type.
    """
    # Already a path => copy it
    if isinstance(image, (str, Path)):
        src = Path(image)
        dst = dst_folder / f"{uuid4().hex}_{src.name}"  # keep file ext
        shutil.copyfile(src, dst)
        return dst

    # Still a Python object => serialize it
    if isinstance(image, np.ndarray):
        # `fromarray` is a function of the `PIL.Image` module. The `Image` name imported at
        # file level is the class, which has no such attribute, so import the module here.
        from PIL import Image as pil_image_module

        image = pil_image_module.fromarray(image)
    if isinstance(image, Image):
        dst = dst_folder / f"{uuid4().hex}.png"
        image.save(dst)
        return dst

    raise ValueError(f"Unsupported image type: {type(image)}")


def _resolve_folder_path(folder_path: str | Path | None) -> Path | None:
    if folder_path is not None:
        return Path(folder_path).expanduser().resolve()

    if os.getenv("SYSTEM") == "spaces":
        if os.path.exists("/data"):  # Persistent storage is enabled!
            return Path("/data") / "user_history"
        else:
            return None  # No persistent storage => no user history

    # Not in a Space => local folder
    return Path(__file__).parent / "user_history"


def _archives_path() -> Path:
    """Return the `_history_snapshots` folder next to this file, creating it if needed."""
    # Only used to hold files offered for download, so it doesn't have to be on persistent storage
    snapshots_dir = Path(__file__).with_name("_history_snapshots")
    snapshots_dir.mkdir(parents=True, exist_ok=True)
    return snapshots_dir