# dash-whales-spaces / utils / scrap_cmc.py
# Author: mohcineelharras — "working scheduler" (commit 349c960, 3.21 kB)
# ---------------------- Library Imports ----------------------
import functools
import json
import logging
import os
import time

import numpy as np
import pandas as pd
import requests
from dotenv import load_dotenv
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
# ---------------------- Environment Variables ----------------------
# Load configuration from a .env file into the process environment.
load_dotenv()
url_cmc = os.getenv("URL_CMC")          # base URL of the CoinMarketCap API
api_key_cmc = os.getenv("API_KEY_CMC")  # CMC Pro API key
# Default to a local "logs" folder: os.getenv returning None would make
# os.makedirs raise TypeError at import time when LOG_FOLDER is unset.
log_folder = os.getenv("LOG_FOLDER", "logs")
os.makedirs(log_folder, exist_ok=True)
log_file = os.path.join(log_folder, "cmc_scrapping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)
# ---------------------- Helper Functions ----------------------
def log_execution_time(func):
    """Decorator that logs the wall-clock execution time of *func*.

    The wrapped function's return value is passed through unchanged.
    """
    @functools.wraps(func)  # preserve __name__/__doc__ so logs and debugging see the real function
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        # lazy %-args: the message is only formatted if INFO is enabled
        logging.info("Function %s executed in %.2f seconds",
                     func.__name__, end_time - start_time)
        return result
    return wrapper
def process_cmc_data(data, stop):
    """Flatten a raw CMC listings payload into a tidy DataFrame and persist it.

    data: decoded JSON response whose "data" key holds the listings.
    stop: label used in the output file name (e.g. "100").
    """
    base_cols = ["name", "symbol", "circulating_supply", "total_supply", "quote"]
    usd_cols = [
        "price", "percent_change_24h", "percent_change_7d", "percent_change_90d",
        "market_cap", "fully_diluted_market_cap", "last_updated",
    ]
    frame = pd.DataFrame(data["data"])[base_cols]
    # Each row's "quote" holds per-currency sub-dicts; keep only the USD one.
    usd_quotes = frame["quote"].apply(lambda quote: quote["USD"])
    quote_frame = pd.json_normalize(usd_quotes)[usd_cols]
    frame = frame.drop("quote", axis=1)
    frame["percent_tokens_circulation"] = np.round(
        (frame["circulating_supply"] / frame["total_supply"]) * 100, 1
    )
    frame = frame.join(quote_frame)
    frame["last_updated"] = pd.to_datetime(frame["last_updated"])
    save_cmc_data(frame, stop)
def save_cmc_data(df, stop):
    """Append *df* to output/top_<stop>_update.csv, deduplicating rows.

    Rows are duplicates when both "symbol" and "last_updated" match, so
    re-running within the same update window is idempotent.
    """
    # Ensure the target directory exists; on a clean checkout to_csv would
    # otherwise raise FileNotFoundError.
    os.makedirs("output", exist_ok=True)
    output_file = f"output/top_{stop}_update.csv"
    if os.path.isfile(output_file):
        existing_data = pd.read_csv(output_file)
        updated_data = pd.concat([existing_data, df], axis=0, ignore_index=True)
        updated_data.drop_duplicates(subset=["symbol", "last_updated"], inplace=True)
        updated_data.to_csv(output_file, index=False)
    else:
        df.to_csv(output_file, index=False)
    logging.info("CMC data script execution completed.")
# ---------------------- CMC Scraping Function ----------------------
@log_execution_time
def fetch_and_process_cmc_data():
    """Fetch the top-100 USD listings from CoinMarketCap and process them.

    Dumps the raw JSON payload under output/ and delegates tabular
    processing to process_cmc_data. Network and decode failures are
    logged, never raised, so a scheduled run cannot crash the caller.
    """
    session = Session()
    session.headers.update({
        'Accepts': 'application/json',
        'X-CMC_PRO_API_KEY': api_key_cmc,
    })
    parameters = {
        'start': '1',
        'limit': '100',
        'convert': 'USD'
    }
    os.makedirs("output", exist_ok=True)  # raw payloads are written here
    for endpoint in ["v1/cryptocurrency/listings/latest"]:
        target = f"{url_cmc}/{endpoint}"
        try:
            # timeout keeps a stalled connection from hanging the scheduler;
            # a Timeout raised here is caught below.
            response = session.get(target, params=parameters, timeout=30)
            data = json.loads(response.text)
            with open(f'output/cmc_data_{endpoint.replace("/", "_")}_100.json', 'w') as f:
                json.dump(data, f)
            process_cmc_data(data, '100')
        except (ConnectionError, Timeout, TooManyRedirects, json.JSONDecodeError) as e:
            logging.error(f"Error while fetching data from {target}: {e}")
# ---------------------- Execution ----------------------
# Run one fetch-and-process cycle when executed as a script.
if __name__ == "__main__":
    fetch_and_process_cmc_data()