import base64
import json
import os
import smtplib

import pandas as pd
import pyperclip
import requests
import streamlit as st

from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

from google.auth import default, exceptions
from google.auth.transport.requests import Request
from google.cloud import bigquery
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload

from queries import queries

try:
    # Non-stdlib Windows event-log reader relied on by get_task_logs below;
    # only available where that module is installed
    import eventlog
except ImportError:
    eventlog = None

# Define your email server details. The password and Slack token are read from
# the environment rather than hard-coded, since this file ships with the app.
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_HOST_USER = 'alerter.response@gmail.com'
EMAIL_HOST_PASSWORD = os.getenv('EMAIL_HOST_PASSWORD')  # Gmail app password
SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')  # xoxb-... bot token

# Authenticate BigQuery
def authenticate_bigquery():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for BigQuery authentication.")
        return None
    return creds

def authenticate_bigquery_updated():
    gcp_credentials = load_gcp_credentials()
    if gcp_credentials:
        gcp_credentials = gcp_credentials.with_scopes([
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/drive"
        ])
        return gcp_credentials
    return None

# Load GCP credentials
def load_gcp_credentials():
    try:
        # Retrieve GCP credentials from the environment variable
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")
        # Parse the secret (a JSON-encoded service-account key)
        gcp_credentials = json.loads(gcp_credentials_str)
        # Build credentials directly from the parsed dict; this avoids writing
        # the key to a temporary file on disk
        return service_account.Credentials.from_service_account_info(gcp_credentials)
    except Exception as e:
        print(f"Error retrieving or loading GCP credentials: {str(e)}")
        return None

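# Example (hypothetical value): the GCP_CREDENTIALS secret holds the full JSON
# of a service-account key, e.g.
#   GCP_CREDENTIALS='{"type": "service_account", "project_id": "...", ...}'
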
# Upload to BQ
def upload_to_bigquery(df, table_id):
    try:
        # Load the GCP credentials from the Hugging Face secret
        bigquery_creds = load_gcp_credentials()
        if not bigquery_creds:
            st.error("Unable to load GCP credentials.")
            return
        # Initialize BigQuery client with the loaded credentials
        client = bigquery.Client(credentials=bigquery_creds)
        # Convert the DataFrame to a list of dictionaries
        records = df.to_dict(orient='records')
        # Append rows to the target table (use WRITE_TRUNCATE to overwrite instead)
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",
        )
        # Load the data to BigQuery
        load_job = client.load_table_from_json(records, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete
        st.success("Data submitted")
    except Exception as e:
        st.error(f"An error occurred while uploading to BigQuery: {e}")

def preprocess_csv(file_path):
    # Load the CSV file
    df = pd.read_csv(file_path)
    # Define columns to be converted
    date_columns = ['Order_Date', 'State_Date', 'Entry_Month']
    # Strip any decimal suffix (e.g. "123.0" -> "123") and cast to nullable
    # Int64; the pattern is a raw string so the dot is escaped correctly
    if 'bag_id_cn' in df.columns:
        df['bag_id_cn'] = df['bag_id_cn'].replace({r'\..*': ''}, regex=True).astype('Int64')
    # Convert specified columns from DD/MM/YY to 'YYYY-MM-DD 00:00:00 UTC'
    for column in date_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column], format='%d/%m/%y', errors='coerce').dt.strftime('%Y-%m-%d 00:00:00 UTC')
    # Save the preprocessed CSV
    preprocessed_file_path = 'preprocessed_' + os.path.basename(file_path)
    df.to_csv(preprocessed_file_path, index=False)
    return preprocessed_file_path

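# Example (hypothetical file): preprocess_csv("orders.csv") writes
# "preprocessed_orders.csv" with the listed date columns rewritten as
# 'YYYY-MM-DD 00:00:00 UTC' strings and returns that path.
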
# Function to read files from a local path
def read_file(path):
    try:
        with open(path, 'rb') as file:
            return file.read()
    except Exception as e:
        st.error(f"Failed to read file from {path}: {str(e)}")
        return None

# Function to get file content type based on file extension
def get_content_type(file_path):
    if file_path.lower().endswith('.pdf'):
        return 'application/pdf'
    elif file_path.lower().endswith('.csv'):
        return 'text/csv'
    elif file_path.lower().endswith('.xlsx'):
        return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    else:
        return 'application/octet-stream'

# Encode an image to Base64
def get_base64_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode()

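# Example (hypothetical asset): embed a local image in Streamlit via a data URI
#   st.markdown(f'<img src="data:image/png;base64,{get_base64_image("logo.png")}"/>',
#               unsafe_allow_html=True)
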
def send_message_via_email(message, email_address, files, subject=None, body=None):
    try:
        # Set up the server
        server = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
        server.starttls()
        server.login(EMAIL_HOST_USER, EMAIL_HOST_PASSWORD)
        # Create the email
        msg = MIMEMultipart()
        msg['From'] = EMAIL_HOST_USER
        msg['To'] = email_address
        msg['Subject'] = subject if subject else "🚨 Alerter"
        # Attach the message body
        msg.attach(MIMEText(body if body else message, 'plain'))
        # Attach each file if provided
        if files:
            for uploaded_file in files:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(uploaded_file.read())
                encoders.encode_base64(part)
                part.add_header('Content-Disposition', f'attachment; filename={uploaded_file.name}')
                msg.attach(part)
        # Send the email
        server.sendmail(EMAIL_HOST_USER, email_address, msg.as_string())
        server.quit()
        return True, "Message sent successfully"
    except Exception as e:
        return False, str(e)

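# Example (hypothetical address): send a plain alert with no attachments
#   ok, info = send_message_via_email("Duplicates found", "oncall@example.com", files=None)
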
def send_message_via_webhook(message, webhook_url):
    try:
        payload = {"text": message}
        response = requests.post(webhook_url, json=payload)
        if response.status_code == 200:
            return True, "Message sent successfully"
        else:
            return False, f"Error {response.status_code}: {response.text}"
    except Exception as e:
        return False, str(e)

def send_file_to_slack(file, webhook_url):
    # Do not set the Content-Type header manually: requests must generate the
    # multipart boundary itself, and a hand-written header breaks the upload.
    # Note that incoming webhooks only accept text payloads, so a real file
    # upload needs the Web API (see send_file_to_slack_up below).
    files = {
        'file': (file.name, file, 'application/octet-stream')  # Send as binary
    }
    response = requests.post(webhook_url, files=files)
    return response

def send_file_to_slack_up(file, webhook_url, channel_id):
    # Slack API URL for file upload (Slack has deprecated files.upload in
    # favour of the external-upload flow sketched below)
    slack_api_url = 'https://slack.com/api/files.upload'
    # Prepare headers for the API call, using the token variable rather than
    # the literal string 'SLACK_BOT_TOKEN'
    headers = {
        'Authorization': 'Bearer ' + SLACK_BOT_TOKEN,
    }
    # Send the file in its original binary format; the target channel goes in
    # the form data, not the files dict
    files = {
        'file': (file.name, file, 'application/octet-stream'),
    }
    data = {
        'channels': channel_id,  # Slack channel ID where the file should be uploaded
    }
    # Make the POST request to upload the file to Slack
    response = requests.post(slack_api_url, headers=headers, files=files, data=data)
    return response

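# Because files.upload is scheduled for removal, here is a minimal, untested
# sketch of Slack's replacement flow (files.getUploadURLExternal followed by
# files.completeUploadExternal). send_file_to_slack_external is a hypothetical
# helper, not part of the original app.
def send_file_to_slack_external(file, channel_id):
    headers = {'Authorization': 'Bearer ' + SLACK_BOT_TOKEN}
    data = file.read()
    # Step 1: ask Slack for a one-time upload URL sized to this file
    resp = requests.post(
        'https://slack.com/api/files.getUploadURLExternal',
        headers=headers,
        data={'filename': file.name, 'length': len(data)},
    ).json()
    if not resp.get('ok'):
        return resp
    # Step 2: push the raw bytes to the returned URL
    requests.post(resp['upload_url'], data=data)
    # Step 3: finalise the upload and share it to the channel
    return requests.post(
        'https://slack.com/api/files.completeUploadExternal',
        headers=headers,
        json={'files': [{'id': resp['file_id'], 'title': file.name}],
              'channel_id': channel_id},
    ).json()
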
def check_duplicates(client=None):
    """Run each configured duplicate-check query in BigQuery and report row counts."""
    results = {}
    # Reuse a caller-supplied client if given; otherwise build one from the
    # loaded credentials (the original ignored the parameter entirely)
    if client is None:
        bigquery_creds = authenticate_bigquery()
        client = bigquery.Client(credentials=bigquery_creds)
    # Inject the button styling once, rather than on every loop iteration
    button_styles = """
        <style>
        div.stButton > button {
            color: #ffffff; /* Text color */
            font-size: 30px;
            background-image: linear-gradient(to right, #800000, #ff0000); /* Maroon to light red gradient */
            border: none;
            padding: 10px 20px;
            cursor: pointer;
            border-radius: 15px;
            display: inline-block;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1), 0 8px 15px rgba(0, 0, 0, 0.1); /* Box shadow */
            transition: all 0.3s ease; /* Smooth transition on hover */
        }
        div.stButton > button:hover {
            background-color: #00ff00; /* Hover background color */
            color: #ff0000; /* Hover text color */
            box-shadow: 0 6px 10px rgba(0, 0, 0, 0.2), 0 12px 20px rgba(0, 0, 0, 0.2); /* Box shadow on hover */
        }
        </style>
    """
    st.markdown(button_styles, unsafe_allow_html=True)
    for i, (query_name, query) in enumerate(queries.items()):
        query_job = client.query(query)
        df = query_job.result().to_dataframe()
        # For debugging, write the DataFrame to the Streamlit app
        st.write(f"{query_name}:", df)
        if st.button("Copy Query", key=f"copy_query_{i}"):
            # pyperclip needs a clipboard backend, which headless hosts may lack
            pyperclip.copy(query)
            st.success('Query copied to clipboard!')
        if not df.empty:
            duplicate_count = len(df)
            results[query_name] = duplicate_count
    return results

# Upload a local file to a Google Drive folder
def upload_to_drive(file_path, folder_id):
    try:
        # Authenticate with Google Drive using the credentials loaded from secrets
        creds = authenticate_google_drive()
        if not creds:
            return
        # Build the Google Drive service
        service = build('drive', 'v3', credentials=creds)
        # Define the file metadata
        file_metadata = {'name': os.path.basename(file_path), 'parents': [folder_id]}
        # Determine MIME type based on file extension (.xlsx files use the
        # OpenXML spreadsheet type, not the legacy application/vnd.ms-excel)
        mime_type = ('application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
                     if file_path.endswith('.xlsx') else 'text/csv')
        media = MediaFileUpload(file_path, mimetype=mime_type)
        # Upload the file to Google Drive and surface the new file ID
        file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
        st.success(f"Uploaded to Drive (file ID: {file.get('id')})")
    except Exception as e:
        st.error(f"An error occurred: {e}")
        st.error("Ensure the folder ID is correct and the service account has permission to access the folder.")

def authenticate_google_drive():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for Google Drive authentication.")
        return None
    # Drive calls need an explicit scope on service-account credentials
    return creds.with_scopes(["https://www.googleapis.com/auth/drive"])

def get_oauth_token():
    try:
        # Define the required scopes for Dataform
        required_scopes = [
            "https://www.googleapis.com/auth/cloud-platform",  # General GCP access
            "https://www.googleapis.com/auth/bigquery",  # BigQuery access
            "https://www.googleapis.com/auth/dataform"
        ]
        # Load the default credentials with the specified scopes
        creds, _ = default(scopes=required_scopes)
        if creds is None:
            raise exceptions.DefaultCredentialsError("No valid credentials found.")
        # Refresh the credentials to get a current access token
        creds.refresh(Request())
        return creds.token
    except exceptions.GoogleAuthError as e:
        print(f"Authentication error: {e}")
        return None

# Read recent Task Scheduler events and keep those mentioning the given task.
# This relies on the non-stdlib 'eventlog' module imported above, so it only
# works where that module and the Windows log channel are available.
def get_task_logs(task_name="Tally_backup_Ninad"):
    logs = eventlog.read('Microsoft-Windows-TaskScheduler/Operational', limit=100)
    task_logs = []
    for log in logs:
        if task_name in log['Message']:
            task_logs.append(log['Message'])
    return task_logs

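# Minimal usage sketch (hypothetical wiring; the email address is a placeholder):
#   creds = authenticate_bigquery()
#   client = bigquery.Client(credentials=creds)
#   duplicates = check_duplicates(client)
#   if duplicates:
#       body = "\n".join(f"{name}: {count} rows" for name, count in duplicates.items())
#       send_message_via_email(body, "oncall@example.com", files=None,
#                              subject="🚨 Duplicate rows detected")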