File size: 4,129 Bytes
c65698a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import logging
from pandas import DataFrame
import pandas as pd

# Initialize logging
logging.basicConfig(level=logging.INFO)

class MarketDataCalculator:
    def convert_to_datetime(df: DataFrame) -> DataFrame:
        """Converts specific columns to datetime format."""
        df["time_period_start"] = pd.to_datetime(df["time_period_start"])
        df["time_open"] = pd.to_datetime(df["time_open"])
        df["time_close"] = pd.to_datetime(df["time_close"])
        return df

    def calculate_market_data(df: DataFrame) -> DataFrame:
        """
        Calculates various market data features such as VWAP, rolling mean, and Bollinger Bands.

        :param df: Input data frame with market data
        :return: Data frame with calculated features
        """
        if not isinstance(df, DataFrame):
            logging.error("Input is not a data frame")
            raise ValueError("Input must be a data frame")

        try:
            df["vwap"] = (df["volume_traded"] * df["price_close"]).cumsum() / df["volume_traded"].cumsum()
            df["rolling_mean"] = df["price_close"].rolling(window=20).mean()
            df["rolling_std"] = df["price_close"].rolling(window=20).std()
            df["bollinger_upper"] = df["rolling_mean"] + (df["rolling_std"] * 2)
            df["bollinger_lower"] = df["rolling_mean"] - (df["rolling_std"] * 2)
            df["ma50"] = df["price_close"].rolling(window=50).mean()
            df["ma200"] = df["price_close"].rolling(window=200).mean()
        except KeyError as e:
            logging.error(f"Missing necessary columns in data frame: {e}")
            raise

        return df

    def calculate_volatility(df: DataFrame) -> DataFrame:
        """
        Calculates volatility based on price high, low, and open.

        :param df: Input data frame with market data
        :return: Data frame with calculated volatility
        """
        try:
            df["volatility"] = ((df["price_high"] - df["price_low"]) / df["price_open"]) * 100
        except KeyError as e:
            logging.error(f"Missing necessary columns in data frame: {e}")
            raise

        return df

    def calculate_trade_velocity(df: DataFrame) -> DataFrame:
        """
        Calculates trade velocity based on trade count and time difference.

        :param df: Input data frame with market data
        :return: Data frame with calculated trade velocity
        """
        try:
            df["trade_velocity"] = df["trades_count"] / ((df["time_close"] - df["time_open"]).dt.total_seconds() / 3600)
        except KeyError as e:
            logging.error(f"Missing necessary columns in data frame: {e}")
            raise

        return df

    def calculate_rsi(df: DataFrame) -> DataFrame:
        """
        Calculates the Relative Strength Index (RSI).

        :param df: Input data frame with market data
        :return: Data frame with calculated RSI
        """
        try:
            delta = df["price_close"].diff()
            gain = (delta.where(delta > 0, 0)).fillna(0)
            loss = (-delta.where(delta < 0, 0)).fillna(0)
            avg_gain = gain.rolling(window=14).mean()
            avg_loss = loss.rolling(window=14).mean()
            rs = avg_gain / avg_loss
            df["rsi"] = 100 - (100 / (1 + rs))
        except KeyError as e:
            logging.error(f"Missing necessary columns in data frame: {e}")
            raise

        return df

    def calculate_macd(df: DataFrame) -> DataFrame:
        """
        Calculates the Moving Average Convergence Divergence (MACD).

        :param df: Input data frame with market data
        :return: Data frame with calculated MACD
        """
        try:
            df["ema12"] = df["price_close"].ewm(span=12, adjust=False).mean()
            df["ema26"] = df["price_close"].ewm(span=26, adjust=False).mean()
            df["macd"] = df["ema12"] - df["ema26"]
            df["macd_signal"] = df["macd"].ewm(span=9, adjust=False).mean()
        except KeyError as e:
            logging.error(f"Missing necessary columns in data frame: {e}")
            raise

        return df