import paramiko import pymysql from sshtunnel import SSHTunnelForwarder # SSH settings ssh_host = '129.159.146.88' ssh_user = 'ubuntu' ssh_key_path = 'C:/Users/kerts/OneDrive/Documents/Keys/Ubuntu_Oracle/ssh-key-2023-02-12.key' # MySQL settings mysql_host = 'localhost' # because we will connect through the SSH tunnel mysql_port = 3306 # the default MySQL port mysql_user = 'root' mysql_password = 'naP2tion' mysql_db = 'warbot' ssh_port = 22 dbTableName = 'conversations' # messages data table class DataBase(): # manages mySQL connection and send-receive def __init__(db, ssh_key_path = ssh_key_path, ssh_host=ssh_host, ssh_port=ssh_port, ssh_user=ssh_user, mysql_host=mysql_host,mysql_port=mysql_port,mysql_user=mysql_user,mysql_password=mysql_password,mysql_db=mysql_db): # create an SSH client and load the private key db.ssh_client = paramiko.SSHClient() db.ssh_client.load_system_host_keys() db.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) db.private_key = paramiko.RSAKey.from_private_key_file(ssh_key_path) db.ssh_key_path = ssh_key_path db.ssh_host=ssh_host db.ssh_port=ssh_port db.ssh_user=ssh_user db.mysql_host=mysql_host db.mysql_port=mysql_port db.mysql_user=mysql_user db.mysql_password=mysql_password db.mysql_db=mysql_db def getmessages(db,username = ""): # get the entire conversation with the specific user with SSHTunnelForwarder( (db.ssh_host, db.ssh_port), ssh_username=db.ssh_user, ssh_pkey=db.private_key, remote_bind_address=(db.mysql_host, db.mysql_port)) as tunnel: # connect to the MySQL server through the SSH tunnel mysql_conn = pymysql.connect( host='localhost', # will be due to the SSH tunneling issue port=tunnel.local_bind_port, user=db.mysql_user, password=db.mysql_password, db=db.mysql_db ) # send a query to the MySQL database and print the response table with mysql_conn.cursor() as cursor: query = f'SELECT * FROM {dbTableName} WHERE username = "{username}";' cursor.execute(query) rows = cursor.fetchall() messages = [(message[2],message[3]) for message in rows] # close the MySQL connection mysql_conn.close() # close the SSH client db.ssh_client.close() return messages def setmessages(db,username, message_text="", bot_reply=""): # Adding the record into the database with SSHTunnelForwarder( (db.ssh_host, db.ssh_port), ssh_username=db.ssh_user, ssh_pkey=db.private_key, remote_bind_address=(db.mysql_host, db.mysql_port)) as tunnel: # connect to the MySQL server through the SSH tunnel mysql_conn = pymysql.connect( host='localhost', # will be due to the SSH tunneling issue port=tunnel.local_bind_port, user=db.mysql_user, password=db.mysql_password, db=db.mysql_db ) # send a query to the MySQL database and print the response table with mysql_conn.cursor() as cursor: query = f"INSERT INTO {dbTableName} (username,message_text,bot_reply) " \ f"VALUES ('{username}','{message_text}','{bot_reply}');" cursor.execute(query) mysql_conn.commit() # close the MySQL connection mysql_conn.close() # close the SSH client db.ssh_client.close() def cleanup(db, username = "", remaining_messages = 3): # Cleanup the records, except the last N rows with SSHTunnelForwarder( (db.ssh_host, db.ssh_port), ssh_username=db.ssh_user, ssh_pkey=db.private_key, remote_bind_address=(db.mysql_host, db.mysql_port)) as tunnel: # connect to the MySQL server through the SSH tunnel mysql_conn = pymysql.connect( host='localhost', # will be due to the SSH tunneling issue port=tunnel.local_bind_port, user=db.mysql_user, password=db.mysql_password, db=db.mysql_db ) # send a query to the MySQL database to delete the records except the last ones with mysql_conn.cursor() as cursor: query = f"DELETE FROM {dbTableName} WHERE username = '{username}' AND id NOT IN " \ f"(SELECT id FROM (SELECT id FROM (SELECT id FROM {dbTableName} " \ f"WHERE username = '{username}' ORDER BY id DESC LIMIT {remaining_messages}) " \ f"subquery) subsubquery)" cursor.execute(query) mysql_conn.commit() # close the MySQL connection mysql_conn.close() # close the SSH client db.ssh_client.close() if __name__ == '__main__': # This is for testing purpose only: username = 'user1' db = DataBase() db.setmessages(username='user2',message_text='some message',bot_reply='some reply') #db.cleanup(username='user2',remaining_messages=1) messages = db.getmessages(username) print(messages)