import streamlit as st
from pathlib import Path
from urllib.parse import unquote
from datetime import datetime, timedelta
from typing import Mapping, MutableMapping
import streamlit.components.v1 as components


build_path = Path(__file__).parent / 'build'
_component_func = components.declare_component("CookieManager.sync_cookies", path=str(build_path))


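class CookieManager(MutableMapping[str, str]):
    """Dict-like access to browser cookies, synced through a Streamlit component.

    Writes and deletes are queued in ``st.session_state`` and sent to the
    browser on the next component run (see ``save``).
    """

    def __init__(self, *, path: str = None, prefix=""):
        # The queue lives in session state so pending changes survive reruns.
        self._queue = st.session_state.setdefault('CookieManager.queue', {})
        self._prefix = prefix
        raw_cookie = self._run_component(save_only=False, key="CookieManager.sync_cookies")
        if raw_cookie is None:
            # The frontend component has not reported the cookies yet.
            self._cookies = None
        else:
            self._cookies = parse_cookies(raw_cookie)
            self._clean_queue()
        self._default_expiry = datetime.now() + timedelta(days=365)
        self._path = path if path is not None else "/"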

    def ready(self) -> bool:
        return self._cookies is not None

    def save(self):
        if self._queue:
            self._run_component(save_only=True, key="CookieManager.sync_cookies.save")

    def _run_component(self, save_only: bool, key: str):
        queue = {
            self._prefix + k: v for k, v in self._queue.items()
        }
        return _component_func(queue=queue, saveOnly=save_only, key=key)

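    def _clean_queue(self):
        # Drop queued changes that the browser has already applied.
        # A queued deletion (value None) counts as applied once the cookie is gone.
        for name, spec in list(self._queue.items()):
            value = self._cookies.get(self._prefix + name)
            if value == spec['value']:
                del self._queue[name]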

    def __repr__(self):
        if self.ready():
            return f'<CookieManager: {dict(self)!r}>'
        return '<CookieManager: not ready>'

    def __getitem__(self, k: str) -> str:
        return self._get_cookies()[k]

    def __iter__(self):
        return iter(self._get_cookies())

    def __len__(self):
        return len(self._get_cookies())

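    def __setitem__(self, key: str, value: str) -> None:
        # Compare against the prefix-aware view, which also includes pending
        # changes; raises CookiesNotReady if the cookies have not loaded yet.
        if self._get_cookies().get(key) != value:
            self._queue[key] = dict(
                value=value,
                expires_at=self._default_expiry.isoformat(),
                path=self._path,
            )

    def __delitem__(self, key: str) -> None:
        # A value of None marks the cookie for deletion on the next sync.
        if key in self._get_cookies():
            self._queue[key] = dict(value=None, path=self._path)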

    def _get_cookies(self) -> Mapping[str, str]:
        if self._cookies is None:
            raise CookiesNotReady()
        # Keep only cookies that carry our prefix, and strip it from the names.
        cookies = {
            k[len(self._prefix):]: v
            for k, v in self._cookies.items()
            if k.startswith(self._prefix)
        }
        # Overlay pending changes so callers see their own unsaved writes.
        for name, spec in self._queue.items():
            if spec['value'] is not None:
                cookies[name] = spec['value']
            else:
                cookies.pop(name, None)
        return cookies


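def parse_cookies(raw_cookie):
    """Parse a ``document.cookie``-style string into a dict of decoded names and values."""
    cookies = {}
    for part in raw_cookie.split(';'):
        part = part.strip()
        if not part:
            continue
        # partition() tolerates a cookie with no '=' (its value becomes ''),
        # where split('=', 1) would raise ValueError on unpacking.
        name, _, value = part.partition('=')
        cookies[unquote(name)] = unquote(value)
    return cookies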


class CookiesNotReady(Exception):
    """Raised when cookies are accessed before the component has returned them."""