# Images_Prompts_Generator / fileDict3.py
# lovelyai999's picture
# Upload 7 files
# e8e1967 verified
import sqlite3
import time
import atexit
class FileDict:
    """Dictionary-like string store backed by a single SQLite table.

    Writes (``d[key] = value``, ``del d[key]``, ``add_items``) are queued in
    ``self.buffer`` and applied to the database in one transaction once the
    queue holds ``buffer_size`` operations or ``buffer_idle_time`` seconds
    have passed since the last commit.  Every read flushes the queue first,
    so a read always observes the writes that preceded it.

    The table has two TEXT columns, ``key`` (primary key) and ``value``.

    NOTE: ``table`` is interpolated into the SQL text unescaped; pass only
    trusted identifiers.  The connection is opened with
    ``check_same_thread=False`` but this class does no locking of its own.
    """

    def __init__(self, file_path, buffer_size=100000, buffer_idle_time=5, table='filedict'):
        """Open (or create) the database and make sure the table exists.

        Args:
            file_path: Path handed to ``sqlite3.connect``.
            buffer_size: Number of queued operations that forces a commit.
            buffer_idle_time: Seconds since the last commit after which the
                next write forces a commit.
            table: Name of the table holding the key/value rows.
        """
        self.file_path = file_path
        self.table = table
        self.conn = sqlite3.connect(file_path, check_same_thread=False)
        self.conn.execute(
            'CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
        )
        self.buffer = []
        self.buffer_size = buffer_size
        self.last_commit_time = time.time()
        self.buffer_idle_time = buffer_idle_time
        self._closed = False
        # Flush queued writes even if the caller never calls close().
        atexit.register(self.close)

    def get(self, key, default=None):
        """Return the value stored under ``key``, or ``default`` if absent."""
        try:
            return self[key]
        except KeyError:
            return default

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: If ``key`` is not in the table.
        """
        # Flush unconditionally: a threshold-based check would hide writes
        # that are still sitting in the buffer.
        self._commit()
        row = self.conn.execute(
            'SELECT value FROM {} WHERE key = ?'.format(self.table), (key,)
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return row[0]

    def Tables(self):
        """Return the names of all tables in the database."""
        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [row[0] for row in cursor.fetchall()]

    def __setitem__(self, key, value):
        """Queue ``key -> value`` for insertion, replacing any existing row.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is the empty string.
        """
        self.check_key(key)
        self.buffer.append(('set', key, value))
        self._check_buffer()

    def __delitem__(self, key):
        """Queue the deletion of ``key``; a missing key is silently ignored."""
        self.buffer.append(('del', key))
        self._check_buffer()

    def __iter__(self):
        """Yield every key in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row[0]
        finally:
            # Runs even when the consumer abandons the generator early.
            cursor.close()

    def items(self):
        """Yield every ``(key, value)`` row in the table."""
        self._commit()
        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))
        try:
            for row in cursor:
                yield row
        finally:
            cursor.close()

    def from_dict(self, dict):
        """Replace the whole table with the contents of ``dict``.

        The table is dropped and recreated, then filled from ``dict.items()``.
        Writes still waiting in the buffer are discarded, because they target
        rows that the drop removes anyway.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is the empty string.
        """
        self.check_dict(dict)
        # Commit on success; roll back partially inserted rows on error.
        with self.conn:
            self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))
            self.buffer = []
            self.conn.execute(
                'CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table)
            )
            self.conn.executemany(
                'INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table), dict.items()
            )

    def add_items(self, items):
        """Queue every ``key -> value`` pair of the mapping ``items``.

        All keys are validated before anything is queued, so a bad key leaves
        the buffer untouched.

        Raises:
            TypeError: If any key is not a string.
            ValueError: If any key is the empty string.
        """
        self.check_dict(items)
        self.buffer.extend(('set', key, value) for key, value in items.items())
        self._check_buffer()

    def _check_buffer(self):
        """Commit the buffer if it is full or the idle time has elapsed."""
        if not self.buffer:
            return
        idle_time = time.time() - self.last_commit_time
        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:
            self._commit()

    def _commit(self):
        """Apply all queued operations, in order, inside one transaction.

        If any statement fails the transaction is rolled back and the buffer
        is left intact, so the database never holds a half-applied batch.
        """
        if not self.buffer:
            return
        set_sql = 'INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table)
        # No method of this class queues 'update'; the branch is kept so that
        # operations appended to ``buffer`` by other code are still honoured.
        update_sql = 'UPDATE {} SET value = ? WHERE key = ?'.format(self.table)
        delete_sql = 'DELETE FROM {} WHERE key = ?'.format(self.table)
        with self.conn:
            for op in self.buffer:
                kind = op[0]
                if kind == 'set':
                    self.conn.execute(set_sql, (op[1], op[2]))
                elif kind == 'update':
                    self.conn.execute(update_sql, (op[2], op[1]))
                elif kind == 'del':
                    self.conn.execute(delete_sql, (op[1],))
        self.buffer = []
        self.last_commit_time = time.time()

    def check_dict(self, dictionary):
        """Validate every key of ``dictionary`` with ``check_key``."""
        for key in dictionary:
            self.check_key(key)

    def check_key(self, key):
        """Ensure ``key`` is a non-empty string.

        Raises:
            TypeError: If ``key`` is not a string.
            ValueError: If ``key`` is the empty string.
        """
        if not isinstance(key, str):
            raise TypeError('Keys must be strings.')
        if not key:
            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False):
        """Yield the keys matching ``pattern``.

        Args:
            pattern: SQL ``LIKE`` pattern when ``like`` is true, otherwise the
                exact key to match.
            like: Compare with ``LIKE`` (default) instead of ``=``.
            values: Accepted for backward compatibility; currently ignored.
        """
        self._commit()
        operator = 'LIKE' if like else '='
        cursor = self.conn.execute(
            f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,)
        )
        try:
            for row in cursor:
                yield row[0]
        finally:
            cursor.close()

    def close(self):
        """Flush pending writes and close the connection.

        Safe to call more than once: the atexit hook registered by
        ``__init__`` calls it again after an explicit close.
        """
        if getattr(self, '_closed', False):
            return
        self._closed = True
        try:
            self._commit()
        finally:
            # Release the connection even if the final flush fails.
            self.conn.close()