# ------------------------ Libraries --------------------------
import os
import pandas as pd
import streamlit as st
import plotly.graph_objs as go
import logging
import subprocess
import threading
from dotenv import load_dotenv
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import plotly.express as px
import json
import networkx as nx
import time
# ------------------------ Environment Variables --------------------------
load_dotenv()
log_folder = os.getenv("LOG_FOLDER")
# Logging
log_folder = os.getenv("LOG_STREAMLIT")
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "front.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
logging.info("Streamlit app has started")
# Create output folder if it doesn't exist
if not os.path.exists("output"):
os.makedirs("output")
#-------------------------------------back----------------------------------
def safe_read_csv(file_path, sep=','):
if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
return pd.read_csv(file_path, sep=sep)
else:
logging.warning(f"File {file_path} is empty or does not exist.")
return pd.DataFrame() # return an empty DataFrame
# etherscan
## Load the data from the CSV files
df_etherscan = pd.DataFrame()
for filename in os.listdir('output'):
if filename.endswith('.csv') and 'transactions_' in filename:
df_temp = safe_read_csv(os.path.join('output', filename), sep=',')
df_etherscan = pd.concat([df_etherscan, df_temp], ignore_index=True)
# CMC
## Load cmc data
df_cmc = safe_read_csv("output/top_100_update.csv", sep=',')
df_cmc = df_cmc[df_cmc["last_updated"] == df_cmc["last_updated"].max()]
# Global metrics about the market
def load_global_metrics():
try:
return pd.read_csv("output/global_metrics.csv")
except FileNotFoundError:
logging.warning("Global metrics file not found.")
return pd.DataFrame() # Return an empty DataFrame if file is not found
# Load influencers
def load_influencers():
try:
with open("ressources/dict_influencers_addr.json", "r") as file:
return json.load(file)
except Exception as e:
st.error(f"Error loading influencers: {e}")
return {}
# Load influencers
def load_tokens():
try:
with open("ressources/dict_tokens_addr.json", "r") as file:
return json.load(file)
except Exception as e:
st.error(f"Error loading influencers: {e}")
return {}
def create_dominance_pie_chart(df_global_metrics):
# Extract BTC and ETH dominance
btc_dominance = df_global_metrics['btc_dominance'].iloc[0]
eth_dominance = df_global_metrics['eth_dominance'].iloc[0]
# Calculate the dominance of other cryptocurrencies
others_dominance = 100 - btc_dominance - eth_dominance
#print(btc_dominance,eth_dominance,others_dominance)
# Prepare data for pie chart
dominance_data = {
'Cryptocurrency': ['BTC', 'ETH', 'Others'],
'Dominance': [btc_dominance, eth_dominance, others_dominance]
}
df_dominance = pd.DataFrame(dominance_data)
# Create a pie chart
fig = px.pie(df_dominance, values='Dominance', names='Cryptocurrency', title='Market Cap Dominance')
return fig
def display_greed_fear_index():
try:
df = pd.read_csv('output/greed_fear_index.csv')
# Prepare data for plotting
time_periods = ['One Year Ago', 'One Month Ago', 'One Week Ago', 'Previous Close', 'Now']
values = [
df['fgi_oneYearAgo_value'].iloc[0],
df['fgi_oneMonthAgo_value'].iloc[0],
df['fgi_oneWeekAgo_value'].iloc[0],
df['fgi_previousClose_value'].iloc[0],
df['fgi_now_value'].iloc[0]
]
labels = [
df['fgi_oneYearAgo_valueText'].iloc[0],
df['fgi_oneMonthAgo_valueText'].iloc[0],
df['fgi_oneWeekAgo_valueText'].iloc[0],
df['fgi_previousClose_valueText'].iloc[0],
df['fgi_now_valueText'].iloc[0]
]
# Create a Plotly figure
fig = go.Figure(data=[
go.Scatter(x=time_periods, y=values, mode='lines+markers+text', text=labels, textposition='top center')
])
# Update layout
fig.update_layout(
title='Fear and Greed Index Over Time',
xaxis_title='Time Period',
yaxis_title='Index Value',
yaxis=dict(range=[0, 100]) # Fear and Greed index ranges from 0 to 100
)
# Display the figure
st.plotly_chart(fig)
except FileNotFoundError:
st.error("Greed and Fear index data not available. Please wait for the next update cycle.")
def load_token_balances():
try:
return pd.read_csv("output/influencers_token_balances.csv")
except FileNotFoundError:
logging.warning("Token balances file not found.")
return pd.DataFrame() # Return an empty DataFrame if file is not found
def create_token_balance_bar_plot(df):
if df.empty:
return go.Figure() # Return an empty figure if there is no data
fig = px.bar(df, x="Influencer", y="Balance", color="Token", barmode="group")
fig.update_layout(
title="Token Balances of Influencers",
xaxis_title="Influencer",
yaxis_title="Token Balance",
legend_title="Token"
)
return fig
def get_top_buyers(df, token, top_n=5):
# Filter for selected token
token_df = df[df['tokenSymbol'] == token]
# Assuming 'value' column holds the amount bought and 'from' column holds the buyer's address
top_buyers = token_df.groupby('from')['value'].sum().sort_values(ascending=False).head(top_n)
return top_buyers.reset_index()
def plot_top_buyers(df):
fig = px.bar(df, x='from', y='value', title=f'Top 5 Buyers of {selected_token}',orientation="h")
fig.update_layout(xaxis_title="Address", yaxis_title="Total Amount Bought")
return fig
def load_influencer_interactions(influencer_name):
try:
# Load the influencer addresses dictionary
with open("ressources/dict_influencers_addr.json", "r") as file:
influencers = json.load(file)
# Get the address of the specified influencer
influencer_address = influencers.get(influencer_name, None)
if influencer_address is None:
return pd.DataFrame(), None
file_path = f"output/interactions_{influencer_name}.csv"
df = pd.read_csv(file_path)
# Keep only the 'from', 'to', and 'value' columns and remove duplicates
df = df[['from', 'to', 'value']].drop_duplicates()
return df, influencer_address
except FileNotFoundError:
return pd.DataFrame(), None
def create_network_graph(df, influencer_name, influencer_address):
G = nx.Graph()
# Consider bidirectional interactions
df_bi = pd.concat([df.rename(columns={'from': 'to', 'to': 'from'}), df])
interaction_counts = df_bi.groupby(['from', 'to']).size().reset_index(name='count')
top_interactions = interaction_counts.sort_values('count', ascending=False).head(20)
# Add edges and nodes to the graph
for _, row in top_interactions.iterrows():
G.add_edge(row['from'], row['to'], weight=row['count'])
G.add_node(row['from'], type='sender')
G.add_node(row['to'], type='receiver')
# Node positions
pos = nx.spring_layout(G, weight='weight')
# Edge trace
edge_x = []
edge_y = []
edge_hover = []
for edge in G.edges(data=True):
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_hover.append(f'Interactions: {edge[2]["weight"]}')
edge_trace = go.Scatter(
x=edge_x, y=edge_y,
line=dict(width=2, color='#888'),
hoverinfo='text',
text=edge_hover,
mode='lines')
# Node trace
node_x = []
node_y = []
node_hover = []
node_size = []
for node in G.nodes():
x, y = pos[node]
node_x.append(x)
node_y.append(y)
connections = len(G.edges(node))
interaction_sum = interaction_counts[interaction_counts['from'].eq(node) | interaction_counts['to'].eq(node)]['count'].sum()
node_hover_info = f'Address: {node}
# of connections: {connections}
# of interactions: {interaction_sum}'
if node == influencer_address:
node_hover_info = f'Influencer: {influencer_name}
' + node_hover_info
node_size.append(30) # Central node size
else:
node_size.append(20) # Other nodes size
node_hover.append(node_hover_info)
node_trace = go.Scatter(
x=node_x, y=node_y,
mode='markers',
hoverinfo='text',
text=node_hover,
marker=dict(
showscale=False,
color='blue',
size=node_size,
line=dict(width=2, color='black')))
# Create figure
fig = go.Figure(data=[edge_trace, node_trace],
layout=go.Layout(
title=f'
Network graph of wallet interactions for {influencer_name}',
titlefont=dict(size=16),
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)))
return fig, top_interactions
# Function to read the last update time from a file
def read_last_update_time():
try:
with open("ressources/last_update.txt", "r") as file:
return file.read()
except FileNotFoundError:
return ""
# Initialize last_update_time using the function
st.session_state.last_update_time = read_last_update_time()
# Update Data Button with Timer Decorator
def update_data_with_timer():
# Execute the scripts in the 'utils' folder to update data
subprocess.call(["python", "utils/scrap_etherscan.py"])
subprocess.call(["python", "utils/scrap_cmc.py"])
subprocess.call(["python", "utils/scrap_influencers_balance.py"])
subprocess.call(["python", "utils/scrap_cmc_global_metrics.py"])
subprocess.call(["python", "utils/scrap_greed_fear_index.py"])
subprocess.call(["python", "utils/extract_tokens_balances.py"])
# Update the last_update_time variable
last_update_time = time.strftime("%Y-%m-%d %H:%M:%S")
st.session_state.last_update_time = last_update_time
# Write the last update time to the file
with open("ressources/last_update.txt", "w") as file:
file.write(last_update_time)
# Update Data Button with Timer Decorator
def update_interactions():
# Execute the scripts in the 'utils' folder to update data
subprocess.call(["python", "utils/extract_wallet_interactions.py"])
# Update the last_update_time variable
#-------------------------------------scheduler ----------------------------------
# # Function to execute the scraping functions
# def execute_etherscan_scraping():
# subprocess.call(["python", "utils/scrap_etherscan.py"])
# logging.info("Etherscan scraping completed")
# threading.Timer(3600, execute_etherscan_scraping).start()
# # Balancer scrapping
# def execute_influencers_scraping():
# subprocess.call(["python", "utils/scrap_influencers_balance.py"])
# logging.info("Influencers balance scraping completed")
# threading.Timer(3600, execute_influencers_scraping).start()
# # Function to execute the scraping functions
# def execute_cmc_scraping():
# subprocess.call(["python", "utils/scrap_cmc.py"])
# logging.info("CMC scraping completed")
# threading.Timer(2592000 / 9000, execute_cmc_scraping).start()
# # Function to execute the global metrics scraping
# def execute_global_metrics_scraping():
# subprocess.call(["python", "utils/scrap_cmc_global_metrics.py"])
# logging.info("Global metrics scraping completed")
# threading.Timer(2592000 / 9000, execute_influencers_scraping).start()
# def execute_greed_fear_index_scraping():
# subprocess.call(["python", "utils/scrap_greed_fear_index.py"])
# logging.info("Greed and Fear index scraping completed")
# threading.Timer(3600, execute_greed_fear_index_scraping).start()
# def execute_token_balances_scraping():
# subprocess.call(["python", "utils/extract_tokens_balances.py"])
# logging.info("Token balances scraping completed")
# threading.Timer(3600, execute_token_balances_scraping).start()
# if "initialized" not in st.session_state:
# # Start the scraping threads
# threading.Thread(target=execute_etherscan_scraping).start()
# threading.Thread(target=execute_cmc_scraping).start()
# threading.Thread(target=execute_influencers_scraping).start()
# threading.Thread(target=execute_global_metrics_scraping).start()
# threading.Thread(target=execute_greed_fear_index_scraping).start()
# threading.Thread(target=execute_token_balances_scraping).start()
# st.session_state["initialized"] = True
#-------------------------------------streamlit ----------------------------------
# Set the title and other page configurations
st.title('Crypto Analysis')
st.write("Welcome to the Crypto Analysis app. Please note that data is not updated automatically due to API plan limitations.")
# Display the last update time
st.write(f"Time of last update: {st.session_state.last_update_time}")
# Update Data Button with Timer Decorator
if st.button("Scrap new data", on_click=update_data_with_timer):
st.success("Data updated.")
st.header("Global Cryptocurrency Market Metrics")
# Create two columns for the two plots
col1, col2 = st.columns(2)
global_metrics_df = load_global_metrics()
display_greed_fear_index()
st.write(global_metrics_df)
with col1:
# Create and display the pie chart
dominance_fig = create_dominance_pie_chart(global_metrics_df)
dominance_fig.update_layout(
autosize=False,
width=300,
height=300,)
st.plotly_chart(dominance_fig)
with col2:
# cmc
selected_var = st.selectbox('Select Var', ["percent_change_24h","percent_change_7d","percent_change_90d"], index=0)
# Sort the DataFrame by the 'percent_change_24h' column in ascending order
df_sorted = df_cmc.sort_values(by=selected_var, ascending=False)
# Select the top 10 and worst 10 rows
top_10 = df_sorted.head(10)
worst_10 = df_sorted.tail(10)
# Combine the top and worst dataframes for plotting
combined_df = pd.concat([top_10, worst_10], axis=0)
max_abs_val = max(abs(combined_df[selected_var].min()), abs(combined_df[selected_var].max()))
# Create a bar plot for the top 10 with a green color scale
fig = go.Figure(data=[
go.Bar(
x=top_10["symbol"],
y=top_10[selected_var],
marker_color='rgb(0,100,0)', # Green color for top 10
hovertext= "Name : "+top_10["name"].astype(str)+ '
' +
selected_var + " : " + top_10["percent_tokens_circulation"].astype(str) + '
' +
'Market Cap: ' + top_10["market_cap"].astype(str) + '
' +
'Fully Diluted Market Cap: ' + top_10["fully_diluted_market_cap"].astype(str) + '
' +
'Last Updated: ' + top_10["last_updated"].astype(str),
name="top_10"
)
])
# Add the worst 10 to the same plot with a red color scale
fig.add_traces(go.Bar(
x=worst_10["symbol"],
y=worst_10[selected_var],
marker_color='rgb(255,0,0)', # Red color for worst 10
hovertext="Name:"+worst_10["name"].astype(str)+ '
' +
selected_var + " : " + worst_10["percent_tokens_circulation"].astype(str) + '
' +
'Market Cap: ' + worst_10["market_cap"].astype(str) + '
' +
'Fully Diluted Market Cap: ' + worst_10["fully_diluted_market_cap"].astype(str) + '
' +
'Last Updated: ' + worst_10["last_updated"].astype(str),
name="worst_10"
)
)
# Customize aspect
fig.update_traces(marker_line_color='rgb(8,48,107)', marker_line_width=1.5, opacity=0.8)
fig.update_layout(title_text=f'Top 10 and Worst 10 by {selected_var.split("_")[-1]} Percentage Change')
fig.update_xaxes(categoryorder='total ascending')
fig.update_layout(
autosize=False,
width=300,
height=300,
#paper_bgcolor="LightSteelBlue",
)
st.plotly_chart(fig)
st.header("Deep Dive into Specific Coins")
col1, col2 = st.columns(2)
tokens = load_tokens()
selected_token = st.selectbox('Select Token', df_etherscan['tokenSymbol'].unique(), index=0)
token_input = st.text_input("Add new token", placeholder="e.g., APE:0x123...ABC")
if st.button("Add Token"):
if ":" in token_input:
try:
new_token_name, new_token_addr = token_input.split(":")
tokens[new_token_name.strip()] = new_token_addr.strip()
with open("ressources/dict_tokens_addr.json", "w") as file:
json.dump(tokens, file, indent=4)
st.success(f"Token {new_token_name} added")
subprocess.call(["python", "utils/scrap_etherscan.py"])
df_etherscan = pd.DataFrame()
for filename in os.listdir('output'):
if filename.endswith('.csv') and 'transactions_' in filename:
df_temp = safe_read_csv(os.path.join('output', filename), sep=',')
df_etherscan = pd.concat([df_etherscan, df_temp], ignore_index=True)
except ValueError:
st.error("Invalid format. Please enter as 'name:address'")
else:
st.error("Please enter the influencer details as 'name:address'")
with col1:
# Filter the data based on the selected token
filtered_df = df_etherscan[df_etherscan['tokenSymbol'] == selected_token]
# Plot the token volume over time
st.plotly_chart(
go.Figure(
data=[
go.Scatter(
x=filtered_df['timeStamp'],
y=filtered_df['value'],
mode='lines',
name='Volume over time'
)
],
layout=go.Layout(
title='Token Volume Over Time',
yaxis=dict(
title=f'Volume ({selected_token})',
),
showlegend=True,
legend=go.layout.Legend(x=0, y=1.0),
margin=go.layout.Margin(l=40, r=0, t=40, b=30),
width=300,
height=300,
)
)
)
with col2:
# Processing data
top_buyers_df = get_top_buyers(df_etherscan, selected_token)
# Plotting
if not top_buyers_df.empty:
top_buyers_fig = plot_top_buyers(top_buyers_df)
top_buyers_fig.update_layout(
autosize=False,
width=300,
height=300)
st.plotly_chart(top_buyers_fig)
else:
st.write(f"No buying data available for {selected_token}")
st.header("Influencers' Token Balances")
token_balances_df = load_token_balances()
col1, col2 = st.columns(2)
influencers = load_influencers()
influencer_input = st.text_input("Add a new influencer", placeholder="e.g., alice:0x123...ABC")
if st.button("Add Influencer"):
if ":" in influencer_input:
try:
new_influencer_name, new_influencer_addr = influencer_input.split(":")
influencers[new_influencer_name.strip()] = new_influencer_addr.strip()
with open("ressources/dict_influencers_addr.json", "w") as file:
json.dump(influencers, file, indent=4)
st.success(f"Influencer {new_influencer_name} added")
subprocess.call(["python", "utils/scrap_influencers_balance.py"])
subprocess.call(["python", "utils/extract_tokens_balances.py"])
token_balances_df = load_token_balances()
except ValueError:
st.error("Invalid format. Please enter as 'name:address'")
else:
st.error("Please enter the influencer details as 'name:address'")
with col1:
if not token_balances_df.empty:
token_balance_fig = create_token_balance_bar_plot(token_balances_df)
token_balance_fig.update_layout(
autosize=False,
width=300,
height=400,)
st.plotly_chart(token_balance_fig)
else:
st.write("No token balance data available.")
with col2:
# Load Ether balances
try:
df_balances = pd.read_csv("output/influencers_balances.csv")
logging.info(f"Balances uploaded, shape of dataframe is {df_balances.shape}")
#st.write("DataFrame Loaded:", df_balances) # Debugging line
except FileNotFoundError:
st.error("Balance data not found. Please wait for the next update cycle.")
df_balances = pd.DataFrame()
# Inverting the influencers dictionary
inverted_influencers = {v.lower(): k for k, v in influencers.items()}
if not df_balances.empty:
df_balances["balance"] = df_balances["balance"].astype(float) / 1e18 # Convert Wei to Ether
df_balances = df_balances.rename(columns={"account": "address"})
# Ensure addresses are in the same format as in the inverted dictionary (e.g., lowercase)
df_balances["address"] = df_balances["address"].str.lower()
# Perform the mapping
df_balances["influencer"] = df_balances["address"].map(inverted_influencers)
#st.write("Mapped DataFrame:", df_balances) # Debugging line
fig = px.bar(df_balances, y="influencer", x="balance",orientation="h")
fig.update_layout(
title='Ether Balances of Influencers',
xaxis=dict(
title='Balance in eth',
titlefont_size=16,
tickfont_size=14,
))
fig.update_layout(
autosize=False,
width=300,
height=400,)
st.plotly_chart(fig)
else:
logging.info("DataFrame is empty")
# In the Streamlit app
st.header("Wallet Interactions Network Graph")
# Update Data Button with Timer Decorator
if st.button("Update interactions", on_click=update_interactions):
st.success("Interactions data updated.")
selected_influencer = st.selectbox("Select an Influencer", list(influencers.keys()))
# Load interactions data for the selected influencer
interactions_df, influencer_address = load_influencer_interactions(selected_influencer)
if not interactions_df.empty:
# Generate the network graph and the table of top interactions
network_fig, top_interactions = create_network_graph(interactions_df, selected_influencer, influencer_address)
# Display the network graph
st.plotly_chart(network_fig)
# Display the table of top interactions
st.subheader(f"Top Interactions for {selected_influencer}")
st.table(top_interactions)
else:
st.write(f"No wallet interaction data available for {selected_influencer}.")
st.markdown("""