File size: 5,007 Bytes
d8d14f1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import time
import tracemalloc
from functools import wraps
from typing import Any, Callable

import psutil
from pydantic import BaseModel

from swarms.utils.loguru_logger import initialize_logger

# Module-level logger used by the profiling decorators in this file. It is
# created through the project's ``initialize_logger`` helper with the log
# folder name "calculate_func_metrics".
logger = initialize_logger(log_folder="calculate_func_metrics")


class FunctionMetrics(BaseModel):
    """Metrics recorded for a single profiled function call.

    Instances are built by ``profile_func``, which returns one alongside the
    wrapped function's result and logs it as JSON.
    """

    # Elapsed time of the call, in seconds.
    execution_time: float
    # Change in the process's resident set size (RSS) across the call, in MiB.
    memory_usage: float
    # CPU figure derived from psutil.cpu_percent() readings around the call.
    cpu_usage: float
    # Change in psutil's disk read + write operation counts across the call.
    io_operations: int
    # Number of calls covered by this record; profile_func always passes 1.
    function_calls: int


def profile_func(func: Callable) -> Callable:
    """
    Decorator that profiles each call of the given function.

    Around every call the wrapper samples elapsed time, the current process's
    resident memory (RSS), CPU utilization as reported by
    ``psutil.cpu_percent()`` and the disk read/write operation counts reported
    by ``psutil.disk_io_counters()``. The results are packed into a
    ``FunctionMetrics`` object, which is logged as JSON.

    Args:
        func: The function to be profiled.

    Returns:
        A wrapper that calls ``func`` and returns a ``(result, metrics)``
        tuple: ``result`` is whatever ``func`` returned and ``metrics`` is the
        ``FunctionMetrics`` for that call. Note that this changes the return
        value seen by callers of the decorated function. If ``func`` raises,
        the exception propagates and no metrics are produced or logged.
    """

    def disk_io_ops() -> int:
        # Take a single snapshot so the read and write counts belong to the
        # same reading. psutil documents that disk_io_counters() can return
        # None (e.g. on diskless machines); report 0 operations in that case
        # instead of failing with AttributeError.
        counters = psutil.disk_io_counters()
        if counters is None:
            return 0
        return counters.read_count + counters.write_count

    @wraps(func)  # keep the profiled function's __name__, __doc__, etc.
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        process = psutil.Process()

        # Baseline samples taken before the call.
        start_mem = process.memory_info().rss
        start_io = disk_io_ops()
        # With no interval, cpu_percent() reports utilization since its
        # previous call. Calling it here only resets that reference point;
        # the value returned now covers an unrelated period and is discarded.
        psutil.cpu_percent()
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments.
        start_time = time.perf_counter()

        # Call the function
        result = func(*args, **kwargs)

        # Samples taken after the call.
        execution_time = time.perf_counter() - start_time
        # Utilization since the priming call above, i.e. while func ran.
        cpu_usage = psutil.cpu_percent()
        end_mem = process.memory_info().rss
        end_io = disk_io_ops()

        metrics = FunctionMetrics(
            execution_time=execution_time,
            memory_usage=(end_mem - start_mem) / (1024**2),  # bytes -> MiB
            cpu_usage=cpu_usage,
            io_operations=end_io - start_io,
            function_calls=1,  # Each call to the function counts as one function call
        )

        json_data = metrics.model_dump_json(indent=4)

        logger.info(f"Function metrics: {json_data}")

        return result, metrics

    return wrapper


def profile_all(func: Callable) -> Callable:
    """
    A decorator that profiles memory allocations, CPU time, I/O operations
    and execution time of a function and logs the data with the module logger.

    It combines tracemalloc for memory profiling, psutil for per-process CPU
    times and I/O counters, and a monotonic clock for the execution time.

    Args:
        func (Callable): The function to be profiled.

    Returns:
        Callable: The wrapped function with profiling enabled. The wrapper
        returns exactly what ``func`` returns. If ``func`` raises, the
        exception propagates and nothing is logged, but memory tracing
        started by the wrapper is still stopped.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # Start memory tracking only if it is not already active. If tracing
        # was enabled by someone else (an outer profile_all wrapper or the
        # application itself), stopping it here would break their
        # measurements: take_snapshot() raises once tracing has been stopped.
        started_tracing = not tracemalloc.is_tracing()
        if started_tracing:
            tracemalloc.start()

        try:
            # Get initial CPU stats
            process = psutil.Process()
            initial_cpu_times = process.cpu_times()

            # Get initial I/O stats if available; the AttributeError case
            # covers platforms where the process object has no io_counters.
            try:
                initial_io_counters = process.io_counters()
                io_tracking_available = True
            except AttributeError:
                logger.warning(
                    "I/O counters not available on this platform."
                )
                io_tracking_available = False

            # Time the function execution with a monotonic clock so that
            # system clock adjustments cannot distort the duration.
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time

            # Get final CPU stats
            final_cpu_times = process.cpu_times()

            # Get final I/O stats if available
            if io_tracking_available:
                final_io_counters = process.io_counters()
                io_read_count = (
                    final_io_counters.read_count
                    - initial_io_counters.read_count
                )
                io_write_count = (
                    final_io_counters.write_count
                    - initial_io_counters.write_count
                )

            # Get memory usage statistics, grouped by source line
            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics("lineno")

            # CPU time (user + system, in seconds) consumed by this process
            # while the function ran
            cpu_usage = (
                final_cpu_times.user
                - initial_cpu_times.user
                + final_cpu_times.system
                - initial_cpu_times.system
            )

            # Log the data
            logger.info(f"Execution time: {execution_time:.4f} seconds")
            logger.info(f"CPU usage: {cpu_usage:.2f} seconds")
            if io_tracking_available:
                logger.info(
                    f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}"
                )
            logger.info("Top memory usage:")
            for stat in top_stats[:10]:
                logger.info(stat)

            return result
        finally:
            # Stop memory tracking even when func raised, but only if this
            # wrapper was the one that started it.
            if started_tracing:
                tracemalloc.stop()

    return wrapper