"""
Implements the NQL (Natural Query Language) Query Agent for Tensorus.

This agent provides a basic natural language interface to query datasets
stored in TensorStorage. It uses regular expressions to parse simple queries
and translates them into calls to TensorStorage.query or other methods.

Limitations (without an LLM):
- Understands only a very limited set of predefined sentence structures.
- Limited support for complex conditions (AND/OR not implemented).
- Limited support for data types in conditions (primarily numbers and exact strings).
- No support for aggregations (mean, sum, etc.) beyond simple counts.
- Error handling for parsing ambiguity is basic.

Future Enhancements:
- Integrate a local or remote LLM for robust NLU.
- Support for complex queries (multiple conditions, joins).
- Support for aggregations and projections (selecting specific fields).
- More sophisticated error handling and user feedback.
- Context awareness and conversation history.
"""
|
|
| import re |
| import logging |
| import torch |
| from typing import List, Dict, Any, Optional, Callable, Tuple |
|
|
| from tensor_storage import TensorStorage |
|
|
| |
# Configure root logging once at import time so the agent's INFO/DEBUG
# messages are visible when this module is run directly as a script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
|
|
class NQLAgent:
    """Parses simple natural language queries and executes them against TensorStorage.

    The agent is purely regex based. Supported sentence shapes (case-insensitive):
      * "get|show|find [all] data|tensors|records|entries|experiences from [dataset] NAME"
      * "count records|entries|experiences in|from [dataset] NAME"
      * "... from NAME where KEY <op> VALUE"            (metadata filter, symbolic operator)
      * "... from NAME where KEY is|equals|eq VALUE"    (metadata filter, equality)
      * "... from NAME where tensor|value[I] <op> NUM"  (tensor element filter; [I] optional)
    """

    def __init__(self, tensor_storage: TensorStorage):
        """
        Initializes the NQL Agent.

        Args:
            tensor_storage: An instance of the TensorStorage class.

        Raises:
            TypeError: If tensor_storage is not a TensorStorage instance.
        """
        if not isinstance(tensor_storage, TensorStorage):
            raise TypeError("tensor_storage must be an instance of TensorStorage")
        self.tensor_storage = tensor_storage

        # "get|show|find [all] <noun> from [dataset] <name>"
        self.pattern_get_all = re.compile(
            r"^(?:get|show|find)\s+(?:all\s+)?(?:data|tensors?|records?|entries|experiences?)\s+from\s+(?:dataset\s+)?([\w_.-]+)$",
            re.IGNORECASE
        )

        # "... from <name> where <key> <symbolic op> <value>"
        self.pattern_filter_meta = re.compile(
            r"^(?:get|show|find)\s+.*\s+from\s+([\w_.-]+)\s+where\s+([\w_.-]+)\s*([<>=!]+)\s*'?([\w\s\d_.-]+?)'?$",
            re.IGNORECASE
        )

        # "... from <name> where <key> is|equals|eq <value>" -- equality only
        self.pattern_filter_meta_alt = re.compile(
            r"^(?:get|show|find)\s+.*\s+from\s+([\w_.-]+)\s+where\s+([\w_.-]+)\s+(?:is|equals|eq)\s+'?([\w\s\d_.-]+?)'?$",
            re.IGNORECASE
        )

        # "... from <name> where tensor|value[<i>] <op> <number>"; the index is optional
        # and defaults to the first element of the flattened tensor.
        self.pattern_filter_tensor = re.compile(
            r"^(?:get|show|find)\s+.*\s+from\s+([\w_.-]+)\s+where\s+(?:tensor|value)\s*(?:\[(\d+)\])?\s*([<>=!]+)\s*([\d.-]+)$",
            re.IGNORECASE
        )

        # "count <noun> in|from [dataset] <name>"
        self.pattern_count = re.compile(
            r"^count\s+(?:records?|entries|experiences?)\s+(?:in|from)\s+(?:dataset\s+)?([\w_.-]+)$",
            re.IGNORECASE
        )

        logger.info("NQLAgent initialized with basic regex patterns.")

    @staticmethod
    def _failure(message: str) -> Dict[str, Any]:
        """Builds the standard failure response dictionary."""
        return {"success": False, "message": message, "count": None, "results": None}

    def _parse_operator_and_value(self, op_str: str, val_str: str) -> Tuple[Callable, Any]:
        """Attempts to parse operator string and convert value string to number if possible.

        Args:
            op_str: Operator token from the query (one of =, ==, !=, <, <=, >, >=).
            val_str: Raw value token from the query.

        Returns:
            A tuple (comparison callable taking (actual, expected), parsed value).

        Raises:
            ValueError: If the operator token is not supported.
        """
        val_str = val_str.strip()
        op_map = {
            '=': lambda a, b: a == b,
            '==': lambda a, b: a == b,
            '!=': lambda a, b: a != b,
            '<': lambda a, b: a < b,
            '<=': lambda a, b: a <= b,
            '>': lambda a, b: a > b,
            '>=': lambda a, b: a >= b,
        }

        op_func = op_map.get(op_str)
        if op_func is None:
            raise ValueError(f"Unsupported operator: {op_str}")

        # Prefer a numeric interpretation; whole-number floats collapse to int
        # so "10" compares equal to metadata stored as the integer 10.
        try:
            value = float(val_str)
            if value.is_integer():
                value = int(value)
        except ValueError:
            value = val_str  # not numeric -> fall back to string comparison

        return op_func, value

    def _execute_count(self, dataset_name: str) -> Dict[str, Any]:
        """Handles 'count records in <dataset>' queries."""
        try:
            # TensorStorage exposes no dedicated count API, so fetch and measure.
            count = len(self.tensor_storage.get_dataset_with_metadata(dataset_name))
            return {
                "success": True,
                "message": f"Found {count} records in dataset '{dataset_name}'.",
                "count": count,
                "results": None
            }
        except ValueError as e:
            # Presumably raised by TensorStorage for an unknown dataset.
            logger.error(f"Error during COUNT query: {e}")
            return self._failure(str(e))
        except Exception as e:
            logger.error(f"Unexpected error during COUNT query: {e}", exc_info=True)
            return self._failure(f"An unexpected error occurred: {e}")

    def _execute_get_all(self, dataset_name: str) -> Dict[str, Any]:
        """Handles 'get all data from <dataset>' queries."""
        try:
            results = self.tensor_storage.get_dataset_with_metadata(dataset_name)
            count = len(results)
            return {
                "success": True,
                "message": f"Retrieved {count} records from dataset '{dataset_name}'.",
                "count": count,
                "results": results
            }
        except ValueError as e:
            logger.error(f"Error during GET ALL query: {e}")
            return self._failure(str(e))
        except Exception as e:
            logger.error(f"Unexpected error during GET ALL query: {e}", exc_info=True)
            return self._failure(f"An unexpected error occurred: {e}")

    def _execute_filter_meta(self, dataset_name: str, key: str, op_str: str, val_str: str) -> Dict[str, Any]:
        """Handles metadata filter queries ('... where <key> <op> <value>')."""
        try:
            op_func, filter_value = self._parse_operator_and_value(op_str, val_str)

            def query_fn_meta(tensor: torch.Tensor, metadata: Dict[str, Any]) -> bool:
                actual_value = metadata.get(key)
                if actual_value is None:
                    # A missing key never matches, even for '!='.
                    return False

                # Best-effort type coercion so "reward > 0" works whether the
                # stored value is numeric or a numeric-looking string.
                try:
                    if isinstance(filter_value, (int, float)) and not isinstance(actual_value, (int, float)):
                        actual_value = type(filter_value)(actual_value)
                    elif isinstance(filter_value, str) and not isinstance(actual_value, str):
                        actual_value = str(actual_value)
                except (ValueError, TypeError):
                    return False  # un-coercible values cannot match

                try:
                    return op_func(actual_value, filter_value)
                except TypeError:
                    # e.g. ordering comparison between incompatible types
                    return False
                except Exception as e_inner:
                    logger.warning(f"Error during query_fn execution for key '{key}': {e_inner}")
                    return False

            results = self.tensor_storage.query(dataset_name, query_fn_meta)
            count = len(results)
            return {
                "success": True,
                "message": f"Query executed successfully. Found {count} matching records.",
                "count": count,
                "results": results
            }
        except ValueError as e:
            logger.error(f"Error processing FILTER META query: {e}")
            return self._failure(str(e))
        except Exception as e:
            logger.error(f"Unexpected error during FILTER META query: {e}", exc_info=True)
            return self._failure(f"An unexpected error occurred: {e}")

    def _execute_filter_tensor(self, dataset_name: str, index_str: Optional[str],
                               op_str: str, val_str: str) -> Dict[str, Any]:
        """Handles tensor value filter queries ('... where tensor[<i>] <op> <number>')."""
        try:
            op_func, filter_value = self._parse_operator_and_value(op_str, val_str)
            if not isinstance(filter_value, (int, float)):
                raise ValueError(f"Tensor value filtering currently only supports numeric comparisons. Got value: {filter_value}")

            # No explicit index means "compare against the first element".
            tensor_index: Optional[int] = int(index_str) if index_str is not None else None

            def query_fn_tensor(tensor: torch.Tensor, metadata: Dict[str, Any]) -> bool:
                try:
                    flat = tensor.view(-1)  # flatten so the index applies to any shape
                    if tensor_index is None:
                        if tensor.numel() == 0:
                            return False  # empty tensor has no first element
                        actual_value = flat[0].item()
                    else:
                        if tensor_index >= tensor.numel():
                            return False  # out-of-range index never matches
                        actual_value = flat[tensor_index].item()
                    return op_func(actual_value, filter_value)
                except IndexError:
                    return False
                except Exception as e_inner:
                    logger.warning(f"Error during query_fn_tensor execution: {e_inner}")
                    return False

            results = self.tensor_storage.query(dataset_name, query_fn_tensor)
            count = len(results)
            return {
                "success": True,
                "message": f"Query executed successfully. Found {count} matching records.",
                "count": count,
                "results": results
            }
        except ValueError as e:
            logger.error(f"Error processing FILTER TENSOR query: {e}")
            return self._failure(str(e))
        except Exception as e:
            logger.error(f"Unexpected error during FILTER TENSOR query: {e}", exc_info=True)
            return self._failure(f"An unexpected error occurred: {e}")

    def process_query(self, query: str) -> Dict[str, Any]:
        """
        Processes a natural language query string.

        Args:
            query: The natural language query.

        Returns:
            A dictionary containing:
                'success': bool, indicating if the query was processed.
                'message': str, status message or error description.
                'count': Optional[int], number of results found.
                'results': Optional[List[Dict]], the list of matching records
                           (each a dict with 'tensor' and 'metadata').
        """
        query = query.strip()
        logger.info(f"Processing NQL query: '{query}'")

        match = self.pattern_count.match(query)
        if match:
            logger.debug(f"Matched COUNT pattern for dataset '{match.group(1)}'")
            return self._execute_count(match.group(1))

        match = self.pattern_get_all.match(query)
        if match:
            logger.debug(f"Matched GET ALL pattern for dataset '{match.group(1)}'")
            return self._execute_get_all(match.group(1))

        # BUG FIX: the tensor pattern must be tried BEFORE the metadata
        # patterns. A query like "show data from ds where value = -5.0" also
        # matches the generic metadata pattern (key 'value'), which would
        # silently filter on a non-existent metadata key and return 0 rows
        # instead of comparing against the tensor's first element.
        match = self.pattern_filter_tensor.match(query)
        if match:
            dataset_name, index_str, op_str, val_str = match.group(1, 2, 3, 4)
            logger.debug(f"Matched FILTER TENSOR pattern: dataset='{dataset_name}', index='{index_str}', op='{op_str}', value='{val_str}'")
            return self._execute_filter_tensor(dataset_name, index_str, op_str, val_str)

        match = self.pattern_filter_meta.match(query)
        if match:
            dataset_name, key, op_str, val_str = match.group(1, 2, 3, 4)
            logger.debug(f"Matched FILTER META pattern: dataset='{dataset_name}', key='{key}', op='{op_str}', value='{val_str}'")
            return self._execute_filter_meta(dataset_name, key, op_str, val_str)

        match = self.pattern_filter_meta_alt.match(query)
        if match:
            dataset_name, key, val_str = match.group(1, 2, 3)
            logger.debug(f"Matched FILTER META ALT pattern: dataset='{dataset_name}', key='{key}', op='=', value='{val_str}'")
            # The word forms ('is', 'equals', 'eq') all mean equality.
            return self._execute_filter_meta(dataset_name, key, '=', val_str)

        # Nothing matched: report the limitation back to the caller.
        logger.warning(f"Query did not match any known patterns: '{query}'")
        return self._failure(
            "Sorry, I couldn't understand that query. Try simple commands like 'get all data from my_dataset' or 'find records from my_dataset where key = value'."
        )
|
|
|
|
| |
if __name__ == "__main__":
    # Demo script: builds an in-memory TensorStorage, populates two datasets,
    # and runs a battery of NQL queries (including some that should fail).
    print("--- Starting NQL Agent Example ---")

    storage = TensorStorage()
    storage.create_dataset("sensor_data")
    storage.create_dataset("rl_experiences_test")

    # Sensor readings: two-element tensors; descriptive metadata per sensor.
    storage.insert("sensor_data", torch.tensor([10.5, 25.2]), metadata={"sensor_id": "A001", "location": "floor1", "status":"active"})
    storage.insert("sensor_data", torch.tensor([12.1, 26.8]), metadata={"sensor_id": "A002", "location": "floor1", "status":"active"})
    storage.insert("sensor_data", torch.tensor([-5.0, 24.1]), metadata={"sensor_id": "B001", "location": "floor2", "status":"inactive"})

    # RL experiences: placeholder tensors; the queryable fields live in metadata.
    storage.insert("rl_experiences_test", torch.tensor([1.0]), metadata={"state_id": "s1", "action": 0, "reward": -1.5, "next_state_id": "s2", "done": 0})
    storage.insert("rl_experiences_test", torch.tensor([1.0]), metadata={"state_id": "s2", "action": 1, "reward": 5.2, "next_state_id": "s3", "done": 0})
    storage.insert("rl_experiences_test", torch.tensor([1.0]), metadata={"state_id": "s3", "action": 0, "reward": -8.0, "next_state_id": None, "done": 1})

    nql_agent = NQLAgent(storage)

    # Query list deliberately mixes supported shapes (get-all, count, metadata
    # filters, tensor filters) with unsupported ones at the end, which should
    # produce a polite "couldn't understand" failure rather than a crash.
    queries = [
        "get all data from sensor_data",
        "show all records from rl_experiences_test",
        "count records in sensor_data",
        "find tensors from sensor_data where sensor_id = 'A001'",
        "find data from sensor_data where location is 'floor1'",
        "get records from sensor_data where status != 'active'",
        "find experiences from rl_experiences_test where reward > 0",
        "get experiences from rl_experiences_test where reward < -5",
        "find entries from rl_experiences_test where done == 1",
        "get records from sensor_data where value[0] > 11",
        "find tensors from sensor_data where tensor[1] < 25",
        "show data from sensor_data where value = -5.0",
        "get everything from non_existent_dataset",
        "find data from sensor_data where invalid_key = 10",
        "give me the average sensor reading",
        "select * from sensor_data"
    ]

    print("\n--- Processing Queries ---")
    for q in queries:
        print(f"\n> Query: \"{q}\"")
        response = nql_agent.process_query(q)
        print(f"< Success: {response['success']}")
        print(f"< Message: {response['message']}")
        if response['success'] and response['results'] is not None:
            print(f"< Count: {response['count']}")
            # Show at most `limit` results to keep console output readable.
            limit = 3
            for i, item in enumerate(response['results']):
                if i >= limit:
                    print(f"    ... (omitting {len(response['results']) - limit} more results)")
                    break
                # Summarize the tensor (shape/dtype) instead of dumping its values.
                tensor_str = f"Tensor(shape={item['tensor'].shape}, dtype={item['tensor'].dtype})"
                print(f"    - Result {i+1}: Metadata={item['metadata']}, Tensor={tensor_str}")
        elif response['success'] and response['count'] is not None:
            # Count-only responses (e.g. 'count records in ...') have no result list.
            print(f"< Count: {response['count']}")

    print("\n--- NQL Agent Example Finished ---")