test/modules/dml/pdh/__init__.py
bilegentile's picture
Upload folder using huggingface_hub
c19ca42 verified
from ctypes import *
from ctypes.wintypes import *
from typing import NamedTuple, TypeVar
from .apis import PdhExpandWildCardPathW, PdhOpenQueryW, PdhAddEnglishCounterW, PdhCollectQueryData, PdhGetFormattedCounterValue, PdhGetFormattedCounterArrayW, PdhCloseQuery
from .structures import PDH_HQUERY, PDH_HCOUNTER, PDH_FMT_COUNTERVALUE, PPDH_FMT_COUNTERVALUE_ITEM_W
from .defines import *
from .msvcrt import malloc
from .errors import PDHError
class __InternalAbstraction(NamedTuple):
    """Pair a PDH format flag with the value-union attribute that holds the result.

    Entries of this type are stored in ``_type_map`` and unpacked as
    ``flag, attr_name`` by the counter formatting methods.
    """

    # PDH_FMT_* constant that is OR-ed into the format argument of the PDH call.
    flag: int
    # Name of the attribute read from the returned value's ``u`` union.
    attr_name: str
# Maps the Python type a caller asks for to the PDH format flag to request and
# the union attribute from which the formatted value is then read.
_type_map = {
    int: __InternalAbstraction(flag=PDH_FMT_LARGE, attr_name="largeValue"),
    float: __InternalAbstraction(flag=PDH_FMT_DOUBLE, attr_name="doubleValue"),
}
def expand_wildcard_path(path: str) -> list[str]:
    """Expand a PDH counter path containing wildcards into concrete paths.

    ``PdhExpandWildCardPathW`` is called twice: once with no buffer to learn
    the required length, then again with a buffer of that length.

    Args:
        path: Counter path to expand.

    Returns:
        The expanded paths, in the order they appear in the PDH output buffer.

    Raises:
        PDHError: If the size query does not return ``PDH_MORE_DATA`` or the
            expansion itself does not return ``PDH_OK``.
    """
    wide_path = LPCWSTR(path)
    required = DWORD(0)

    # First call: no output buffer, so PDH only reports the length it needs.
    status = PdhExpandWildCardPathW(None, wide_path, None, byref(required), PDH_NOEXPANDCOUNTERS)
    if status != PDH_MORE_DATA:
        raise PDHError("Something went wrong.")

    buffer = (WCHAR * required.value)()
    status = PdhExpandWildCardPathW(None, wide_path, buffer, byref(required), PDH_NOEXPANDCOUNTERS)
    if status != PDH_OK:
        raise PDHError(f"Couldn't expand wildcard path '{path}'")

    # The buffer holds NUL-separated strings and the list ends at the first
    # empty string. Stopping there (instead of splitting everything and
    # dropping exactly one trailing entry) keeps the result correct when the
    # buffer is larger than the data written into it, and never pops from an
    # empty list.
    paths: list[str] = []
    for entry in buffer[:].split("\0"):
        if not entry:
            break
        paths.append(entry)
    return paths
# Type variable constrained to exactly the Python types registered in _type_map.
T = TypeVar("T", *_type_map)
class HCounter(PDH_HCOUNTER):
    """PDH counter handle with helpers that read its formatted value(s).

    Every formatting call combines the flag for the requested Python type
    with ``PDH_FMT_NOSCALE``.
    """

    @staticmethod
    def _resolve_format(typ: type[T]) -> tuple[int, str]:
        """Return the ``(flag, attr_name)`` pair registered for *typ*.

        Raises:
            PDHError: If *typ* is not a key of ``_type_map``.
        """
        if typ not in _type_map:
            raise PDHError(f"Invalid value type: {typ}")
        return _type_map[typ]

    def get_formatted_value(self, typ: type[T]) -> T:
        """Return this counter's current value formatted as *typ*.

        Args:
            typ: The Python type to format as; must be a key of ``_type_map``
                (``int`` or ``float``).

        Returns:
            The attribute of the formatted value's union that corresponds to
            *typ*.

        Raises:
            PDHError: If *typ* is unsupported or the PDH call does not return
                ``PDH_OK``.
        """
        flag, attr_name = self._resolve_format(typ)
        value = PDH_FMT_COUNTERVALUE()
        status = PdhGetFormattedCounterValue(self, DWORD(flag | PDH_FMT_NOSCALE), None, byref(value))
        if status != PDH_OK:
            raise PDHError("Couldn't get formatted counter value.")
        return getattr(value.u, attr_name)

    def get_formatted_dict(self, typ: type[T]) -> dict[str, T]:
        """Return the counter's formatted values keyed by item name.

        ``PdhGetFormattedCounterArrayW`` is called twice: once without a
        buffer to obtain the required byte size and item count, then again
        with a buffer of that size.

        Args:
            typ: The Python type to format as; must be a key of ``_type_map``
                (``int`` or ``float``).

        Returns:
            A dict mapping each item's ``szName`` to its formatted value.

        Raises:
            PDHError: If *typ* is unsupported, the size query does not return
                ``PDH_MORE_DATA``, or the data query does not return ``PDH_OK``.
        """
        flag, attr_name = self._resolve_format(typ)
        fmt = DWORD(flag | PDH_FMT_NOSCALE)
        buffer_size = DWORD(0)
        item_count = DWORD(0)

        # Size probe: no item buffer is supplied, so only the sizes are filled in.
        status = PdhGetFormattedCounterArrayW(self, fmt, byref(buffer_size), byref(item_count), None)
        if status != PDH_MORE_DATA:
            raise PDHError("Something went wrong.")

        # Use a ctypes-owned buffer instead of a raw malloc() that was never
        # freed: the memory is released automatically once `storage` goes out
        # of scope, including when the error below is raised.
        storage = create_string_buffer(buffer_size.value)
        items = cast(storage, PPDH_FMT_COUNTERVALUE_ITEM_W)
        status = PdhGetFormattedCounterArrayW(self, fmt, byref(buffer_size), byref(item_count), items)
        if status != PDH_OK:
            raise PDHError("Couldn't get formatted counter array.")

        # NOTE(review): assumes `szName` is an LPWSTR field, so reading it
        # copies the name into a Python str that outlives `storage` - confirm
        # against the structure definition.
        result: dict[str, T] = {}
        for index in range(item_count.value):
            item = items[index]
            result[item.szName] = getattr(item.FmtValue.u, attr_name)
        return result
class HQuery(PDH_HQUERY):
    """PDH query handle that is opened as part of construction."""

    def __init__(self):
        """Open a new PDH query and store its handle in this object.

        Raises:
            PDHError: If ``PdhOpenQueryW`` does not return ``PDH_OK``.
        """
        super().__init__()
        status = PdhOpenQueryW(None, None, byref(self))
        if status != PDH_OK:
            raise PDHError("Couldn't open PDH query.")

    def add_counter(self, path: str) -> HCounter:
        """Add the counter at *path* to this query and return its handle.

        Raises:
            PDHError: If ``PdhAddEnglishCounterW`` does not return ``PDH_OK``.
        """
        counter = HCounter()
        status = PdhAddEnglishCounterW(self, LPCWSTR(path), None, byref(counter))
        if status == PDH_OK:
            return counter
        raise PDHError("Couldn't add counter query.")

    def collect_data(self):
        """Collect data for this query via ``PdhCollectQueryData``.

        Raises:
            PDHError: If the call does not return ``PDH_OK``.
        """
        status = PdhCollectQueryData(self)
        if status != PDH_OK:
            raise PDHError("Couldn't collect query data.")

    def close(self):
        """Close this query via ``PdhCloseQuery``.

        Raises:
            PDHError: If the call does not return ``PDH_OK``.
        """
        status = PdhCloseQuery(self)
        if status != PDH_OK:
            raise PDHError("Couldn't close PDH query.")