#-------------------------------------libraries----------------------------------
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects, HTTPError
import json
import os
import pandas as pd
import numpy as np
import logging
from dotenv import load_dotenv

load_dotenv()

#-------------------------------------env vars----------------------------------
url = os.getenv("URL_CMC")
endpoints = [
    "v1/cryptocurrency/listings/latest",
    "v1/cryptocurrency/trending/latest",
]
start = "1"
stop = "100"
parameters = {
    'start': start,
    'limit': stop,
    'convert': 'USD'
}
headers = {
    'Accepts': 'application/json',
    'X-CMC_PRO_API_KEY': os.getenv("API_KEY_CMC"),
}

# Configure the logging settings
log_folder = "./logs/scraping/"
os.makedirs(log_folder, exist_ok=True)  # Ensure the log folder exists
log_file = os.path.join(log_folder, "scraping.log")
log_format = "%(asctime)s [%(levelname)s] - %(message)s"
logging.basicConfig(filename=log_file, level=logging.INFO, format=log_format)

os.makedirs("output", exist_ok=True)  # Ensure the output folder exists

#-------------------------------------api call----------------------------------
session = Session()
session.headers.update(headers)

# Keep each endpoint's payload keyed by endpoint, so the processing step below
# can pick the listings data explicitly instead of whatever the loop fetched last.
results = {}
for endpoint in endpoints:
    target = f"{url}/{endpoint}"
    try:
        response = session.get(target, params=parameters)
        response.raise_for_status()  # Surface 4xx/5xx responses as errors
        data = response.json()
        results[endpoint] = data
        with open(f'output/cmc_data_{endpoint.replace("/", "_")}_{stop}.json', 'w') as f:
            json.dump(data, f)
        logging.info(f"Successfully fetched data from {target}")
    except (ConnectionError, Timeout, TooManyRedirects, HTTPError) as e:
        logging.error(f"Error while fetching data from {target}: {e}")

#-------------------------------------process data----------------------------------
# The feature engineering below applies to the listings endpoint only
data = results["v1/cryptocurrency/listings/latest"]

# create data frame with chosen columns
df = pd.DataFrame(data["data"])[["name", "symbol", "circulating_supply", "total_supply", "quote"]]
# explode the nested quote column, then keep the USD fields of interest
quote_df = pd.json_normalize(df['quote'].apply(lambda x: x['USD']))[
    ["price", "percent_change_24h", "percent_change_7d", "percent_change_90d",
     "market_cap", "fully_diluted_market_cap", "last_updated"]
]
# drop the original nested column
df = df.drop("quote", axis=1)
# create features: share of the total supply currently in circulation
df["percent_tokens_circulation"] = np.round((df["circulating_supply"] / df["total_supply"]) * 100, 1)
# merge the flattened quote fields back in
df = df.join(quote_df)
df["last_updated"] = pd.to_datetime(df["last_updated"])

#-------------------------------------save data----------------------------------
# Check if the file exists
output_file = f"output/top_{stop}_update.csv"
if os.path.isfile(output_file):
    logging.info(f"Updating dataset top_{stop}_update.")
    # Read the existing data, parsing timestamps so they compare
    # against the datetimes in the new batch when deduplicating
    existing_data = pd.read_csv(output_file, parse_dates=["last_updated"])
    # Concatenate the existing data with the new data vertically
    updated_data = pd.concat([existing_data, df], axis=0, ignore_index=True)
    # Remove duplicates (if any) based on the (symbol, last_updated) identifier pair
    updated_data.drop_duplicates(subset=["symbol", "last_updated"], inplace=True)
    # Save the updated data back to the same file
    updated_data.to_csv(output_file, index=False)
else:
    # If the file doesn't exist, save the current data to it
    df.to_csv(output_file, index=False)

logging.info("Script execution completed.")
#-------------------------------------end----------------------------------
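
#-------------------------------------optional: inspect output----------------------------------
# A minimal sanity-check sketch, not part of the pipeline above: it reloads the
# accumulated CSV (output_file, as written in the save step) and prints the most
# recent row per coin. The column selection here is illustrative only.
snapshot = pd.read_csv(output_file, parse_dates=["last_updated"])
# sort_values puts the newest timestamp last within each symbol, so tail(1)
# per group keeps exactly the latest snapshot of every coin
latest = snapshot.sort_values("last_updated").groupby("symbol").tail(1)
print(latest[["name", "symbol", "price", "percent_change_24h"]].head())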