File size: 3,213 Bytes
2609fac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import streamlit as st
from pathlib import Path
from typing import Mapping
from datetime import datetime
from datetime import timedelta
from urllib.parse import unquote
from typing import MutableMapping
from streamlit.components.v1 import components


build_path = Path(__file__).parent / 'build'
_component_func = components.declare_component("CookieManager.sync_cookies", path=str(build_path))


class CookieManager(MutableMapping[str, str]):
    def __init__(self, *, path: str = None, prefix=""):
        self._queue = st.session_state.setdefault('CookieManager.queue', {})
        self._prefix = prefix
        raw_cookie = self._run_component(save_only=False, key="CookieManager.sync_cookies")
        if raw_cookie is None:
            self._cookies = None
        else:
            self._cookies = parse_cookies(raw_cookie)
            self._clean_queue()
        self._default_expiry = datetime.now() + timedelta(days=365)
        self._path = path if path is not None else "/"

    def ready(self) -> bool:
        return self._cookies is not None

    def save(self):
        if self._queue:
            self._run_component(save_only=True, key="CookieManager.sync_cookies.save")

    def _run_component(self, save_only: bool, key: str):
        queue = {
            self._prefix + k: v for k, v in self._queue.items()
        }
        return _component_func(queue=queue, saveOnly=save_only, key=key)

    def _clean_queue(self):
        for name, spec in list(self._queue.items()):
            value = self._cookies.get(self._prefix + name)
            if value == spec['value']:
                del self._queue[name]

    def __repr__(self):
        if self.ready():
            return f'<CookieManager: {dict(self)!r}>'
        return '<CookieManager: not ready>'

    def __getitem__(self, k: str) -> str:
        return self._get_cookies()[k]

    def __iter__(self):
        return iter(self._get_cookies())

    def __len__(self):
        return len(self._get_cookies())

    def __setitem__(self, key: str, value: str) -> None:
        if self._cookies.get(key) != value:
            self._queue[key] = dict(
                value=value,
                expires_at=self._default_expiry.isoformat(),
                path=self._path,
            )

    def __delitem__(self, key: str) -> None:
        if key in self._cookies:
            self._queue[key] = dict(value=None, path=self._path)

    def _get_cookies(self) -> Mapping[str, str]:
        if self._cookies is None:
            raise CookiesNotReady()
        cookies = {
            k[len(self._prefix):]: v
            for k, v in self._cookies.items()
            if k.startswith(self._prefix)
        }
        for name, spec in self._queue.items():
            if spec['value'] is not None:
                cookies[name] = spec['value']
            else:
                cookies.pop(name, None)
        return cookies


def parse_cookies(raw_cookie):
    cookies = {}
    for part in raw_cookie.split(';'):
        part = part.strip()
        if not part:
            continue
        name, value = part.split('=', 1)
        cookies[unquote(name)] = unquote(value)
    return cookies


class CookiesNotReady(Exception):
    pass