import json
import logging
import math
import re

import requests

import datasets
from datasets import load_dataset, get_dataset_config_names, get_dataset_infos
from huggingface_hub import HfApi, DatasetCard, DatasetCardData

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DatasetCommandCenter:
    def __init__(self, token=None):
        self.token = token
        self.api = HfApi(token=token)
        self.username = self.api.whoami()['name']
        logger.info("Authenticated to the Hugging Face Hub as: %s", self.username)

    def get_dataset_metadata(self, dataset_id):
        """
        Fetches the configs, splits, and detected license for a dataset.
        """
        configs = ['default']
        splits = ['train', 'test', 'validation']
        license_name = "unknown"

        try:
            # Resolve the available configs; fall back to 'default' on failure.
            try:
                found_configs = get_dataset_config_names(dataset_id, token=self.token)
                if found_configs:
                    configs = found_configs
            except Exception:
                pass

            # Resolve splits and license from the infos of the first config.
            try:
                selected = configs[0]
                infos = get_dataset_infos(dataset_id, token=self.token)
                logger.debug(infos)

                info = None
                if selected in infos:
                    info = infos[selected]
                elif 'default' in infos:
                    info = infos['default']
                elif infos:
                    info = list(infos.values())[0]

                if info:
                    splits = list(info.splits.keys())
                    license_name = info.license or "unknown"
            except Exception:
                pass

            return {
                "status": "success",
                "configs": configs,
                "splits": splits,
                "license_detected": license_name
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_splits_for_config(self, dataset_id, config_name):
        """
        Fetches the split names for a single config, with sensible fallbacks.
        """
        try:
            infos = get_dataset_infos(dataset_id, config_name=config_name, token=self.token)
            if config_name in infos:
                splits = list(infos[config_name].splits.keys())
            elif len(infos) > 0:
                splits = list(list(infos.values())[0].splits.keys())
            else:
                splits = ['train', 'test']
            return {"status": "success", "splits": splits}
        except Exception:
            return {"status": "success", "splits": ['train', 'test', 'validation']}

    def _sanitize_for_json(self, obj):
        """
        Recursively cleans data for JSON serialization.
        """
        if isinstance(obj, float):
            # NaN and Infinity are not valid JSON; map them to null.
            if math.isnan(obj) or math.isinf(obj):
                return None
            return obj
        elif isinstance(obj, dict):
            return {k: self._sanitize_for_json(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._sanitize_for_json(v) for v in obj]
        elif isinstance(obj, (str, int, bool, type(None))):
            return obj
        else:
            # Anything else (bytes, images, datetimes, ...) becomes its string form.
            return str(obj)

    def _flatten_object(self, obj, parent_key='', sep='.'):
        """
        Recursively collects dotted key paths for the UI dropdowns.
        """
        items = {}

        # Strings that look like JSON are parsed so their inner keys are discoverable.
        if isinstance(obj, str):
            s = obj.strip()
            if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                try:
                    obj = json.loads(s)
                except Exception:
                    pass

        if isinstance(obj, dict):
            for k, v in obj.items():
                new_key = f"{parent_key}{sep}{k}" if parent_key else k
                items.update(self._flatten_object(v, new_key, sep=sep))
        elif isinstance(obj, list):
            # Lists are not flattened element by element; they are flagged for list_search.
            new_key = f"{parent_key}" if parent_key else "list_content"
            items[new_key] = "List"
        else:
            items[parent_key] = type(obj).__name__

        return items

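    # Illustrative sketch of the flattening behaviour (hypothetical input, not part
    # of the pipeline): a row such as
    #   {"question": "Q?", "meta": '{"source": {"site": "x"}}', "answers": [1, 2]}
    # would yield {"question": "str", "meta.source.site": "str", "answers": "List"},
    # which is what populates the column-path dropdowns in the UI.
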
    def inspect_dataset(self, dataset_id, config, split):
        """
        Streams a handful of rows and derives the available column paths and a schema.
        """
        try:
            conf = config if config != 'default' else None
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)

            sample_rows = []
            available_paths = set()
            schema_map = {}

            for i, row in enumerate(ds_stream):
                if i >= 10:
                    break

                row = dict(row)

                # Keep a JSON-safe copy for the UI preview.
                clean_row = self._sanitize_for_json(row)
                sample_rows.append(clean_row)

                # Collect every dotted path seen across the sampled rows.
                flattened = self._flatten_object(row)
                available_paths.update(flattened.keys())

                # Track which top-level columns hold lists (including JSON-encoded lists).
                for k, v in row.items():
                    if k not in schema_map:
                        schema_map[k] = {"type": "Object"}

                    val = v
                    if isinstance(val, str):
                        try:
                            val = json.loads(val)
                        except Exception:
                            pass

                    if isinstance(val, list):
                        schema_map[k]["type"] = "List"

            # Group the discovered paths by their root column.
            sorted_paths = sorted(list(available_paths))
            schema_tree = {}
            for path in sorted_paths:
                root = path.split('.')[0]
                if root not in schema_tree:
                    schema_tree[root] = []
                schema_tree[root].append(path)

            return {
                "status": "success",
                "samples": sample_rows,
                "schema_tree": schema_tree,
                "schema": schema_map,
                "dataset_id": dataset_id
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

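    # Minimal sketch of the payload the UI consumes (shape only, values hypothetical):
    #   {"status": "success",
    #    "samples": [{"question": "Q?", "choices": [...]}],
    #    "schema_tree": {"question": ["question"], "choices": ["choices"]},
    #    "schema": {"question": {"type": "Object"}, "choices": {"type": "List"}},
    #    "dataset_id": "user/source-dataset"}
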
    def _get_value_by_path(self, obj, path):
        """
        Retrieves a value by dotted path. PRIORITY: direct key access (fastest).
        """
        if not path:
            return obj

        # Fast path: a plain key with no dots is looked up directly.
        try:
            if '.' not in path:
                return obj[path]
        except (KeyError, TypeError, AttributeError):
            pass

        # Slow path: walk the dotted segments one at a time.
        keys = path.split('.')
        current = obj

        for i, key in enumerate(keys):
            if current is None:
                return None

            try:
                # Numeric segments index into lists; everything else is a dict key.
                if isinstance(current, list) and key.isdigit():
                    current = current[int(key)]
                else:
                    current = current[key]
            except (KeyError, TypeError, IndexError, AttributeError):
                return None

            # If an intermediate value is a JSON-encoded string, decode it so the
            # remaining segments can keep descending.
            is_last_key = (i == len(keys) - 1)
            if not is_last_key and isinstance(current, str):
                s = current.strip()
                if (s.startswith('{') and s.endswith('}')) or (s.startswith('[') and s.endswith(']')):
                    try:
                        current = json.loads(s)
                    except Exception:
                        return None

        return current

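    # Illustrative path resolution (hypothetical row, not executed):
    #   row = {"meta": '{"author": {"name": "ada"}}', "tags": ["x", "y"]}
    #   _get_value_by_path(row, "meta.author.name")  ->  "ada"  (JSON string decoded on the way down)
    #   _get_value_by_path(row, "tags.1")            ->  "y"    (numeric segment indexes the list)
    #   _get_value_by_path(row, "missing.key")       ->  None   (lookups never raise)
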
    def _extract_from_list_logic(self, row, source_col, filter_key, filter_val, target_path):
        """
        FROM source_col FIND ITEM WHERE filter_key == filter_val EXTRACT target_path
        """
        data = row.get(source_col)

        # The source column may hold a JSON-encoded list rather than a native list.
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except Exception:
                return None

        if not isinstance(data, list):
            return None

        # Find the first item whose filter_key matches (compared as strings).
        matched_item = None
        for item in data:
            if isinstance(item, dict) and str(item.get(filter_key, '')) == str(filter_val):
                matched_item = item
                break

        if matched_item:
            return self._get_value_by_path(matched_item, target_path)

        return None

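    # Sketch of a list_search column definition this method serves (field names
    # follow _apply_projection below; the data itself is hypothetical):
    #   row = {"annotations": [{"model": "a", "score": 1}, {"model": "b", "score": 2}]}
    #   col_def = {"type": "list_search", "name": "b_score", "source": "annotations",
    #              "filter_key": "model", "filter_val": "b", "target_key": "score"}
    #   -> _extract_from_list_logic(row, "annotations", "model", "b", "score") == 2
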
    def _apply_projection(self, row, recipe):
        new_row = {}

        # The eval context for 'python' columns is built lazily, once per row.
        eval_context = None

        for col_def in recipe['columns']:
            t_type = col_def.get('type', 'simple')
            target_col = col_def['name']

            try:
                if t_type == 'simple':
                    # Direct (possibly dotted) path lookup.
                    new_row[target_col] = self._get_value_by_path(row, col_def['source'])

                elif t_type == 'list_search':
                    # FROM source FIND ITEM WHERE filter_key == filter_val EXTRACT target_key
                    new_row[target_col] = self._extract_from_list_logic(
                        row,
                        col_def['source'],
                        col_def['filter_key'],
                        col_def['filter_val'],
                        col_def['target_key']
                    )

                elif t_type == 'python':
                    if eval_context is None:
                        eval_context = row.copy()
                        eval_context['row'] = row
                        eval_context['json'] = json
                        eval_context['re'] = re
                        eval_context['requests'] = requests

                    # The expression comes from the recipe author and is evaluated as-is.
                    val = eval(col_def['expression'], {}, eval_context)
                    new_row[target_col] = val

                elif t_type == 'requests':
                    # POST the recipe-supplied JSON payload and store the raw response body.
                    payload = json.loads(col_def['rpay'])
                    new_row[target_col] = requests.post(col_def['rurl'], json=payload).text

            except Exception as e:
                raise ValueError(f"Column '{target_col}' failed: {str(e)}")

        return new_row

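    # A minimal sketch of the recipe structure _apply_projection expects (keys taken
    # from the branches above; the dataset columns referenced are hypothetical):
    #   recipe = {
    #       "filter_rule": "row['score'] > 0.5",
    #       "columns": [
    #           {"type": "simple",      "name": "question", "source": "prompt.text"},
    #           {"type": "list_search", "name": "gpt4_answer", "source": "responses",
    #            "filter_key": "model", "filter_val": "gpt-4", "target_key": "text"},
    #           {"type": "python",      "name": "length", "expression": "len(row['prompt'])"},
    #       ],
    #   }
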
    def _generate_card(self, source_id, target_id, recipe, license_name):
        """
        Builds a dataset card describing the transformation recipe.
        """
        card_data = DatasetCardData(
            language="en",
            license=license_name,
            tags=["dataset-command-center", "etl", "generated-dataset"],
            base_model=source_id,
        )

        content = f"""
# {target_id.split('/')[-1]}

This dataset is a transformation of [{source_id}](https://huggingface.co/datasets/{source_id}).
It was generated using the **Hugging Face Dataset Command Center**.

## Transformation Recipe

The following operations were applied to the source data:

| Target Column | Operation Type | Source / Logic |
|---------------|----------------|----------------|
"""
        for col in recipe['columns']:
            c_type = col.get('type', 'simple')
            c_name = col['name']
            c_src = col.get('source', '-')

            logic = "-"
            if c_type == 'simple':
                logic = f"Mapped from `{c_src}`"
            elif c_type == 'list_search':
                logic = f"Get `{col['target_key']}` where `{col['filter_key']} == {col['filter_val']}`"
            elif c_type == 'python':
                logic = f"Python: `{col.get('expression')}`"

            content += f"| **{c_name}** | {c_type} | {logic} |\n"

        if recipe.get('filter_rule'):
            content += f"\n### Row Filtering\n**Filter Applied:** `{recipe['filter_rule']}`\n"

        content += f"\n## Original License\nThis dataset inherits the license: `{license_name}` from the source."

        card = DatasetCard.from_template(card_data, content=content)
        return card

    def process_and_push(self, source_id, config, split, target_id, recipe, max_rows=None, new_license=None):
        logger.info(f"Job started: {source_id} -> {target_id}")
        conf = config if config != 'default' else None

        def gen():
            ds_stream = load_dataset(source_id, name=conf, split=split, streaming=True, token=self.token)
            count = 0
            for i, row in enumerate(ds_stream):
                if max_rows and count >= int(max_rows):
                    break

                row = dict(row)

                # Apply the optional row-level filter before projecting columns.
                if recipe.get('filter_rule'):
                    try:
                        ctx = row.copy()
                        ctx['row'] = row
                        ctx['json'] = json
                        ctx['re'] = re
                        ctx['requests'] = requests
                        if not eval(recipe['filter_rule'], {}, ctx):
                            continue
                    except Exception as e:
                        raise ValueError(f"Filter crashed on row {i}: {e}")

                try:
                    yield self._apply_projection(row, recipe)
                    count += 1
                except ValueError as ve:
                    raise ve
                except Exception as e:
                    raise ValueError(f"Unexpected crash on row {i}: {e}")

        try:
            new_dataset = datasets.Dataset.from_generator(gen)
            new_dataset.push_to_hub(target_id, token=self.token)

            # Push a dataset card describing the transformation to the same repo.
            try:
                repo_id = target_id if '/' in target_id else f"{self.username}/{target_id}"
                card = self._generate_card(source_id, target_id, recipe, new_license or "unknown")
                card.push_to_hub(repo_id, token=self.token)
            except Exception as e:
                logger.error(f"Failed to push Dataset Card: {e}")

            return {"status": "success", "rows_processed": len(new_dataset)}

        except Exception as e:
            logger.error(f"Job Failed: {e}")
            return {"status": "failed", "error": str(e)}

    def preview_transform(self, dataset_id, config, split, recipe):
        conf = config if config != 'default' else None

        try:
            ds_stream = load_dataset(dataset_id, name=conf, split=split, streaming=True, token=self.token)
            processed = []

            for i, row in enumerate(ds_stream):
                if len(processed) >= 5:
                    break

                row = dict(row)

                # Apply the optional row filter; rows that fail (or crash) the filter are skipped.
                passed = True
                if recipe.get('filter_rule'):
                    try:
                        ctx = row.copy()
                        ctx['row'] = row
                        ctx['json'] = json
                        ctx['re'] = re
                        if not eval(recipe['filter_rule'], {}, ctx):
                            passed = False
                    except Exception:
                        passed = False

                if passed:
                    try:
                        new_row = self._apply_projection(row, recipe)
                        # Sanitize so the preview payload is JSON-serializable.
                        clean_new_row = self._sanitize_for_json(new_row)
                        processed.append(clean_new_row)
                    except Exception as e:
                        processed.append({"_preview_error": f"Row {i} Error: {str(e)}"})

            return processed

        except Exception:
            raise
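

# A minimal usage sketch, assuming a Hub token with write access is available in the
# HF_TOKEN environment variable. "rajpurkar/squad" is used purely as an illustration;
# the target repo name and recipe below are placeholders, not part of this project.
if __name__ == "__main__":
    import os

    center = DatasetCommandCenter(token=os.environ.get("HF_TOKEN"))

    # 1. Discover configs, splits, and the detected license.
    meta = center.get_dataset_metadata("rajpurkar/squad")
    print(meta["configs"], meta["splits"], meta["license_detected"])

    # 2. Preview a simple projection before committing to a full run.
    recipe = {"columns": [{"type": "simple", "name": "question", "source": "question"}]}
    print(center.preview_transform("rajpurkar/squad", "default", "train", recipe))

    # 3. Push the transformed dataset (and its generated card) to the Hub.
    # result = center.process_and_push("rajpurkar/squad", "default", "train",
    #                                  "my-squad-questions", recipe, max_rows=100)
    # print(result)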