# File size: 2,407 Bytes
# c49b21b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import os
from pathlib import Path
import pandas as pd
import glob

# Resolve DATA_DIR similar to other modules: try ``src.config`` first, then a
# top-level ``config`` module, and finally fall back to the literal "/data".
# ``except Exception`` is used, so any error raised while importing a config
# module (not only ImportError) moves on to the next fallback.
try:
    from src.config import DATA_DIR as CFG_DATA_DIR
except Exception:
    try:
        from config import DATA_DIR as CFG_DATA_DIR
    except Exception:
        CFG_DATA_DIR = "/data"


def _resolve_under_data(path_like: str | os.PathLike) -> Path:
    """Map *path_like* onto the configured data directory.

    Absolute paths are returned unchanged. For a relative path, a leading
    ``data`` component (compared case-insensitively) is dropped, and whatever
    remains is joined onto ``CFG_DATA_DIR``.
    """
    candidate = Path(path_like)
    if candidate.is_absolute():
        return candidate

    segments = candidate.parts
    has_data_prefix = bool(segments) and segments[0].lower() == "data"
    # Path() with no components is ".", so a bare "data" maps to the data root.
    relative = Path(*segments[1:]) if has_data_prefix else candidate
    return Path(CFG_DATA_DIR) / relative

def add_latest_ratings_to_features(features_path, ratings_dir, output_path):
    """Left-merge the most recent ratings row of each ratings file into the features table.

    Every ``*_recommendation_trends.parquet`` file directly inside
    ``ratings_dir`` is read. Files that lack a ``period`` column or contain no
    rows are skipped; from each remaining file the row with the greatest
    ``period`` is kept. The kept rows are merged onto the features frame on
    ``symbol`` (left join; colliding ratings columns receive a ``_ratings``
    suffix) and the result is written to ``output_path`` as snappy-compressed
    Parquet. When no usable ratings are found, a warning is printed and
    nothing is written.

    Args:
        features_path: Parquet file holding the features table.
        ratings_dir: Directory containing the ratings Parquet files.
        output_path: Destination Parquet file; may equal ``features_path``.

    All three paths are resolved through ``_resolve_under_data``.
    """
    # Resolve paths under DATA_DIR
    features_path = _resolve_under_data(features_path)
    ratings_dir = _resolve_under_data(ratings_dir)
    output_path = _resolve_under_data(output_path)

    # Load features
    features_df = pd.read_parquet(features_path)

    # Collect the latest row of every ratings file. The file list is sorted so
    # the outcome does not depend on filesystem listing order.
    pattern = os.path.join(str(ratings_dir), '*_recommendation_trends.parquet')
    latest_rows = []
    for file in sorted(glob.glob(pattern)):
        df = pd.read_parquet(file)
        # A file with no rows has no "latest" row; taking .iloc[[0]] from it
        # would raise IndexError and abort the whole run, so skip it just like
        # a file that has no 'period' column.
        if 'period' not in df.columns or df.empty:
            continue
        # Parse to datetimes so the sort is chronological (assuming period is
        # YYYY-MM-DD) rather than dependent on the stored dtype.
        df['period'] = pd.to_datetime(df['period'])
        latest_rows.append(df.sort_values('period', ascending=False).iloc[[0]])

    if latest_rows:
        all_latest_ratings = pd.concat(latest_rows, ignore_index=True)
    else:
        all_latest_ratings = pd.DataFrame()

    # Merge only if ratings data is available and has a 'symbol' column.
    if all_latest_ratings.empty or 'symbol' not in all_latest_ratings.columns:
        print("[WARN] No valid ratings data found to merge. Output not updated.")
        return

    # NOTE(review): when output_path equals features_path, a later run merges
    # into a frame that already carries the ratings columns, so fresh values
    # land in '*_ratings' columns while the earlier ones keep the plain names.
    # Confirm whether stale ratings columns should be dropped before merging.
    merged_df = features_df.merge(all_latest_ratings, on='symbol', how='left', suffixes=('', '_ratings'))

    # Make sure the destination directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    merged_df.to_parquet(output_path, compression='snappy')
    print(f"[INFO] Added latest ratings data for all available symbols and saved to: {output_path}")

def main():
    """Merge the latest ratings into the stock features file, in place."""
    features_file = "data/merged/features/stocks_features.parquet"
    add_latest_ratings_to_features(
        features_path=features_file,
        ratings_dir="data/finnhub/ratings",
        # The merged result overwrites the input features file.
        output_path=features_file,
    )

# Run the merge only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()