"""This section describes unitxt operators for structured data. These operators are specialized in handling structured data like tables. For tables, expected input format is: { "header": ["col1", "col2"], "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]] } For triples, expected input format is: [[ "subject1", "relation1", "object1" ], [ "subject1", "relation2", "object2"]] For key-value pairs, expected input format is: {"key1": "value1", "key2": value2, "key3": "value3"} ------------------------ """ import json import random from abc import ABC, abstractmethod from typing import ( Any, Dict, List, Optional, ) import pandas as pd from .dict_utils import dict_get from .operators import FieldOperator, InstanceOperator from .random_utils import new_random_generator from .serializers import TableSerializer from .types import Table from .utils import recursive_copy def shuffle_columns(table: Table, seed=0) -> Table: # extract header & rows from the dictionary header = table.get("header", []) rows = table.get("rows", []) # shuffle the indices first indices = list(range(len(header))) random_generator = new_random_generator({"table": table, "seed": seed}) random_generator.shuffle(indices) # shuffle the header & rows based on that indices shuffled_header = [header[i] for i in indices] shuffled_rows = [[row[i] for i in indices] for row in rows] table["header"] = shuffled_header table["rows"] = shuffled_rows return table def shuffle_rows(table: Table, seed=0) -> Table: # extract header & rows from the dictionary rows = table.get("rows", []) # shuffle rows random_generator = new_random_generator({"table": table, "seed": seed}) random_generator.shuffle(rows) table["rows"] = rows return table class SerializeTable(ABC, TableSerializer): """TableSerializer converts a given table into a flat sequence with special symbols. Output format varies depending on the chosen serializer. This abstract class defines structure of a typical table serializer that any concrete implementation should follow. """ seed: int = 0 shuffle_rows: bool = False shuffle_columns: bool = False def serialize(self, value: Table, instance: Dict[str, Any]) -> str: value = recursive_copy(value) if self.shuffle_columns: value = shuffle_columns(table=value, seed=self.seed) if self.shuffle_rows: value = shuffle_rows(table=value, seed=self.seed) return self.serialize_table(value) # main method to serialize a table @abstractmethod def serialize_table(self, table_content: Dict) -> str: pass # method to process table header def process_header(self, header: List): pass # method to process a table row def process_row(self, row: List, row_index: int): pass # Concrete classes implementing table serializers class SerializeTableAsIndexedRowMajor(SerializeTable): """Indexed Row Major Table Serializer. Commonly used row major serialization format. Format: col : col1 | col2 | col 3 row 1 : val1 | val2 | val3 | val4 row 2 : val1 | ... """ # main method that processes a table # table_content must be in the presribed input format def serialize_table(self, table_content: Dict) -> str: # Extract headers and rows from the dictionary header = table_content.get("header", []) rows = table_content.get("rows", []) assert header and rows, "Incorrect input table format" # Process table header first serialized_tbl_str = self.process_header(header) + " " # Process rows sequentially starting from row 1 for i, row in enumerate(rows, start=1): serialized_tbl_str += self.process_row(row, row_index=i) + " " # return serialized table as a string return serialized_tbl_str.strip() # serialize header into a string containing the list of column names separated by '|' symbol def process_header(self, header: List): return "col : " + " | ".join(header) # serialize a table row into a string containing the list of cell values separated by '|' def process_row(self, row: List, row_index: int): serialized_row_str = "" row_cell_values = [ str(value) if isinstance(value, (int, float)) else value for value in row ] serialized_row_str += " | ".join(row_cell_values) return f"row {row_index} : {serialized_row_str}" class SerializeTableAsMarkdown(SerializeTable): """Markdown Table Serializer. Markdown table format is used in GitHub code primarily. Format: |col1|col2|col3| |---|---|---| |A|4|1| |I|2|1| ... """ # main method that serializes a table. # table_content must be in the presribed input format. def serialize_table(self, table_content: Dict) -> str: # Extract headers and rows from the dictionary header = table_content.get("header", []) rows = table_content.get("rows", []) assert header and rows, "Incorrect input table format" # Process table header first serialized_tbl_str = self.process_header(header) # Process rows sequentially starting from row 1 for i, row in enumerate(rows, start=1): serialized_tbl_str += self.process_row(row, row_index=i) # return serialized table as a string return serialized_tbl_str.strip() # serialize header into a string containing the list of column names def process_header(self, header: List): header_str = "|{}|\n".format("|".join(header)) header_str += "|{}|\n".format("|".join(["---"] * len(header))) return header_str # serialize a table row into a string containing the list of cell values def process_row(self, row: List, row_index: int): row_str = "" row_str += "|{}|\n".format("|".join(str(cell) for cell in row)) return row_str class SerializeTableAsDFLoader(SerializeTable): """DFLoader Table Serializer. Pandas dataframe based code snippet format serializer. Format(Sample): pd.DataFrame({ "name" : ["Alex", "Diana", "Donald"], "age" : [26, 34, 39] }, index=[0,1,2]) """ # main method that serializes a table. # table_content must be in the presribed input format. def serialize_table(self, table_content: Dict) -> str: # Extract headers and rows from the dictionary header = table_content.get("header", []) rows = table_content.get("rows", []) assert header and rows, "Incorrect input table format" # Fix duplicate columns, ensuring the first occurrence has no suffix header = [ f"{col}_{header[:i].count(col)}" if header[:i].count(col) > 0 else col for i, col in enumerate(header) ] # Create a pandas DataFrame df = pd.DataFrame(rows, columns=header) # Generate output string in the desired format data_dict = df.to_dict(orient="list") return ( "pd.DataFrame({\n" + json.dumps(data_dict) + "},\nindex=" + str(list(range(len(rows)))) + ")" ) class SerializeTableAsJson(SerializeTable): """JSON Table Serializer. Json format based serializer. Format(Sample): { "0":{"name":"Alex","age":26}, "1":{"name":"Diana","age":34}, "2":{"name":"Donald","age":39} } """ # main method that serializes a table. # table_content must be in the presribed input format. def serialize_table(self, table_content: Dict) -> str: # Extract headers and rows from the dictionary header = table_content.get("header", []) rows = table_content.get("rows", []) assert header and rows, "Incorrect input table format" # Generate output dictionary output_dict = {} for i, row in enumerate(rows): output_dict[i] = {header[j]: value for j, value in enumerate(row)} # Convert dictionary to JSON string return json.dumps(output_dict) class SerializeTableAsHTML(SerializeTable): """HTML Table Serializer. HTML table format used for rendering tables in web pages. Format(Sample):
nameagesex
Alice26F
Raj34M
""" # main method that serializes a table. # table_content must be in the prescribed input format. def serialize_table(self, table_content: Dict) -> str: # Extract headers and rows from the dictionary header = table_content.get("header", []) rows = table_content.get("rows", []) assert header and rows, "Incorrect input table format" # Build the HTML table structure serialized_tbl_str = "\n" serialized_tbl_str += self.process_header(header) + "\n" serialized_tbl_str += self.process_rows(rows) + "\n" serialized_tbl_str += "
" return serialized_tbl_str.strip() # serialize the header into an HTML section def process_header(self, header: List) -> str: header_html = " \n " for col in header: header_html += f"{col}" header_html += "\n " return header_html # serialize the rows into an HTML section def process_rows(self, rows: List[List]) -> str: rows_html = " " for row in rows: rows_html += "\n " for cell in row: rows_html += f"{cell}" rows_html += "" rows_html += "\n " return rows_html # truncate cell value to maximum allowed length def truncate_cell(cell_value, max_len): if cell_value is None: return None if isinstance(cell_value, int) or isinstance(cell_value, float): return None if cell_value.strip() == "": return None if len(cell_value) > max_len: return cell_value[:max_len] return None class TruncateTableCells(InstanceOperator): """Limit the maximum length of cell values in a table to reduce the overall length. Args: max_length (int) - maximum allowed length of cell values For tasks that produce a cell value as answer, truncating a cell value should be replicated with truncating the corresponding answer as well. This has been addressed in the implementation. """ max_length: int = 15 table: str = None text_output: Optional[str] = None def process( self, instance: Dict[str, Any], stream_name: Optional[str] = None ) -> Dict[str, Any]: table = dict_get(instance, self.table) answers = [] if self.text_output is not None: answers = dict_get(instance, self.text_output) self.truncate_table(table_content=table, answers=answers) return instance # truncate table cells def truncate_table(self, table_content: Dict, answers: Optional[List]): cell_mapping = {} # One row at a time for row in table_content.get("rows", []): for i, cell in enumerate(row): truncated_cell = truncate_cell(cell, self.max_length) if truncated_cell is not None: cell_mapping[cell] = truncated_cell row[i] = truncated_cell # Update values in answer list to truncated values if answers is not None: for i, case in enumerate(answers): answers[i] = cell_mapping.get(case, case) class TruncateTableRows(FieldOperator): """Limits table rows to specified limit by removing excess rows via random selection. Args: rows_to_keep (int) - number of rows to keep. """ rows_to_keep: int = 10 def process_value(self, table: Any) -> Any: return self.truncate_table_rows(table_content=table) def truncate_table_rows(self, table_content: Dict): # Get rows from table rows = table_content.get("rows", []) num_rows = len(rows) # if number of rows are anyway lesser, return. if num_rows <= self.rows_to_keep: return table_content # calculate number of rows to delete, delete them rows_to_delete = num_rows - self.rows_to_keep # Randomly select rows to be deleted deleted_rows_indices = random.sample(range(len(rows)), rows_to_delete) remaining_rows = [ row for i, row in enumerate(rows) if i not in deleted_rows_indices ] table_content["rows"] = remaining_rows return table_content class SerializeTableRowAsText(InstanceOperator): """Serializes a table row as text. Args: fields (str) - list of fields to be included in serialization. to_field (str) - serialized text field name. max_cell_length (int) - limits cell length to be considered, optional. """ fields: str to_field: str max_cell_length: Optional[int] = None def process( self, instance: Dict[str, Any], stream_name: Optional[str] = None ) -> Dict[str, Any]: linearized_str = "" for field in self.fields: value = dict_get(instance, field) if self.max_cell_length is not None: truncated_value = truncate_cell(value, self.max_cell_length) if truncated_value is not None: value = truncated_value linearized_str = linearized_str + field + " is " + str(value) + ", " instance[self.to_field] = linearized_str return instance class SerializeTableRowAsList(InstanceOperator): """Serializes a table row as list. Args: fields (str) - list of fields to be included in serialization. to_field (str) - serialized text field name. max_cell_length (int) - limits cell length to be considered, optional. """ fields: str to_field: str max_cell_length: Optional[int] = None def process( self, instance: Dict[str, Any], stream_name: Optional[str] = None ) -> Dict[str, Any]: linearized_str = "" for field in self.fields: value = dict_get(instance, field) if self.max_cell_length is not None: truncated_value = truncate_cell(value, self.max_cell_length) if truncated_value is not None: value = truncated_value linearized_str = linearized_str + field + ": " + str(value) + ", " instance[self.to_field] = linearized_str return instance class SerializeTriples(FieldOperator): """Serializes triples into a flat sequence. Sample input in expected format: [[ "First Clearing", "LOCATION", "On NYS 52 1 Mi. Youngsville" ], [ "On NYS 52 1 Mi. Youngsville", "CITY_OR_TOWN", "Callicoon, New York"]] Sample output: First Clearing : LOCATION : On NYS 52 1 Mi. Youngsville | On NYS 52 1 Mi. Youngsville : CITY_OR_TOWN : Callicoon, New York """ def process_value(self, tripleset: Any) -> Any: return self.serialize_triples(tripleset) def serialize_triples(self, tripleset) -> str: return " | ".join( f"{subj} : {rel.lower()} : {obj}" for subj, rel, obj in tripleset ) class SerializeKeyValPairs(FieldOperator): """Serializes key, value pairs into a flat sequence. Sample input in expected format: {"name": "Alex", "age": 31, "sex": "M"} Sample output: name is Alex, age is 31, sex is M """ def process_value(self, kvpairs: Any) -> Any: return self.serialize_kvpairs(kvpairs) def serialize_kvpairs(self, kvpairs) -> str: serialized_str = "" for key, value in kvpairs.items(): serialized_str += f"{key} is {value}, " # Remove the trailing comma and space then return return serialized_str[:-2] class ListToKeyValPairs(InstanceOperator): """Maps list of keys and values into key:value pairs. Sample input in expected format: {"keys": ["name", "age", "sex"], "values": ["Alex", 31, "M"]} Sample output: {"name": "Alex", "age": 31, "sex": "M"} """ fields: List[str] to_field: str def process( self, instance: Dict[str, Any], stream_name: Optional[str] = None ) -> Dict[str, Any]: keylist = dict_get(instance, self.fields[0]) valuelist = dict_get(instance, self.fields[1]) output_dict = {} for key, value in zip(keylist, valuelist): output_dict[key] = value instance[self.to_field] = output_dict return instance class ConvertTableColNamesToSequential(FieldOperator): """Replaces actual table column names with static sequential names like col_0, col_1,... Sample input: { "header": ["name", "age"], "rows": [["Alex", 21], ["Donald", 34]] } Sample output: { "header": ["col_0", "col_1"], "rows": [["Alex", 21], ["Donald", 34]] } """ def process_value(self, table: Any) -> Any: table_input = recursive_copy(table) return self.replace_header(table_content=table_input) # replaces header with sequential column names def replace_header(self, table_content: Dict) -> str: # Extract header from the dictionary header = table_content.get("header", []) assert header, "Input table missing header" new_header = ["col_" + str(i) for i in range(len(header))] table_content["header"] = new_header return table_content class ShuffleTableRows(FieldOperator): """Shuffles the input table rows randomly. Sample Input: { "header": ["name", "age"], "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]], } Sample Output: { "header": ["name", "age"], "rows": [["Donald", 39], ["Raj", 34], ["Alex", 26]], } """ def process_value(self, table: Any) -> Any: table_input = recursive_copy(table) return shuffle_rows(table_input) class ShuffleTableColumns(FieldOperator): """Shuffles the table columns randomly. Sample Input: { "header": ["name", "age"], "rows": [["Alex", 26], ["Raj", 34], ["Donald", 39]], } Sample Output: { "header": ["age", "name"], "rows": [[26, "Alex"], [34, "Raj"], [39, "Donald"]], } """ def process_value(self, table: Any) -> Any: table_input = recursive_copy(table) return shuffle_columns(table_input) class LoadJson(FieldOperator): failure_value: Any = None allow_failure: bool = False def process_value(self, value: str) -> Any: if self.allow_failure: try: return json.loads(value) except json.JSONDecodeError: return self.failure_value else: return json.loads(value) class DumpJson(FieldOperator): def process_value(self, value: str) -> str: return json.dumps(value) class MapHTMLTableToJSON(FieldOperator): """Converts HTML table format to the basic one (JSON). JSON format { "header": ["col1", "col2"], "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]] } """ _requirements_list = ["bs4"] def process_value(self, table: Any) -> Any: return self.truncate_table_rows(table_content=table) def truncate_table_rows(self, table_content: str) -> Dict: from bs4 import BeautifulSoup soup = BeautifulSoup(table_content, "html.parser") # Extract header header = [] header_cells = soup.find("thead").find_all("th") for cell in header_cells: header.append(cell.get_text()) # Extract rows rows = [] for row in soup.find("tbody").find_all("tr"): row_data = [] for cell in row.find_all("td"): row_data.append(cell.get_text()) rows.append(row_data) # return dictionary return {"header": header, "rows": rows} class MapTableListsToStdTableJSON(FieldOperator): """Converts lists table format to the basic one (JSON). JSON format { "header": ["col1", "col2"], "rows": [["row11", "row12"], ["row21", "row22"], ["row31", "row32"]] } """ def process_value(self, table: Any) -> Any: return self.map_tablelists_to_stdtablejson_util(table_content=table) def map_tablelists_to_stdtablejson_util(self, table_content: str) -> Dict: return {"header": table_content[0], "rows": table_content[1:]} class ConstructTableFromRowsCols(InstanceOperator): """Maps column and row field into single table field encompassing both header and rows. field[0] = header string as List field[1] = rows string as List[List] field[2] = table caption string(optional) """ fields: List[str] to_field: str def process( self, instance: Dict[str, Any], stream_name: Optional[str] = None ) -> Dict[str, Any]: header = dict_get(instance, self.fields[0]) rows = dict_get(instance, self.fields[1]) if len(self.fields) >= 3: caption = instance[self.fields[2]] else: caption = None import ast header_processed = ast.literal_eval(header) rows_processed = ast.literal_eval(rows) output_dict = {"header": header_processed, "rows": rows_processed} if caption is not None: output_dict["caption"] = caption instance[self.to_field] = output_dict return instance class TransposeTable(FieldOperator): """Transpose a table. Sample Input: { "header": ["name", "age", "sex"], "rows": [["Alice", 26, "F"], ["Raj", 34, "M"], ["Donald", 39, "M"]], } Sample Output: { "header": [" ", "0", "1", "2"], "rows": [["name", "Alice", "Raj", "Donald"], ["age", 26, 34, 39], ["sex", "F", "M", "M"]], } """ def process_value(self, table: Any) -> Any: return self.transpose_table(table) def transpose_table(self, table: Dict) -> Dict: # Extract the header and rows from the table object header = table["header"] rows = table["rows"] # Transpose the table by converting rows as columns and vice versa transposed_header = [" "] + [str(i) for i in range(len(rows))] transposed_rows = [ [header[i]] + [row[i] for row in rows] for i in range(len(header)) ] return {"header": transposed_header, "rows": transposed_rows} class DuplicateTableRows(FieldOperator): """Duplicates specific rows of a table for the given number of times. Args: row_indices (List[int]) - rows to be duplicated times(int) - how many times to duplicate """ row_indices: List[int] = [] times: int = 1 def process_value(self, table: Any) -> Any: # Extract the header and rows from the table header = table["header"] rows = table["rows"] # Duplicate only the specified rows duplicated_rows = [] for i, row in enumerate(rows): if i in self.row_indices: duplicated_rows.extend( [row] * self.times ) # Duplicate the selected rows else: duplicated_rows.append(row) # Leave other rows unchanged # Return the new table with selectively duplicated rows return {"header": header, "rows": duplicated_rows} class DuplicateTableColumns(FieldOperator): """Duplicates specific columns of a table for the given number of times. Args: column_indices (List[int]) - columns to be duplicated times(int) - how many times to duplicate """ column_indices: List[int] = [] times: int = 1 def process_value(self, table: Any) -> Any: # Extract the header and rows from the table header = table["header"] rows = table["rows"] # Duplicate the specified columns in the header duplicated_header = [] for i, col in enumerate(header): if i in self.column_indices: duplicated_header.extend([col] * self.times) else: duplicated_header.append(col) # Duplicate the specified columns in each row duplicated_rows = [] for row in rows: new_row = [] for i, value in enumerate(row): if i in self.column_indices: new_row.extend([value] * self.times) else: new_row.append(value) duplicated_rows.append(new_row) # Return the new table with selectively duplicated columns return {"header": duplicated_header, "rows": duplicated_rows} class InsertEmptyTableRows(FieldOperator): """Inserts empty rows in a table randomly for the given number of times. Args: times(int) - how many times to insert """ times: int = 0 def process_value(self, table: Any) -> Any: # Extract the header and rows from the table header = table["header"] rows = table["rows"] # Insert empty rows at random positions for _ in range(self.times): empty_row = [""] * len( header ) # Create an empty row with the same number of columns insert_pos = random.randint( 0, len(rows) ) # Get a random position to insert the empty row created rows.insert(insert_pos, empty_row) # Return the modified table return {"header": header, "rows": rows}