auto-draft / worker.py
shaocongma
fix bug.
78f4b41
'''
This script is intended to run only on the service-side host.
'''
import boto3
import os, time
from wrapper import generator_wrapper
from sqlalchemy import create_engine, Table, MetaData, update, select
from sqlalchemy.orm import sessionmaker
from sqlalchemy import inspect
# Runtime configuration, read from environment variables once at import time.
# os.getenv returns None for any variable that is not set.
QUEUE_URL = os.getenv('QUEUE_URL')  # passed as QueueUrl to the SQS calls in this file
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')  # credential pair used for the SQS client
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
BUCKET_NAME = os.getenv('BUCKET_NAME')  # S3 bucket used for uploads and downloads
DB_STRING = os.getenv('DATABASE_STRING')  # connection string handed to create_engine
# Module-level SQLAlchemy engine and session factory, created once at import time
# and shared by the database helpers in this file.
ENGINE = create_engine(DB_STRING)
SESSION = sessionmaker(bind=ENGINE)
#######################################################################################################################
# Amazon SQS Handler
#######################################################################################################################
def get_sqs_client():
    """Build a boto3 SQS client for the ``us-east-2`` region.

    Credentials are taken from the module-level ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` values. A new client is created on every call.
    """
    credentials = {
        "aws_access_key_id": AWS_ACCESS_KEY_ID,
        "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
    }
    return boto3.client('sqs', region_name="us-east-2", **credentials)
def receive_message():
    """Poll the SQS queue at ``QUEUE_URL`` once.

    Returns:
        A ``(response, receipt_handle)`` tuple. ``response`` is the raw
        response from ``sqs.receive_message``. ``receipt_handle`` is the
        receipt handle of the first returned message, or ``None`` when the
        response carries no message.
    """
    sqs = get_sqs_client()
    response = sqs.receive_message(QueueUrl=QUEUE_URL)
    # Treat a missing 'Messages' key and an empty list the same way, so an
    # empty list cannot raise IndexError on the [0] lookup.
    messages = response.get('Messages')
    receipt_handle = messages[0]['ReceiptHandle'] if messages else None
    return response, receipt_handle
def delete_message(receipt_handle):
    """Delete one message from the queue at ``QUEUE_URL``.

    Args:
        receipt_handle: Receipt handle of the message to delete, as returned
            by ``receive_message``.

    Returns:
        The response returned by the SQS ``delete_message`` call.
    """
    return get_sqs_client().delete_message(
        QueueUrl=QUEUE_URL,
        ReceiptHandle=receipt_handle,
    )
#######################################################################################################################
# AWS S3 Handler
#######################################################################################################################
def get_s3_client():
    """Create an S3 resource and a handle on the bucket ``BUCKET_NAME``.

    The AWS credentials are read from the environment at call time.

    Returns:
        A ``(s3_resource, bucket)`` tuple.
    """
    aws_session = boto3.Session(
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
    )
    s3_resource = aws_session.resource('s3')
    return s3_resource, s3_resource.Bucket(BUCKET_NAME)
def upload_file(file_name, target_name=None):
    """Upload a local file to the bucket ``BUCKET_NAME``.

    Args:
        file_name: Path of the local file to upload.
        target_name: Object key to upload to. When ``None``, ``file_name``
            itself is used as the key.
    """
    object_key = file_name if target_name is None else target_name
    s3_resource, _ = get_s3_client()
    s3_resource.meta.client.upload_file(
        Filename=file_name,
        Bucket=BUCKET_NAME,
        Key=object_key,
    )
    print(f"The file {file_name} has been uploaded!")
def download_file(file_name):
    """Download the object with key `file_name` from the bucket ``BUCKET_NAME``.

    The object is written to a local file named after the base name of the
    key, i.e. any directory part of `file_name` is dropped.

    Args:
        file_name: Key of the object to download.
    """
    local_name = os.path.basename(file_name)
    s3_resource, _ = get_s3_client()
    s3_resource.meta.client.download_file(
        Bucket=BUCKET_NAME,
        Key=file_name,
        Filename=local_name,
    )
    print(f"The file {file_name} has been downloaded!")
#######################################################################################################################
# AWS SQL Handler
#######################################################################################################################
def modify_status(task_id, new_status):
    """Set the ``status`` column of a row in the ``task`` table.

    The row is selected by its ``task_id`` column. When no row matches, the
    UPDATE affects nothing and the call is a no-op.

    Args:
        task_id: Value of the ``task_id`` column identifying the task.
        new_status: New status value. The codes used in this file are
            0 - pending (default), 1 - running, 2 - completed, 3 - failed.
    """
    # Reflect the table definition from the database.
    task_table = Table('task', MetaData(), autoload_with=ENGINE)
    stmt = (
        update(task_table)
        .where(task_table.c.task_id == task_id)
        .values(status=new_status)
    )
    # ENGINE.begin() commits when the block exits normally and rolls back if
    # it raises, so no explicit commit or separate session is needed.
    with ENGINE.begin() as connection:
        connection.execute(stmt)
#######################################################################################################################
# Pipeline
#######################################################################################################################
def pipeline(message_count=0, query_interval=10):
    """Poll SQS once and, if a message is available, process that task.

    The message body is the S3 key of a configuration file. The file is
    downloaded, the task is marked as running, the message is deleted, the
    generator is run on the configuration, and the resulting archive is
    uploaded next to the configuration file on S3.

    Task status codes: 0 - pending (default), 1 - running, 2 - completed,
    3 - failed.

    Args:
        message_count: Counter that is only printed in the log banner.
        query_interval: Seconds to sleep when the queue returned no message.

    Raises:
        Exception: Any error raised while generating or uploading is
            re-raised after the task has been marked as failed.
    """
    # Query a message from SQS
    msg, handle = receive_message()
    if handle is None:
        print("No message in SQS. ")
        time.sleep(query_interval)
        return

    print("===============================================================================================")
    print(f"MESSAGE COUNT: {message_count}")
    print("===============================================================================================")
    config_s3_path = msg['Messages'][0]['Body']
    config_s3_dir = os.path.dirname(config_s3_path)
    config_local_path = os.path.basename(config_s3_path)
    # The configuration file name without its extension is the task id.
    task_id, _ = os.path.splitext(config_local_path)
    print("Initializing ...")
    print("Configuration file on S3: ", config_s3_path)
    print("Configuration file on S3 (Directory): ", config_s3_dir)
    print("Local file path: ", config_local_path)
    print("Task id: ", task_id)
    print(f"Success in receiving message: {msg}")
    print(f"Configuration file path: {config_s3_path}")

    # Fetch the configuration, mark the task as running, and only then remove
    # the message from the queue.
    download_file(config_s3_path)
    modify_status(task_id, 1)
    delete_message(handle)
    print("Success in the initialization. Message deleted.")

    print("Running ...")
    try:
        zip_path = generator_wrapper(config_local_path)
        # Upload the generated file to S3; S3 keys always use forward slashes.
        upload_to = os.path.join(config_s3_dir, zip_path).replace("\\", "/")
        print("Local file path (ZIP): ", zip_path)
        print("Upload to S3: ", upload_to)
        upload_file(zip_path, upload_to)
    except Exception:
        # The message is already deleted, so record the failure here;
        # otherwise the task would stay in the "running" state forever.
        modify_status(task_id, 3)
        print(f"Task {task_id} failed.")
        raise

    modify_status(task_id, 2)
    print("Success in generating the paper.")
    # Complete.
    print("Task completed.")
def initialize_everything():
    """Placeholder for resetting the worker's state; currently does nothing.

    TODO: clear S3.
    TODO: clear SQS.
    """
    return None
if __name__ == "__main__":
    # Script entry point: run a single pass of the pipeline with its
    # default arguments.
    pipeline()