Spaces:
Sleeping
Sleeping
File size: 5,007 Bytes
d8d14f1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import time
import tracemalloc
from functools import wraps
from typing import Any, Callable
import psutil
from pydantic import BaseModel
from swarms.utils.loguru_logger import initialize_logger
logger = initialize_logger(log_folder="calculate_func_metrics")
class FunctionMetrics(BaseModel):
execution_time: float
memory_usage: float
cpu_usage: float
io_operations: int
function_calls: int
def profile_func(func):
"""
Decorator function that profiles the execution of a given function.
Args:
func: The function to be profiled.
Returns:
A wrapper function that profiles the execution of the given function and returns the result along with the metrics.
"""
def wrapper(*args, **kwargs):
# Record the initial time, memory usage, CPU usage, and I/O operations
start_time = time.time()
start_mem = psutil.Process().memory_info().rss
start_cpu = psutil.cpu_percent()
start_io = (
psutil.disk_io_counters().read_count
+ psutil.disk_io_counters().write_count
)
# Call the function
result = func(*args, **kwargs)
# Record the final time, memory usage, CPU usage, and I/O operations
end_time = time.time()
end_mem = psutil.Process().memory_info().rss
end_cpu = psutil.cpu_percent()
end_io = (
psutil.disk_io_counters().read_count
+ psutil.disk_io_counters().write_count
)
# Calculate the execution time, memory usage, CPU usage, and I/O operations
execution_time = end_time - start_time
memory_usage = (end_mem - start_mem) / (
1024**2
) # Convert bytes to MiB
cpu_usage = end_cpu - start_cpu
io_operations = end_io - start_io
# Return the metrics as a FunctionMetrics object
metrics = FunctionMetrics(
execution_time=execution_time,
memory_usage=memory_usage,
cpu_usage=cpu_usage,
io_operations=io_operations,
function_calls=1, # Each call to the function counts as one function call
)
json_data = metrics.model_dump_json(indent=4)
logger.info(f"Function metrics: {json_data}")
return result, metrics
return wrapper
def profile_all(func: Callable) -> Callable:
"""
A decorator to profile memory usage, CPU usage, and I/O operations
of a function and log the data using loguru.
It combines tracemalloc for memory profiling, psutil for CPU and I/O operations,
and measures execution time.
Args:
func (Callable): The function to be profiled.
Returns:
Callable: The wrapped function with profiling enabled.
"""
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# Start memory tracking
tracemalloc.start()
# Get initial CPU stats
process = psutil.Process()
initial_cpu_times = process.cpu_times()
# Get initial I/O stats if available
try:
initial_io_counters = process.io_counters()
io_tracking_available = True
except AttributeError:
logger.warning(
"I/O counters not available on this platform."
)
io_tracking_available = False
# Start timing the function execution
start_time = time.time()
# Execute the function
result = func(*args, **kwargs)
# Stop timing
end_time = time.time()
execution_time = end_time - start_time
# Get final CPU stats
final_cpu_times = process.cpu_times()
# Get final I/O stats if available
if io_tracking_available:
final_io_counters = process.io_counters()
io_read_count = (
final_io_counters.read_count
- initial_io_counters.read_count
)
io_write_count = (
final_io_counters.write_count
- initial_io_counters.write_count
)
else:
io_read_count = io_write_count = 0
# Get memory usage statistics
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics("lineno")
# Calculate CPU usage
cpu_usage = (
final_cpu_times.user
- initial_cpu_times.user
+ final_cpu_times.system
- initial_cpu_times.system
)
# Log the data
logger.info(f"Execution time: {execution_time:.4f} seconds")
logger.info(f"CPU usage: {cpu_usage:.2f} seconds")
if io_tracking_available:
logger.info(
f"I/O Operations - Read: {io_read_count}, Write: {io_write_count}"
)
logger.info("Top memory usage:")
for stat in top_stats[:10]:
logger.info(stat)
# Stop memory tracking
tracemalloc.stop()
return result
return wrapper
|