# weatherbot / app.py
# Uploaded by datanerdke ("Upload app.py", commit 4eddba6, verified)
# Import Packages
from google.cloud import bigquery
import numpy as np
import os
import time
import datetime
import streamlit as st
import requests
import pandas as pd
import xml.etree.ElementTree as ET
import pyrebase
# Page configuration: must be the first Streamlit call in the script.
st.set_page_config(page_title='Weather Data in Kenya', layout='wide')

# Reduce the default top padding of the main container.
st.markdown('<style>div.block-container{padding-top:2.0rem}</style>', unsafe_allow_html=True)

# Page header. The CSS font family was misspelled as "sana-serif" (not a valid
# generic family, so browsers silently ignored it); the flag emoji had been
# mis-decoded and is restored to the Kenyan flag.
title = """
<style>
.header{
font-size : 2.5rem;
font-family : sans-serif;
}
</style>
<b><center class='header'>🌀️ Open Weather Map🇰🇪</center></b>
"""
st.markdown(title, unsafe_allow_html=True)
# Firebase project settings handed to pyrebase; `auth` is the handle the
# sign-in and password-reset forms use.
config = dict(
    apiKey="AIzaSyAQHikJeQ7Sru-Gdy9K3YNjz0adnmTSvuQ",
    authDomain="disaster-tweets-manageme-72580.firebaseapp.com",
    databaseURL="https://disaster-tweets-manageme-72580-default-rtdb.firebaseio.com",
    projectId="disaster-tweets-manageme-72580",
    storageBucket="disaster-tweets-manageme-72580.appspot.com",
)
firebase = pyrebase.initialize_app(config)
auth = firebase.auth()

# The first run of a browser session has no 'user' entry yet; seed it with
# None so the rest of the script can test it unconditionally.
if 'user' not in st.session_state:
    st.session_state['user'] = None
# Authentication gate.
# Signed-out visitors only get the sign-in / password-reset tabs; the script is
# then halted with st.stop() so nothing below this block runs for them. This is
# the guard-clause form of the original if/else and keeps the logged-in code
# un-nested.
if st.session_state.user is None:
    tab1, tab2 = st.tabs(['Sign In','Reset Password'])
    with tab1:
        email = st.text_input("Sign In Email")
        password = st.text_input("Sign In Password", type="password")
        if st.button("Sign In"):
            try:
                # Only the network call lives in the try body.
                user = auth.sign_in_with_email_and_password(email, password)
            except Exception as e:
                # pyrebase surfaces the Firebase REST error code inside the
                # exception text, so match on the code substring.
                error_message = str(e)
                credential_codes = ("INVALID_PASSWORD", "EMAIL_NOT_FOUND", "INVALID_LOGIN_CREDENTIALS")
                if "INVALID_EMAIL" in error_message:
                    st.error("Invalid email. Please enter a valid email address.")
                elif any(code in error_message for code in credential_codes):
                    st.error("Invalid password. Please check your password and try again.")
                else:
                    # Previously every other failure (timeouts, quota, disabled
                    # accounts, ...) was also reported as a wrong password.
                    st.error("Sign in failed. Please try again later.")
            else:
                if user:
                    st.session_state.user = user
                    st.success(f"Successfully signed in with email: {email}")
                    st.rerun() # Refresh the app to show the new section
                else:
                    # NOTE(review): this branch is only reached if the sign-in
                    # call returns a falsy value without raising — confirm it
                    # really corresponds to an unverified email.
                    st.warning("Your email is not verified. Please check your email for a verification link.")
    with tab2:
        st.subheader("Password Reset")
        reset_email = st.text_input("Email to reset password")
        if st.button("Reset Password"):
            try:
                auth.send_password_reset_email(reset_email)
                st.success("Password reset email sent. Please check your email for instructions.")
            except Exception as e:
                st.error(f"Error: {e}")
    # Nothing below is meant for signed-out users.
    st.stop()

# ---- Logged-in section -----------------------------------------------------
st.success(f"Logged in as: {st.session_state.user['email']}! 🎉")

# define tabs for easy accessibility
tab1, tab2, tab3 = st.tabs(['Home','Get Data','View Data'])
with tab1:
    # "sana-serif" was a typo for the generic CSS family "sans-serif".
    header = """
<style>
.subheader{
font-size: 1.5rem;
font-family : sans-serif;
}
</style>
<h3 class='subheader'><b>Welcome to Open Weather Map Data Collection Site for Counties in Kenya.</b></h3>
"""
    st.markdown(header,unsafe_allow_html=True)
    st.markdown('This Streamlit app collects weather data from various locations across Kenya using the OpenWeatherMap API.')
    st.markdown("""
<b>Data Collection Steps;</b>\n
- Fetch weather data from multiple locations in Kenya.
- Append collected data to a BigQuery database for further analysis.
""", unsafe_allow_html=True)

# Point the Google client libraries at the service-account key file that is
# expected to sit next to this script.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'adrianjuliusaluoch.json'

# BigQuery client shared by the "Get Data" and "View Data" tabs.
client = bigquery.Client()
# Function to extract text from XML element safely
def get_xml_text(parent, tag, attrib=None):
    """Look up the first child ``tag`` of ``parent`` and read a value from it.

    Returns the value of attribute ``attrib`` when one is requested (truthy),
    otherwise the element's text. Returns None when no such child exists, and
    also whenever the requested attribute or the text is absent.
    """
    node = parent.find(tag)
    if node is None:
        return None
    return node.get(attrib) if attrib else node.text
# create function to retrieve data from the api
def _fetch_location_weather(session, api_key, location, timeout=10):
    """Return a one-row DataFrame with the current weather for ``location``.

    Two requests are made against the OpenWeatherMap "current weather"
    endpoint: the JSON response supplies every numeric field, and the XML
    response is used only for the human-readable cloudiness name.

    Args:
        session: ``requests.Session`` used for both calls.
        api_key: OpenWeatherMap API key.
        location: City/county query string (e.g. ``'Embu, KE'``).
        timeout: Per-request timeout in seconds, so one stalled connection
            cannot hang the whole collection run.

    Returns:
        A single-row ``pandas.DataFrame``, or None when the location is not
        found or anything goes wrong (the reason is printed to stdout).
    """
    # HTTPS keeps the API key out of clear-text traffic.
    base_url = "https://api.openweathermap.org/data/2.5/weather"
    try:
        response = session.get(
            base_url,
            params={
                'q': location,
                'appid': api_key,
                'units': 'metric',  # You can use 'imperial' for Fahrenheit
            },
            timeout=timeout,
        )
        if response.status_code != 200:
            print(f"{location} not found in the OpenWeatherMap DataBase.")
            return None
        data = response.json()

        # Passing the query through `params` lets requests URL-encode names
        # that contain spaces, commas or apostrophes.
        xml_response = session.get(
            base_url,
            params={'q': location, 'mode': 'xml', 'appid': api_key},
            timeout=timeout,
        )
        xml_root = ET.fromstring(xml_response.content)

        main = data['main']
        wind = data['wind']
        weather = data['weather'][0]
        sun = data['sys']
        # Precipitation sections are only present when there is precipitation.
        rain = data.get('rain', {})
        snow = data.get('snow', {})

        weather_data = {
            'City': [location],
            'Time_of_Data_Calculation': [pd.to_datetime(data['dt'], unit='s', utc=True)],
            'Latitude': [data['coord']['lat']],
            'Longitude': [data['coord']['lon']],
            'Weather_ID': [weather['id']],
            'Weather_Main': [weather['main']],
            'Weather_Description': [weather['description']],
            'Temperature': [main['temp']],
            'Feels_Like': [main['feels_like']],
            'Temp_Min': [main['temp_min']],
            'Temp_Max': [main['temp_max']],
            'Pressure': [main['pressure']],
            'Humidity': [main['humidity']],
            # Optional fields fall back to None when the API omits them.
            'Sea_Level': [main.get('sea_level')],
            'Ground_Level': [main.get('grnd_level')],
            'Visibility': [data['visibility']],
            'Wind_Speed': [wind['speed']],
            'Wind_Degree': [wind['deg']],
            'Wind_Gust': [wind.get('gust')],
            'Cloudiness': [data['clouds']['all']],
            'Cloudiness_Name': [get_xml_text(xml_root, 'clouds', 'name')],
            'Rain_1h': [rain.get('1h')],
            'Rain_3h': [rain.get('3h')],
            'Snow_1h': [snow.get('1h')],
            'Snow_3h': [snow.get('3h')],
            'Country_Code': [sun['country']],
            'Sunrise_Time': [pd.to_datetime(sun['sunrise'], unit='s', utc=True)],
            'Sunset_Time': [pd.to_datetime(sun['sunset'], unit='s', utc=True)],
            'Timezone': [data['timezone']],
            'City_ID': [data['id']],
            'City_Name': [data['name']],
        }
        return pd.DataFrame(weather_data)
    except Exception as e:
        # Deliberately broad: collection is best-effort per location, so one
        # bad response must not abort the remaining locations.
        print(f"An error occurred: {str(e)}")
        return None


def get_weather():
    """Collect current weather for all configured Kenyan locations.

    The combined result is written to ``weather_data.gzip`` (gzip-compressed
    parquet, later read by ``append_data``) and displayed in the Streamlit
    page. Locations that could not be fetched are skipped and listed in a
    warning.

    The former ``if __name__ == "__main__"`` guard inside this function was
    removed: it made the function silently do nothing whenever the module was
    not the entry script, leaving ``append_data`` to upload a stale or missing
    file.
    """
    # NOTE(security): the key is still embedded as a fallback for backward
    # compatibility. Prefer setting OPENWEATHERMAP_API_KEY and rotating the
    # embedded key, which is exposed to anyone who can read this file.
    api_key = os.environ.get('OPENWEATHERMAP_API_KEY', '30bc8c5f44c2f641d15a7f617af532c0')

    # List of locations (cities or counties) for which you want to get weather data
    locations = [
        'Baringo', 'Bomet', 'Bungoma', 'Busia', 'Mandeni, KE', 'Embu, KE', 'Garissa', 'Homa Bay', 'Isiolo', 'Kajiado',
        'Kakamega', 'Kericho', 'Kiambu', 'Kilifi', 'Kerugoya', 'Kisii', 'Kisumu', 'Kitui', 'Kwale, KE', 'Nanyuki',
        'Lamu', 'Machakos', 'Makueni', 'Mandera', 'Marsabit', 'Meru', 'Migori', 'Mombasa', "Murang'a", 'Nairobi',
        'Nakuru', 'Nandi, KE', 'Narok', 'Nyamira', 'Oljoro Orok', 'Nyeri', 'Maralal', 'Siaya, KE', 'Taveta',
        'Chogoria', 'Kitale', 'Lodwar', 'Eldoret', 'Vihiga', 'Wajir', 'Kapenguria']

    frames = []
    failed = []
    # One session reuses the HTTP connection across all requests.
    with requests.Session() as session:
        for location in locations:
            frame = _fetch_location_weather(session, api_key, location)
            if frame is None:
                failed.append(location)
            else:
                frames.append(frame)

    # Concatenate once instead of growing a DataFrame inside the loop;
    # pd.concat rejects an empty list, hence the explicit fallback.
    all_weather_data = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    if failed:
        st.warning(f"No data collected for: {', '.join(failed)}")

    st.write(f'Data Collection Successful for all {len(all_weather_data)} Counties in Kenya.')
    all_weather_data.to_parquet('weather_data.gzip',compression='gzip',index=False)
    st.dataframe(all_weather_data)
# Load data into cloud database
def append_data():
    """Load the cached ``weather_data.gzip`` snapshot into BigQuery.

    Reads the parquet file written by ``get_weather`` and submits it as a
    load job to the ``openweathermap`` table using the module-level
    ``client``, then writes the final job state to the page.

    Raises:
        Whatever ``job.result()`` raises when the load job fails. The previous
        hand-rolled polling loop only waited for state ``DONE`` and never
        looked at the job's error, so a failed load was reported as a success
        by the caller.
    """
    dataframe = pd.read_parquet('weather_data.gzip')
    table_id = 'project-adrian-julius-aluoch.central_database.openweathermap'
    job = client.load_table_from_dataframe(dataframe, table_id)
    # Blocks until the job finishes and raises if it finished with an error.
    job.result()
    st.write(job.state)
# "Get Data" tab: fetch fresh weather data and push it to BigQuery on demand.
with tab2:
    # The help text now names the button by its actual label.
    st.markdown("""
This section allows you to retrieve weather data from various locations across Kenya using the OpenWeatherMap API.\n
To get started, simply click the "Get Data from API" button below.\n
The app will collect real-time weather data for all specified locations and export the data to Google Cloud BigQuery Database for Storage.
""", unsafe_allow_html=True)
    # add widgets
    if st.button('Get Data from API'):
        with st.spinner('In progress....'):
            get_weather()
            append_data()
        # The icon had been mis-decoded into a multi-character string, which
        # is not a valid emoji for `icon=`; restored to the check mark.
        st.success('Open Weather Data Successfully Exported to Google Cloud BigQuery', icon="✅")
# "View Data" tab: show everything stored in BigQuery plus summary statistics.
with tab3:
    st.markdown("""
**Data Viewing and Analysis:**
Welcome to the data viewing section. Here, you can explore the collected weather data for various regions in Kenya.
Click the "View Data" button below to display the collected data in a table format. You can verify the data and ensure that all necessary information has been successfully collected.
Additionally, you can download the data as a CSV file for offline analysis or sharing with others.
""")
    if st.button('View Data'):
        # One collection run stores one row per configured location; 46 is
        # the length of the location list in get_weather().
        locations_per_run = 46

        with st.spinner('In progress....'):
            # The original adjacent literals concatenated to "SELECT *FROM".
            sql = 'SELECT * FROM `central_database.openweathermap`'
            data = client.query(sql).to_dataframe()
            data = data.sort_values(by='Time_of_Data_Calculation',ascending=False).reset_index(drop=True)

            # Timestamps are written with utc=True by get_weather(), so
            # "today" is taken in UTC as well rather than from the server's
            # naive local date.
            # NOTE(review): assumes BigQuery returns this column as
            # timezone-aware UTC — confirm against the table schema.
            now_utc = datetime.datetime.now(datetime.timezone.utc)
            today_data = data[data['Time_of_Data_Calculation'].dt.date == now_utc.date()]
            num_today = len(today_data)

        first_collected = data['Time_of_Data_Calculation'].min()
        last_collected = data['Time_of_Data_Calculation'].max()

        st.dataframe(data)
        st.divider()
        st.markdown('<b><u>Open Weather Map Statistics</u></b>',unsafe_allow_html=True)
        st.text(f"Data Collection Began : {first_collected}\nLast Collection Date : {last_collected}\nToday's Date : {now_utc}")
        st.text(f"Time elapsed since data collection began: {last_collected - first_collected}")
        st.text(f"Total Number of Runs : {data.shape[0] / locations_per_run}")
        st.text(f"Number of Runs Today: {num_today / locations_per_run}")
        st.text(f"Rows : {data.shape[0]:,.0f}\nColumns : {data.shape[1]}\nCounties : {len(data['City'].unique())}")
        st.download_button(label='Download as csv',data=data.to_csv().encode('utf-8'),mime='text/csv',file_name='Open Weather Data in Kenya.csv')
        st.divider()
# Sign-out control: clearing the stored user and rerunning the script sends
# the visitor back to the sign-in tabs.
sign_out_requested = st.button("Sign Out")
if sign_out_requested:
    st.session_state['user'] = None
    st.success("User successfully logged out")
    st.rerun() # Refresh the app to show the login section again