import os
import json
import base64
import pyperclip
import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account
import streamlit as st
import requests
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
from queries import queries
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google.auth import default, exceptions
from google.auth.transport.requests import Request

# Define your email server details (secrets are read from environment
# variables rather than hardcoded in source)
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_PORT = 587
EMAIL_HOST_USER = os.getenv('EMAIL_HOST_USER', 'alerter.response@gmail.com')
EMAIL_HOST_PASSWORD = os.getenv('EMAIL_HOST_PASSWORD')
SLACK_BOT_TOKEN = os.getenv('SLACK_BOT_TOKEN')


# Authenticate BigQuery
def authenticate_bigquery():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for BigQuery authentication.")
        return None
    return creds


def authenticate_bigquery_updated():
    gcp_credentials = load_gcp_credentials()
    if gcp_credentials:
        gcp_credentials = gcp_credentials.with_scopes([
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/drive"
        ])
        return gcp_credentials
    return None


# Load GCP credentials
def load_gcp_credentials():
    try:
        # Retrieve GCP credentials from the environment variable
        gcp_credentials_str = os.getenv('GCP_CREDENTIALS')
        if not gcp_credentials_str:
            raise ValueError("GCP_CREDENTIALS environment variable not defined")

        # Parse the secret (assuming it's a JSON string)
        gcp_credentials = json.loads(gcp_credentials_str)

        # Save to a temporary file (Google Cloud uses a JSON file for authentication)
        with open("gcp_credentials.json", "w") as f:
            json.dump(gcp_credentials, f)

        # Authenticate using Google Cloud SDK
        credentials_from_file = service_account.Credentials.from_service_account_file("gcp_credentials.json")

        # Return the credentials to be used later
        return credentials_from_file
    except Exception as e:
        print(f"Error retrieving or loading GCP credentials: {str(e)}")
        return None


# Upload to BQ
def upload_to_bigquery(df, table_id):
    try:
        # Load the GCP credentials from Hugging Face secret
        bigquery_creds = load_gcp_credentials()
        if not bigquery_creds:
            st.error("Unable to load GCP credentials.")
            return

        # Initialize BigQuery client with the loaded credentials
        client = bigquery.Client(credentials=bigquery_creds)

        # Convert the DataFrame to a list of dictionaries
        records = df.to_dict(orient='records')

        # Prepare the table schema if needed (optional)
        job_config = bigquery.LoadJobConfig(
            write_disposition="WRITE_APPEND",  # Use WRITE_TRUNCATE to overwrite, WRITE_APPEND to append
        )

        # Load the data to BigQuery
        load_job = client.load_table_from_json(records, table_id, job_config=job_config)
        load_job.result()  # Wait for the job to complete

        st.success("Data submitted")
    except Exception as e:
        st.error(f"An error occurred while uploading to BigQuery: {e}")


def preprocess_csv(file_path):
    # Load the CSV file
    df = pd.read_csv(file_path)

    # Define columns to be converted
    date_columns = ['Order_Date', 'State_Date', 'Entry_Month']

    # Strip any decimal part from bag IDs and cast to nullable integers
    if 'bag_id_cn' in df.columns:
        df['bag_id_cn'] = df['bag_id_cn'].replace({r'\..*': ''}, regex=True).astype('Int64')

    # Convert specified columns from DD/MM/YY to 'YYYY-MM-DD 00:00:00 UTC'
    for column in date_columns:
        if column in df.columns:
            df[column] = pd.to_datetime(df[column], format='%d/%m/%y', errors='coerce').dt.strftime('%Y-%m-%d 00:00:00 UTC')

    # Save the preprocessed CSV
    preprocessed_file_path = 'preprocessed_' + os.path.basename(file_path)
    df.to_csv(preprocessed_file_path, index=False)

    return preprocessed_file_path


# Function to read files from local path
def read_file(path):
    try:
        with open(path, 'rb') as file:
            return file.read()
    except Exception as e:
        st.error(f"Failed to read file from {path}: {str(e)}")
        return None


# Function to get file content type based on file extension
def get_content_type(file_path):
    if file_path.lower().endswith('.pdf'):
        return 'application/pdf'
    elif file_path.lower().endswith('.csv'):
        return 'text/csv'
    elif file_path.lower().endswith('.xlsx'):
        return 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    else:
        return 'application/octet-stream'


# Encode the image to Base64
def get_base64_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode()


def send_message_via_email(message, email_address, files, subject=None, body=None):
    try:
        # Set up the server
        server = smtplib.SMTP(EMAIL_HOST, EMAIL_PORT)
        server.starttls()
        server.login(EMAIL_HOST_USER, EMAIL_HOST_PASSWORD)

        # Create the email
        msg = MIMEMultipart()
        msg['From'] = EMAIL_HOST_USER
        msg['To'] = email_address
        msg['Subject'] = subject if subject else "🚨 Alerter"

        # Attach the message body
        msg.attach(MIMEText(body if body else message, 'plain'))

        # Attach each file if provided
        if files:
            for uploaded_file in files:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(uploaded_file.read())
                encoders.encode_base64(part)
                part.add_header('Content-Disposition', f'attachment; filename={uploaded_file.name}')
                msg.attach(part)

        # Send the email
        server.sendmail(EMAIL_HOST_USER, email_address, msg.as_string())
        server.quit()
        return True, "Message sent successfully"
    except Exception as e:
        return False, str(e)


def send_message_via_webhook(message, webhook_url):
    try:
        payload = {"text": message}
        response = requests.post(webhook_url, json=payload)
        if response.status_code == 200:
            return True, "Message sent successfully"
        else:
            return False, f"Error {response.status_code}: {response.text}"
    except Exception as e:
        return False, str(e)


def send_file_to_slack(file, webhook_url):
    files = {
        'file': (file.name, file, 'application/octet-stream')  # Send as binary
    }
    # Let requests build the multipart Content-Type header (with boundary) itself;
    # setting it manually breaks the upload
    response = requests.post(webhook_url, files=files)
    return response


def send_file_to_slack_up(file, webhook_url, channel_id):
    # Slack API URL for file upload
    slack_api_url = 'https://slack.com/api/files.upload'

    # Prepare headers for the API call
    headers = {
        'Authorization': 'Bearer ' + SLACK_BOT_TOKEN,  # Use the actual Slack bot token
    }

    # Send the file in its original binary format
    files = {
        'file': (file.name, file, 'application/octet-stream'),
    }
    # Specify the Slack channel ID where the file should be uploaded
    data = {
        'channels': channel_id,
    }

    # Make the POST request to upload the file to Slack
    response = requests.post(slack_api_url, headers=headers, files=files, data=data)
    return response


def check_duplicates(client=None):
    """Check for duplicates using BigQuery, building a client from the loaded credentials if none is provided."""
    results = {}
    if client is None:
        bigquery_creds = authenticate_bigquery()
        client = bigquery.Client(credentials=bigquery_creds)
    for i, (query_name, query) in enumerate(queries.items()):
        query_job = client.query(query)
        df = query_job.result().to_dataframe()

        # For debugging, write the DataFrame to the Streamlit app
        st.write(f"{query_name}:", df)

        # Placeholder for custom button CSS
        button_styles = """
        """
        st.markdown(button_styles, unsafe_allow_html=True)

        if st.button("Copy Query", key=f"copy_query_{i}"):
            pyperclip.copy(query)
            st.success('Query copied to clipboard!')

        if not df.empty:
            duplicate_count = len(df)
            results[query_name] = duplicate_count

    return results


def upload_to_drive(file_path, folder_id):
    try:
        # Authenticate with Google Drive using Hugging Face secrets
        creds = authenticate_google_drive()
        if not creds:
            return

        # Build the Google Drive service
        service = build('drive', 'v3', credentials=creds)

        # Define the file metadata
        file_metadata = {'name': os.path.basename(file_path), 'parents': [folder_id]}

        # Determine MIME type based on file extension
        mime_type = (
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
            if file_path.endswith('.xlsx') else 'text/csv'
        )
        media = MediaFileUpload(file_path, mimetype=mime_type)

        # Upload the file to Google Drive
        file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
        st.success(f"File uploaded to Google Drive (id: {file.get('id')})")
    except Exception as e:
        st.error(f"An error occurred: {e}")
        st.error("Ensure the folder ID is correct and the service account has permission to access the folder.")


def authenticate_google_drive():
    creds = load_gcp_credentials()
    if not creds:
        st.error("Unable to load GCP credentials for Google Drive authentication.")
        return None
    return creds


def get_oauth_token():
    try:
        # Define the required scopes for Dataform
        required_scopes = [
            "https://www.googleapis.com/auth/cloud-platform",  # General GCP access
            "https://www.googleapis.com/auth/bigquery",        # BigQuery access
            "https://www.googleapis.com/auth/dataform"
        ]

        # Load the credentials with the specified scopes
        creds, _ = default(scopes=required_scopes)
        if creds is None:
            raise exceptions.DefaultCredentialsError("No valid credentials found.")

        # Refresh the credentials to get the latest access token
        creds.refresh(Request())
        return creds.token
    except exceptions.GoogleAuthError as e:
        print(f"Authentication error: {e}")
        return None


def get_task_logs(task_name="Tally_backup_Ninad"):
    # NOTE: `eventlog` is not imported above; this helper assumes a Windows event log
    # reader exposing read(channel, limit=...) is available in the environment.
    logs = eventlog.read('Microsoft-Windows-TaskScheduler/Operational', limit=100)
    task_logs = []
    for log in logs:
        if task_name in log['Message']:
            task_logs.append(log['Message'])
    return task_logs
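

# Example usage (sketch): how the helpers above might be chained together from a
# local run. The CSV path, BigQuery table ID, and SLACK_WEBHOOK_URL environment
# variable used here are hypothetical placeholders, not values defined elsewhere
# in this module.
if __name__ == "__main__":
    cleaned_path = preprocess_csv("orders.csv")  # hypothetical input file
    upload_to_bigquery(pd.read_csv(cleaned_path), "my-project.my_dataset.orders")  # hypothetical table ID
    ok, detail = send_message_via_webhook("Orders table refreshed", os.getenv("SLACK_WEBHOOK_URL", ""))
    print(ok, detail)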