# ------------------------ Libraries --------------------------
import os
import pandas as pd
import streamlit as st
import plotly.graph_objs as go
import logging
import subprocess
import threading
from dotenv import load_dotenv
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import plotly.express as px
import json
import networkx as nx
import time

# ------------------------ Environment Variables --------------------------
load_dotenv()

# Logging setup.
# BUG FIX: the original read LOG_FOLDER and immediately overwrote it with
# LOG_STREAMLIT (dead read); it also crashed in os.makedirs(None) when the
# variable was missing from the .env.  Fall back through both variables,
# then a local default.
log_folder = os.getenv("LOG_STREAMLIT") or os.getenv("LOG_FOLDER") or "logs"
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "front.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
logging.info("Streamlit app has started")

# Create output folder if it doesn't exist (exist_ok avoids the
# check-then-create race of the original os.path.exists guard).
os.makedirs("output", exist_ok=True)


# ------------------------------------- back ----------------------------------
def safe_read_csv(file_path, sep=','):
    """Read a CSV file defensively.

    Returns the parsed DataFrame, or an empty DataFrame (with a warning
    logged) when the file is missing or zero-length instead of raising.
    """
    if os.path.exists(file_path) and os.path.getsize(file_path) > 0:
        return pd.read_csv(file_path, sep=sep)
    logging.warning(f"File {file_path} is empty or does not exist.")
    return pd.DataFrame()  # return an empty DataFrame


# etherscan
## Load the transaction data scraped from Etherscan (one CSV per token).
df_etherscan = pd.DataFrame()
for filename in os.listdir('output'):
    if filename.endswith('.csv') and 'transactions_' in filename:
        df_temp = safe_read_csv(os.path.join('output', filename), sep=',')
        df_etherscan = pd.concat([df_etherscan, df_temp], ignore_index=True)

# CMC
## Load the CoinMarketCap top-100 snapshot and keep only the latest update.
df_cmc = safe_read_csv("output/top_100_update.csv", sep=',')
if not df_cmc.empty:
    # Guard: indexing "last_updated" on the empty DataFrame that
    # safe_read_csv can return would raise KeyError.
    df_cmc = df_cmc[df_cmc["last_updated"] == df_cmc["last_updated"].max()]


# Global metrics about the market
def load_global_metrics():
    """Load the global market metrics CSV.

    Returns an empty DataFrame (with a warning logged) when the file has
    not been produced yet.
    """
    try:
        return pd.read_csv("output/global_metrics.csv")
    except FileNotFoundError:
        logging.warning("Global metrics file not found.")
        return pd.DataFrame()  # Return an empty DataFrame if file is not found
# Load influencers
def load_influencers():
    """Load the influencer-name -> wallet-address mapping from JSON.

    Returns an empty dict (and surfaces the error in the UI) on failure.
    """
    try:
        with open("ressources/dict_influencers_addr.json", "r") as file:
            return json.load(file)
    except Exception as e:
        st.error(f"Error loading influencers: {e}")
        return {}


# Load tokens
def load_tokens():
    """Load the token-name -> contract-address mapping from JSON.

    Returns an empty dict (and surfaces the error in the UI) on failure.
    """
    try:
        with open("ressources/dict_tokens_addr.json", "r") as file:
            return json.load(file)
    except Exception as e:
        # BUG FIX: the original message said "influencers" (copy-paste error).
        st.error(f"Error loading tokens: {e}")
        return {}


def create_dominance_pie_chart(df_global_metrics):
    """Build a pie chart of BTC / ETH / other market-cap dominance.

    Expects a DataFrame with 'btc_dominance' and 'eth_dominance' columns
    (as written by the global-metrics scraper).
    """
    # Robustness: load_global_metrics() can return an empty DataFrame, and
    # .iloc[0] on it would raise IndexError.
    if df_global_metrics.empty:
        return go.Figure()
    # Extract BTC and ETH dominance
    btc_dominance = df_global_metrics['btc_dominance'].iloc[0]
    eth_dominance = df_global_metrics['eth_dominance'].iloc[0]
    # Everything that is neither BTC nor ETH
    others_dominance = 100 - btc_dominance - eth_dominance
    dominance_data = {
        'Cryptocurrency': ['BTC', 'ETH', 'Others'],
        'Dominance': [btc_dominance, eth_dominance, others_dominance]
    }
    df_dominance = pd.DataFrame(dominance_data)
    fig = px.pie(df_dominance, values='Dominance', names='Cryptocurrency',
                 title='Market Cap Dominance')
    return fig


def display_greed_fear_index():
    """Plot the Fear & Greed index at several historical points and render it."""
    try:
        df = pd.read_csv('output/greed_fear_index.csv')
        # (display label, CSV column prefix) for each sampled time period.
        periods = [
            ('One Year Ago', 'fgi_oneYearAgo'),
            ('One Month Ago', 'fgi_oneMonthAgo'),
            ('One Week Ago', 'fgi_oneWeekAgo'),
            ('Previous Close', 'fgi_previousClose'),
            ('Now', 'fgi_now'),
        ]
        time_periods = [label for label, _ in periods]
        values = [df[f'{prefix}_value'].iloc[0] for _, prefix in periods]
        labels = [df[f'{prefix}_valueText'].iloc[0] for _, prefix in periods]

        fig = go.Figure(data=[
            go.Scatter(x=time_periods, y=values, mode='lines+markers+text',
                       text=labels, textposition='top center')
        ])
        fig.update_layout(
            title='Fear and Greed Index Over Time',
            xaxis_title='Time Period',
            yaxis_title='Index Value',
            yaxis=dict(range=[0, 100])  # Fear and Greed index ranges from 0 to 100
        )
        st.plotly_chart(fig)
    except FileNotFoundError:
        st.error("Greed and Fear index data not available. Please wait for the next update cycle.")


def load_token_balances():
    """Load influencers' token balances; empty DataFrame when the file is absent."""
    try:
        return pd.read_csv("output/influencers_token_balances.csv")
    except FileNotFoundError:
        logging.warning("Token balances file not found.")
        return pd.DataFrame()  # Return an empty DataFrame if file is not found


def create_token_balance_bar_plot(df):
    """Grouped bar plot of token balances per influencer.

    Expects 'Influencer', 'Balance' and 'Token' columns.
    """
    if df.empty:
        return go.Figure()  # Return an empty figure if there is no data
    fig = px.bar(df, x="Influencer", y="Balance", color="Token", barmode="group")
    fig.update_layout(
        title="Token Balances of Influencers",
        xaxis_title="Influencer",
        yaxis_title="Token Balance",
        legend_title="Token"
    )
    return fig


def get_top_buyers(df, token, top_n=5):
    """Return the top_n addresses by total amount bought for one token.

    Assumes 'value' holds the amount bought and 'from' the buyer's address.
    Result has 'from' and 'value' columns, sorted descending by 'value'.
    """
    token_df = df[df['tokenSymbol'] == token]
    top_buyers = (token_df.groupby('from')['value']
                  .sum()
                  .sort_values(ascending=False)
                  .head(top_n))
    return top_buyers.reset_index()


def plot_top_buyers(df, token=None):
    """Horizontal bar plot of the top buyers DataFrame from get_top_buyers.

    `token` names the token in the title; it defaults to the module-level
    `selected_token` for backward compatibility (the original implicitly
    read that global).
    """
    label = selected_token if token is None else token
    fig = px.bar(df, x='from', y='value',
                 title=f'Top 5 Buyers of {label}', orientation="h")
    fig.update_layout(xaxis_title="Address", yaxis_title="Total Amount Bought")
    return fig


def load_influencer_interactions(influencer_name):
    """Return (interactions DataFrame, wallet address) for one influencer.

    Reads output/interactions_<name>.csv, keeping only 'from', 'to' and
    'value' with duplicates dropped.  Returns (empty DataFrame, None) when
    the address book, the influencer, or the CSV is missing.
    """
    try:
        # Load the influencer addresses dictionary
        with open("ressources/dict_influencers_addr.json", "r") as file:
            influencers = json.load(file)
        influencer_address = influencers.get(influencer_name, None)
        if influencer_address is None:
            return pd.DataFrame(), None
        file_path = f"output/interactions_{influencer_name}.csv"
        df = pd.read_csv(file_path)
        # Keep only the columns the network graph needs; drop duplicate rows.
        df = df[['from', 'to', 'value']].drop_duplicates()
        return df, influencer_address
    except FileNotFoundError:
        return pd.DataFrame(), None


def create_network_graph(df, influencer_name, influencer_address):
    """Build a Plotly network graph of the influencer's wallet interactions.

    Interactions are treated as bidirectional; only the 20 most frequent
    pairs are drawn.  Returns (figure, top_interactions DataFrame).
    """
    G = nx.Graph()
    # Consider bidirectional interactions: mirror from/to and count pairs.
    df_bi = pd.concat([df.rename(columns={'from': 'to', 'to': 'from'}), df])
    interaction_counts = df_bi.groupby(['from', 'to']).size().reset_index(name='count')
    top_interactions = interaction_counts.sort_values('count', ascending=False).head(20)

    # Add edges and nodes to the graph
    for _, row in top_interactions.iterrows():
        G.add_edge(row['from'], row['to'], weight=row['count'])
        G.add_node(row['from'], type='sender')
        G.add_node(row['to'], type='receiver')

    # Node positions (heavier edges pull nodes closer together)
    pos = nx.spring_layout(G, weight='weight')

    # Edge trace
    edge_x, edge_y, edge_hover = [], [], []
    for edge in G.edges(data=True):
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        edge_x.extend([x0, x1, None])  # None breaks the line between edges
        edge_y.extend([y0, y1, None])
        edge_hover.append(f'Interactions: {edge[2]["weight"]}')
    edge_trace = go.Scatter(
        x=edge_x, y=edge_y,
        line=dict(width=2, color='#888'),
        hoverinfo='text',
        text=edge_hover,
        mode='lines')

    # Node trace
    node_x, node_y, node_hover, node_size = [], [], [], []
    for node in G.nodes():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)
        connections = len(G.edges(node))
        interaction_sum = interaction_counts[
            interaction_counts['from'].eq(node) | interaction_counts['to'].eq(node)
        ]['count'].sum()
        node_hover_info = (f'Address: {node}\n# of connections: {connections}'
                           f'\n# of interactions: {interaction_sum}')
        if node == influencer_address:
            node_hover_info = f'Influencer: {influencer_name}\n' + node_hover_info
            node_size.append(30)  # Central node size
        else:
            node_size.append(20)  # Other nodes size
        node_hover.append(node_hover_info)
    node_trace = go.Scatter(
        x=node_x, y=node_y,
        mode='markers',
        hoverinfo='text',
        text=node_hover,
        marker=dict(
            showscale=False,
            color='blue',
            size=node_size,
            line=dict(width=2, color='black')))

    # Create figure
    fig = go.Figure(data=[edge_trace, node_trace],
                    layout=go.Layout(
                        title=f'\nNetwork graph of wallet interactions for {influencer_name}',
                        titlefont=dict(size=16),
                        showlegend=False,
                        hovermode='closest',
                        margin=dict(b=20, l=5, r=5, t=40),
                        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)))
    return fig, top_interactions
# Function to read the last update time from a file
def read_last_update_time():
    """Return the persisted last-update timestamp, or '' if never saved."""
    try:
        with open("ressources/last_update.txt", "r") as file:
            return file.read()
    except FileNotFoundError:
        return ""


# Initialize last_update_time using the function
st.session_state.last_update_time = read_last_update_time()


def update_data_with_timer():
    """Run every scraping script in 'utils', then persist the update time."""
    # Execute the scripts in the 'utils' folder to update data
    # (folded the six copy-pasted subprocess.call lines into one loop).
    for script in (
        "utils/scrap_etherscan.py",
        "utils/scrap_cmc.py",
        "utils/scrap_influencers_balance.py",
        "utils/scrap_cmc_global_metrics.py",
        "utils/scrap_greed_fear_index.py",
        "utils/extract_tokens_balances.py",
    ):
        subprocess.call(["python", script])
    # Update the last_update_time in session state and persist it so the
    # timestamp survives app restarts.
    last_update_time = time.strftime("%Y-%m-%d %H:%M:%S")
    st.session_state.last_update_time = last_update_time
    with open("ressources/last_update.txt", "w") as file:
        file.write(last_update_time)


def update_interactions():
    """Refresh the wallet-interaction CSVs used by the network graph."""
    subprocess.call(["python", "utils/extract_wallet_interactions.py"])


# NOTE: a large block of commented-out background-scheduler threads
# (threading.Timer loops around the scraping scripts) was removed here;
# data is refreshed on demand via the buttons below.

# ------------------------------------- streamlit ----------------------------------
# Set the title and other page configurations
st.title('Crypto Analysis')
st.write("Welcome to the Crypto Analysis app. \nPlease note that data is not updated automatically due to API plan limitations.")

# Display the last update time
st.write(f"Time of last update: {st.session_state.last_update_time}")

# Update Data button: on_click runs the scrapers before the rerun.
if st.button("Scrap new data", on_click=update_data_with_timer):
    st.success("Data updated.")

st.header("Global Cryptocurrency Market Metrics")

# Create two columns for the two plots
col1, col2 = st.columns(2)
global_metrics_df = load_global_metrics()
display_greed_fear_index()
st.write(global_metrics_df)

with col1:
    # Create and display the dominance pie chart
    dominance_fig = create_dominance_pie_chart(global_metrics_df)
    dominance_fig.update_layout(autosize=False, width=300, height=300)
    st.plotly_chart(dominance_fig)

with col2:
    # CMC: top/worst movers for the selected percent-change window
    selected_var = st.selectbox(
        'Select Var',
        ["percent_change_24h", "percent_change_7d", "percent_change_90d"],
        index=0)
    # Sort descending so head/tail give best and worst performers.
    df_sorted = df_cmc.sort_values(by=selected_var, ascending=False)
    top_10 = df_sorted.head(10)
    worst_10 = df_sorted.tail(10)
    # (removed the unused combined_df / max_abs_val dead locals)

    # Top 10 bars in green
    fig = go.Figure(data=[
        go.Bar(
            x=top_10["symbol"],
            y=top_10[selected_var],
            marker_color='rgb(0,100,0)',  # Green color for top 10
            hovertext="Name : " + top_10["name"].astype(str) + '\n'
                      + selected_var + " : " + top_10["percent_tokens_circulation"].astype(str) + '\n'
                      + 'Market Cap: ' + top_10["market_cap"].astype(str) + '\n'
                      + 'Fully Diluted Market Cap: ' + top_10["fully_diluted_market_cap"].astype(str) + '\n'
                      + 'Last Updated: ' + top_10["last_updated"].astype(str),
            name="top_10"
        )
    ])
    # Worst 10 bars in red on the same figure
    fig.add_traces(go.Bar(
        x=worst_10["symbol"],
        y=worst_10[selected_var],
        marker_color='rgb(255,0,0)',  # Red color for worst 10
        hovertext="Name:" + worst_10["name"].astype(str) + '\n'
                  + selected_var + " : " + worst_10["percent_tokens_circulation"].astype(str) + '\n'
                  + 'Market Cap: ' + worst_10["market_cap"].astype(str) + '\n'
                  + 'Fully Diluted Market Cap: ' + worst_10["fully_diluted_market_cap"].astype(str) + '\n'
                  + 'Last Updated: ' + worst_10["last_updated"].astype(str),
        name="worst_10"
    ))
    # Customize aspect
    fig.update_traces(marker_line_color='rgb(8,48,107)',
                      marker_line_width=1.5, opacity=0.8)
    fig.update_layout(title_text=f'Top 10 and Worst 10 by {selected_var.split("_")[-1]} Percentage Change')
    fig.update_xaxes(categoryorder='total ascending')
    fig.update_layout(autosize=False, width=300, height=300)
    st.plotly_chart(fig)

st.header("Deep Dive into Specific Coins")
col1, col2 = st.columns(2)
tokens = load_tokens()
selected_token = st.selectbox('Select Token', df_etherscan['tokenSymbol'].unique(), index=0)
token_input = st.text_input("Add new token", placeholder="e.g., APE:0x123...ABC")
if st.button("Add Token"):
    if ":" in token_input:
        try:
            new_token_name, new_token_addr = token_input.split(":")
            tokens[new_token_name.strip()] = new_token_addr.strip()
            with open("ressources/dict_tokens_addr.json", "w") as file:
                json.dump(tokens, file, indent=4)
            st.success(f"Token {new_token_name} added")
            subprocess.call(["python", "utils/scrap_etherscan.py"])
            # Reload the transaction CSVs so the new token shows up immediately.
            df_etherscan = pd.DataFrame()
            for filename in os.listdir('output'):
                if filename.endswith('.csv') and 'transactions_' in filename:
                    df_temp = safe_read_csv(os.path.join('output', filename), sep=',')
                    df_etherscan = pd.concat([df_etherscan, df_temp], ignore_index=True)
        except ValueError:
            st.error("Invalid format. \nPlease enter as 'name:address'")
    else:
        # BUG FIX: this branch handles tokens, but the original message was
        # copy-pasted from the influencer form below.
        st.error("Please enter the token details as 'name:address'")

with col1:
    # Filter the data based on the selected token
    filtered_df = df_etherscan[df_etherscan['tokenSymbol'] == selected_token]
    # Plot the token volume over time
    st.plotly_chart(
        go.Figure(
            data=[
                go.Scatter(
                    x=filtered_df['timeStamp'],
                    y=filtered_df['value'],
                    mode='lines',
                    name='Volume over time'
                )
            ],
            layout=go.Layout(
                title='Token Volume Over Time',
                yaxis=dict(title=f'Volume ({selected_token})'),
                showlegend=True,
                legend=go.layout.Legend(x=0, y=1.0),
                margin=go.layout.Margin(l=40, r=0, t=40, b=30),
                width=300,
                height=300,
            )
        )
    )

with col2:
    # Aggregate then plot the top buyers for the selected token
    top_buyers_df = get_top_buyers(df_etherscan, selected_token)
    if not top_buyers_df.empty:
        top_buyers_fig = plot_top_buyers(top_buyers_df)
        top_buyers_fig.update_layout(autosize=False, width=300, height=300)
        st.plotly_chart(top_buyers_fig)
    else:
        st.write(f"No buying data available for {selected_token}")

st.header("Influencers' Token Balances")
token_balances_df = load_token_balances()
col1, col2 = st.columns(2)
influencers = load_influencers()
influencer_input = st.text_input("Add a new influencer", placeholder="e.g., alice:0x123...ABC")
if st.button("Add Influencer"):
    if ":" in influencer_input:
        try:
            new_influencer_name, new_influencer_addr = influencer_input.split(":")
            influencers[new_influencer_name.strip()] = new_influencer_addr.strip()
            with open("ressources/dict_influencers_addr.json", "w") as file:
                json.dump(influencers, file, indent=4)
            st.success(f"Influencer {new_influencer_name} added")
            subprocess.call(["python", "utils/scrap_influencers_balance.py"])
            subprocess.call(["python", "utils/extract_tokens_balances.py"])
            token_balances_df = load_token_balances()
        except ValueError:
            st.error("Invalid format. \nPlease enter as 'name:address'")
    else:
        st.error("Please enter the influencer details as 'name:address'")

with col1:
    if not token_balances_df.empty:
        token_balance_fig = create_token_balance_bar_plot(token_balances_df)
        token_balance_fig.update_layout(autosize=False, width=300, height=400)
        st.plotly_chart(token_balance_fig)
    else:
        st.write("No token balance data available.")

with col2:
    # Load Ether balances
    try:
        df_balances = pd.read_csv("output/influencers_balances.csv")
        logging.info(f"Balances uploaded, shape of dataframe is {df_balances.shape}")
    except FileNotFoundError:
        st.error("Balance data not found. Please wait for the next update cycle.")
        df_balances = pd.DataFrame()

    # Invert the mapping so wallet addresses look up influencer names.
    inverted_influencers = {v.lower(): k for k, v in influencers.items()}

    if not df_balances.empty:
        df_balances["balance"] = df_balances["balance"].astype(float) / 1e18  # Convert Wei to Ether
        df_balances = df_balances.rename(columns={"account": "address"})
        # Ensure addresses match the lowercase keys of the inverted dictionary.
        df_balances["address"] = df_balances["address"].str.lower()
        df_balances["influencer"] = df_balances["address"].map(inverted_influencers)
        fig = px.bar(df_balances, y="influencer", x="balance", orientation="h")
        fig.update_layout(
            title='Ether Balances of Influencers',
            xaxis=dict(
                title='Balance in eth',
                titlefont_size=16,
                tickfont_size=14,
            ))
        fig.update_layout(autosize=False, width=300, height=400)
        st.plotly_chart(fig)
    else:
        logging.info("DataFrame is empty")

# Wallet-interaction network graph section
st.header("Wallet Interactions Network Graph")
if st.button("Update interactions", on_click=update_interactions):
    st.success("Interactions data updated.")
selected_influencer = st.selectbox("Select an Influencer", list(influencers.keys()))
# Load interactions data for the selected influencer
interactions_df, influencer_address = load_influencer_interactions(selected_influencer)
if not interactions_df.empty:
    # Generate the network graph and the table of top interactions
    network_fig, top_interactions = create_network_graph(
        interactions_df, selected_influencer, influencer_address)
    st.plotly_chart(network_fig)
    st.subheader(f"Top Interactions for {selected_influencer}")
    st.table(top_interactions)
else:
    st.write(f"No wallet interaction data available for {selected_influencer}.")

st.markdown("""
Repository LinkedIn GitHub
© 2023 Mohcine EL HARRAS
""", unsafe_allow_html=True)
# ------------------------------------- end ----------------------------------