File size: 2,801 Bytes
8f4b172
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d7974b7
8f4b172
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import sqlite3
from pathlib import Path
from typing import List, Tuple
import json


class Database:
    """SQLite-backed cache stored in ``<db_path>/cache.db``.

    The instance doubles as a context manager: ``with database() as db:``
    opens a fresh connection and closes it when the block ends.  The open
    connection is kept on ``self.db``, so nested ``with`` blocks on the same
    instance would overwrite each other's connection.

    The code below reads and writes a ``cache`` table with the columns
    ``url``, ``title`` and ``entries``; ``entries`` is read back as JSON text.
    """

    def __init__(self, db_path=None, schema_path=None):
        """Locate the cache database and create it from the schema if missing.

        Args:
            db_path: Directory that holds (or will hold) ``cache.db``.  May be
                a ``str`` or a ``pathlib.Path``.
            schema_path: SQL script used to initialise a new database.
                Defaults to ``schema.sql`` in the current working directory.

        Raises:
            ValueError: If ``db_path`` is ``None``.
            OSError: If the schema file cannot be read.
            sqlite3.Error: If the database cannot be created or the schema
                script fails.
        """
        if db_path is None:
            raise ValueError("db_path must be provided")
        # Coerce to Path so plain strings work with the "/" join below.
        self.db_path = Path(db_path)
        self.db_file = self.db_path / "cache.db"
        if not self.db_file.exists():
            print("Creating database")
            print("DB_FILE", self.db_file)
            if schema_path is None:
                schema_file = Path("schema.sql")
            else:
                schema_file = Path(schema_path)
            # Read the schema *before* connecting: connecting can create an
            # empty cache.db, and if the schema were then unreadable the next
            # run would see the file and skip initialisation entirely.
            with open(schema_file, "r") as f:
                schema_sql = f.read()
            db = sqlite3.connect(self.db_file)
            try:
                db.executescript(schema_sql)
                db.commit()
            finally:
                # Always release the connection, even if the script failed.
                db.close()

    def get_db(self) -> sqlite3.Connection:
        """Open a new connection whose rows support access by column name."""
        db = sqlite3.connect(self.db_file, check_same_thread=False)
        db.row_factory = sqlite3.Row
        return db

    def __enter__(self) -> sqlite3.Connection:
        """Open a connection, remember it on ``self.db`` and return it."""
        self.db = self.get_db()
        return self.db

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection opened by ``__enter__``.

        Returns ``None``, so an exception raised inside the ``with`` block
        keeps propagating.
        """
        self.db.close()

    def __call__(self):
        """Return ``self`` so that ``with self() as db:`` works."""
        return self

    def insert(self, data: List[Tuple[str, str, str]]) -> None:
        """Insert or replace ``(url, title, entries)`` rows in ``cache``.

        This is best-effort: an entry that cannot be unpacked or stored is
        reported with ``print`` and skipped, and the remaining entries are
        still processed.  Everything that succeeded is committed at the end.

        Args:
            data: Iterable of ``(url, title, entries)`` tuples.
        """
        with self() as db:
            cursor = db.cursor()
            for entry in data:
                # Handle errors per entry so one malformed item does not
                # silently drop every item that follows it.
                try:
                    url, title, entries = entry
                    cursor.execute(
                        "INSERT OR REPLACE INTO cache (url, title, entries) VALUES (?, ?, ?)",
                        (url, title, entries),
                    )
                except Exception as e:
                    print(e)
            db.commit()

    def filter(self, category: str) -> List[dict]:
        """Return every cached feed that has at least one usable entry.

        Each result is ``{"entries": [...], "url": ..., "title": ...}`` where
        every entry is reduced to the keys ``title``, ``link``, ``published``
        and ``summary``.  ``published`` falls back to the entry's ``pubDate``
        and then to ``""``; ``summary`` falls back to ``""``.

        Rows whose ``entries`` column is not valid JSON, or whose entries lack
        ``title``/``link``, are reported with ``print`` and skipped.

        Args:
            category: Currently unused; no filtering by category is applied.
                NOTE(review): confirm the intended semantics before relying
                on this parameter.
        """
        with self() as db:
            rows = db.execute("SELECT url, title, entries FROM cache").fetchall()
            out = []
            for row in rows:
                try:
                    # Parsing sits inside the try so that a single corrupt
                    # row cannot abort the whole query.
                    payload = json.loads(row["entries"])
                    items = [
                        {
                            "title": entry["title"],
                            "link": entry["link"],
                            "published": entry.get(
                                "published", entry.get("pubDate", "")
                            ),
                            "summary": entry.get("summary", ""),
                        }
                        for entry in payload["entries"]
                    ]
                except Exception as e:
                    print(f"Error on {row['url']}: {e}")
                    continue
                if items:
                    out.append(
                        {
                            "entries": items,
                            "url": row["url"],
                            "title": row["title"],
                        }
                    )
            return out