# sleep_data / analysis_db.py
# Hosting-page metadata: commit 9492e32 ("test app.py with db ok") by thelou1s; 4.05 kB.
import sqlite3
import sys
from util.debug import debug_print
from draw_db import draw_lists, min_max_list, up_down_list
from util.date_util import format_date
def get_filename(file_obj):
    """Return the filesystem path associated with ``file_obj``.

    Args:
        file_obj: Either an object exposing a ``name`` attribute (for example
            an open or temporary file object) or a plain ``str`` path.

    Returns:
        ``file_obj`` itself when it is already a ``str``; otherwise the value
        of ``file_obj.name``.

    Raises:
        AttributeError: If ``file_obj`` is not a ``str`` and has no ``name``.
    """
    # A plain string has no ``.name``; treat it as the path itself so callers
    # may pass either an uploaded-file object or a path.
    if isinstance(file_obj, str):
        return file_obj
    return file_obj.name
def read_db(db_file):
    """Read the ``tb_section`` table of the database behind ``db_file``.

    Args:
        db_file: Object accepted by ``get_filename``, which is used to obtain
            the database path.

    Returns:
        Whatever ``read_section`` returns when called for every section of
        ``tb_section`` (an empty ``section_id`` is passed).
    """
    path = get_filename(db_file)
    debug_print('read_db, db_name = ', path)
    return read_section(db_name=path, tb_name='tb_section', section_id='')
def read_section(db_name, tb_name='tb_section', section_id=''):
    """Open a SQLite database and plot one section, or every section, of it.

    Behaviour depends on which arguments are empty:

    * ``tb_name`` empty: print the names of all tables in the database and
      return ``None``.
    * ``section_id`` empty: print every row of ``tb_name`` and call
      ``read_sample`` for each of them.
    * otherwise: call ``read_sample`` for the given ``section_id`` only.

    Args:
        db_name: Path of the SQLite database file.
        tb_name: Name of the section table. It is concatenated into the SQL
            text, so it must come from a trusted source.
        section_id: Section to plot; empty means "all sections".

    Returns:
        The result of the last ``read_sample`` call, or ``None`` when nothing
        was plotted.
    """
    debug_print('read_section, db_name = ', db_name, ', tb_name = ', tb_name, ', section_id = ', section_id)
    conn = sqlite3.connect(db_name)
    # try/finally guarantees the connection is closed on every path, including
    # the table-listing early return and any exception raised while querying.
    try:
        if not tb_name:
            table_names = conn.execute(" SELECT name FROM sqlite_master WHERE type='table'; ").fetchall()
            print('table_names = ', str(table_names))
            return None

        if section_id:
            return read_sample(conn, section_id, tb_name)

        res_fig = None
        # Table names cannot be bound as SQL parameters, hence the concatenation.
        # Rows are fetched up front so the nested queries issued by read_sample
        # do not run while this cursor is still being iterated.
        section_rows = conn.execute(
            ' SELECT section_id, section_date, section_end_date FROM ' + tb_name).fetchall()
        for row_section_id, section_date, section_end_date in section_rows:
            print('section_id = ', row_section_id, ', section_date = ', format_date(section_date),
                  ', section_end_date = ', format_date(section_end_date))
            res_fig = read_sample(conn, row_section_id, tb_name)
        return res_fig
    finally:
        conn.close()
def read_sample(conn, section_id, tb_name):
    """Plot the samples that belong to one section.

    Looks up ``section_id`` in ``tb_name`` to obtain the section's
    ``_sample_id`` .. ``_sample_end_id`` range, loads the rows of
    ``sample_table`` whose ``_id`` lies in that inclusive range, and passes
    the per-sample series to ``draw_lists``.

    Args:
        conn: Open ``sqlite3`` connection. It is not closed here.
        section_id: Value matched against the ``section_id`` column.
        tb_name: Name of the section table. It is concatenated into the SQL
            text, so it must come from a trusted source.

    Returns:
        The result of ``draw_lists`` for the first matching section row, or
        ``None`` when no row matches ``section_id``.
    """
    # Command-line callers pass section_id as text. Bind it as an integer when
    # it looks like one so the comparison behaves like a numeric literal.
    if isinstance(section_id, str):
        try:
            section_id = int(section_id)
        except ValueError:
            pass

    # Values are bound with "?" placeholders instead of being pasted into the
    # SQL text; only the table name, which cannot be bound, is concatenated.
    section_sql = (' SELECT section_id, _sample_id, _sample_end_id, section_date, section_end_date FROM '
                   + str(tb_name) + ' WHERE section_id == ?')
    section_row = conn.execute(section_sql, (section_id,)).fetchone()
    if section_row is None:
        return None

    section_id, _sample_id, _sample_end_id, section_date, section_end_date = section_row
    debug_print('section_id = ', section_id, ', _sample_id = ', _sample_id, ', _sample_end_id = ',
                _sample_end_id)

    date_list = []
    max_list = []
    min_list = []
    acc_list = []
    avg_list = []
    snoring_list = []

    sample_sql = ('SELECT _id, date, max, min, acc_max_dx, acc_max_dy, acc_max_dz, avg, max_snoring '
                  'FROM sample_table WHERE _id >= ? AND _id <= ?')
    for _id, date, _max, _min, acc_dx, acc_dy, acc_dz, avg, max_snoring in conn.execute(
            sample_sql, (_sample_id, _sample_end_id)):
        formatted_date = format_date(date)
        debug_print('_id = ', _id, 'date = ', formatted_date)
        date_list.append(formatted_date)
        max_list.append(_max)
        min_list.append(_min)
        # The three per-axis accelerometer maxima are summed into one series.
        acc_list.append(acc_dx + acc_dy + acc_dz)
        avg_list.append(avg)
        snoring_list.append(max_snoring)

    title_str = str(section_id) + ', ' + str(format_date(section_date)) + ', ' + str(format_date(section_end_date))

    if avg_list:
        # NOTE(review): presumably min_max_list rescales the snoring series
        # into the avg series' value range -- confirm against draw_db.
        snoring_list = min_max_list(snoring_list, min(avg_list), max(avg_list))
    snoring_list = up_down_list(snoring_list)

    # draw_lists receives the "avg" column as ``light`` and the "max_snoring"
    # column as ``screen``; the keyword names are kept for compatibility.
    return draw_lists(title_str, date_list, max=max_list, min=min_list, acc=acc_list, screen=snoring_list,
                      light=avg_list)
if __name__ == '__main__':
    # Command-line entry point: <db file> [table name] [section id].
    # Omitted optional arguments are passed to read_section as empty strings;
    # an empty table name makes it list the database's tables, an empty
    # section id makes it process every section.
    argv = sys.argv[1:]
    argc = len(argv)
    if argc < 1:
        print('USAGE: python analysis_db.py xxx.db')
        # sys.exit is used because the bare ``exit`` helper is injected by the
        # ``site`` module and is not guaranteed to exist (e.g. ``python -S``).
        sys.exit(1)

    db_uri = argv[0]
    tb_name = argv[1] if argc >= 2 else ''
    section_id = argv[2] if argc >= 3 else ''
    read_section(db_uri, tb_name, section_id)