import atexit
import sqlite3
import time
from typing import Any, Iterator, List, Mapping, Tuple


class FileDict:
    """A dict-like key/value store backed by a single SQLite table.

    Writes and deletes are queued in ``self.buffer`` and applied to the
    database in one transaction when the buffer holds ``buffer_size``
    operations, when ``buffer_idle_time`` seconds have elapsed since the
    last flush, before any read, and on ``close()``.

    Keys must be non-empty strings; the table stores both key and value in
    ``TEXT`` columns.

    NOTE: ``table`` is interpolated directly into the SQL text, so it must
    come from a trusted source.
    """

    def __init__(self, file_path, buffer_size=100000, buffer_idle_time=5, table='filedict'):
        """Open (or create) the database and make sure the table exists.

        Args:
            file_path: Path handed to ``sqlite3.connect``.
            buffer_size: Number of queued operations that forces a flush.
            buffer_idle_time: Seconds since the last flush after which the
                next write or delete forces a flush.
            table: Name of the table holding the key/value pairs.
        """
        self.file_path = file_path
        self.table = table
        # check_same_thread=False lets other threads use the connection;
        # this class itself performs no locking.
        self.conn = sqlite3.connect(file_path, check_same_thread=False)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
        )
        self.buffer = []
        self.buffer_size = buffer_size
        self.last_commit_time = time.time()
        self.buffer_idle_time = buffer_idle_time
        self._closed = False
        # Flush pending writes even if the caller forgets to call close().
        atexit.register(self.close)

    def __enter__(self):
        """Return ``self`` so the store can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Flush and close the store when the ``with`` block ends."""
        self.close()

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If no row with that key exists.
        """
        # Flush first so a value written a moment ago is visible.
        self._commit()
        cursor = self.conn.execute(
            'SELECT value FROM {} WHERE key = ?'.format(self.table), (key,)
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]

    def Tables(self) -> List[str]:
        """Return the names of all tables listed in ``sqlite_master``."""
        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [row[0] for row in cursor.fetchall()]

    def __setitem__(self, key, value):
        """Queue ``key -> value`` for writing.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is an empty string.
        """
        self.check_key(key)
        # 'set' is applied with INSERT OR REPLACE, so it covers both new
        # and existing keys.
        self.buffer.append(('set', key, value))
        self._check_buffer()

    def __delitem__(self, key):
        """Queue deletion of ``key``; a missing key is not reported."""
        self.buffer.append(('del', key))
        self._check_buffer()

    def __iter__(self) -> Iterator[str]:
        """Yield every key in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row[0]
        finally:
            # Runs even when the consumer stops iterating early.
            cursor.close()

    def items(self) -> Iterator[Tuple[str, Any]]:
        """Yield ``(key, value)`` tuples for every row in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row
        finally:
            cursor.close()

    def from_dict(self, dict):
        """Replace the whole table with the contents of ``dict``.

        The parameter name shadows the builtin but is kept so existing
        keyword callers continue to work.

        Operations still waiting in the buffer are discarded: they were
        issued before the replacement and would otherwise be replayed onto
        the new table at the next flush.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is an empty string.
        """
        self.check_dict(dict)
        self.buffer = []
        self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))
        self.conn.execute(
            'CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
        )
        self.conn.executemany(
            'INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table),
            dict.items(),
        )
        self.conn.commit()
        self.last_commit_time = time.time()

    def add_items(self, items: Mapping[str, Any]):
        """Queue every ``key -> value`` pair of the mapping ``items``.

        All keys are validated before anything is queued, so a bad key
        leaves the buffer untouched.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is an empty string.
        """
        self.check_dict(items)
        self.buffer.extend(('set', key, value) for key, value in items.items())
        self._check_buffer()

    def _check_buffer(self):
        """Flush the buffer if it is full or the idle time has elapsed."""
        if not self.buffer:
            return
        idle_time = time.time() - self.last_commit_time
        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:
            self._commit()

    def _commit(self):
        """Apply all queued operations in order and commit them."""
        if not self.buffer:
            return
        cursor = self.conn.cursor()
        try:
            for op in self.buffer:
                kind = op[0]
                if kind == 'set':
                    cursor.execute(
                        'INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table),
                        (op[1], op[2]),
                    )
                elif kind == 'update':
                    # Nothing in this class queues 'update'; it is still
                    # honoured because ``buffer`` is a public attribute.
                    cursor.execute(
                        'UPDATE {} SET value = ? WHERE key = ?'.format(self.table),
                        (op[2], op[1]),
                    )
                elif kind == 'del':
                    cursor.execute(
                        'DELETE FROM {} WHERE key = ?'.format(self.table), (op[1],)
                    )
            self.conn.commit()
        finally:
            cursor.close()
        # Clear only after a successful commit so a failure does not
        # silently drop the pending operations.
        self.buffer = []
        self.last_commit_time = time.time()

    def check_dict(self, dictionary):
        """Validate every key of ``dictionary`` with ``check_key``."""
        for key in dictionary:
            self.check_key(key)

    def check_key(self, key):
        """Ensure ``key`` is a non-empty string.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is an empty string.
        """
        if not isinstance(key, str):
            raise TypeError('Keys must be strings.')
        if not key:
            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False) -> Iterator[str]:
        """Yield the keys matching ``pattern``.

        Args:
            pattern: Bound as a SQL parameter to the comparison.
            like: Compare with ``LIKE`` when true, with ``=`` otherwise.
            values: Currently ignored; accepted for backward compatibility.
        """
        self._commit()
        operator = 'LIKE' if like else '='
        cursor = self.conn.cursor()
        try:
            cursor.execute(
                f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,)
            )
            for row in cursor:
                yield row[0]
        finally:
            cursor.close()

    def close(self):
        """Flush pending operations and close the connection.

        Calling it more than once is safe; later calls do nothing.
        """
        if self._closed:
            return
        self._closed = True
        # Drop the exit hook so it no longer keeps this instance alive.
        atexit.unregister(self.close)
        try:
            self._commit()
            try:
                self.conn.commit()
            except sqlite3.Error:
                # Best-effort final commit, e.g. when the connection was
                # already closed by the caller.
                pass
        finally:
            self.conn.close()