# %%
import os
import sys

# Change the current working directory to the directory where the script is located
#__file__ =
current_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(current_dir)

# %%
# import requests
# from bs4 import BeautifulSoup
# from urllib.parse import urljoin
# import time
# import concurrent.futures
# from queue import Queue
# from threading import Lock

# def fetch_and_parse_links(url, base_url):
#     try:
#         response = requests.get(url, timeout=10)
#         response.raise_for_status()
#         soup = BeautifulSoup(response.content, 'html.parser')
#         main_div = soup.find('div', id='main')
#         if not main_div:
#             print(f"No div with id='main' found in {url}")
#             return []
#         links = main_div.find_all('a', href=True)
#         paths = []
#         for link in links:
#             href = urljoin(url, link['href'])
#             if href.startswith(base_url) and '#' not in href:
#                 path = href[len(base_url):].strip("/")
#                 if path and path not in paths:
#                     paths.append(path)
#         return paths
#     except requests.RequestException as e:
#         print(f"Error fetching {url}: {e}")
#         return []

# def worker(base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock):
#     while True:
#         current_path = to_visit_queue.get()
#         if current_path is None:
#             break
#         with lock:
#             if current_path in visited_paths:
#                 to_visit_queue.task_done()
#                 continue
#             visited_paths.add(current_path)
#         current_url = urljoin(base_url, current_path)
#         print(f"Visiting: {current_url}")
#         new_paths = fetch_and_parse_links(current_url, base_url)
#         with lock:
#             for new_path in new_paths:
#                 if new_path not in visited_paths:
#                     to_visit_queue.put(new_path)
#                     unvisited_paths.add(new_path)
#                 from_url = f"{base_url}{current_path}"
#                 to_url = f"{base_url}{new_path}"
#                 new_tuple = (from_url, to_url)
#                 if new_tuple not in tuples_list:
#                     tuples_list.append(new_tuple)
#             if current_path in unvisited_paths:
#                 unvisited_paths.remove(current_path)
#         to_visit_queue.task_done()
#         time.sleep(1)  # Be polite to the server

# def create_tuples_from_paths(base_url, max_workers=5):
#     visited_paths = set()
#     unvisited_paths = set()
#     tuples_list = []
#     to_visit_queue = Queue()
#     lock = Lock()
#     to_visit_queue.put("")  # Start with an empty string to represent the root
#     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
#         futures = []
#         for _ in range(max_workers):
#             future = executor.submit(worker, base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock)
#             futures.append(future)
#         to_visit_queue.join()
#         for _ in range(max_workers):
#             to_visit_queue.put(None)
#         concurrent.futures.wait(futures)
#     return tuples_list, visited_paths, unvisited_paths

# # Define the base URL
# base_url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/"

# import json

# def load_json(file_path):
#     with open(file_path, 'r', encoding='utf-8') as file:
#         return json.load(file)

# def flatten_list(nested_list):
#     for item in nested_list:
#         if isinstance(item, list):
#             yield from flatten_list(item)  # Recursively yield from nested lists
#         else:
#             yield item

# import polars as pl

# # Define the base URL
# base_url = 'https://www.gov.br/governodigital/pt-br/'

# # Example usage
# file_path = 'memory/graph_data_tiplet.json'  # Replace with your actual file path
# base_url = 'https://www.gov.br/governodigital/pt-br/'
# json_data = load_json(file_path)
# json_data = list(flatten_list(json_data))

# # Convert the list of URLs to a Polars DataFrame
# df = pl.DataFrame({
#     'url': json_data
# })

# # Remove the base URL and convert to path
# df = df.with_columns(
#     (pl.col('url').str.replace(base_url, '')).alias('path')
# )

# # Extract paths as a list
# paths = df['path'].to_list()

# # Build a hierarchical structure
# def build_tree(paths):
#     tree = {}
#     for path in paths:
#         parts = path.strip('/').split('/')
#         current_level = tree
#         for part in parts:
#             if part not in current_level:
#                 current_level[part] = {}
#             current_level = current_level[part]
#     return tree

#%%
from utils.llm import chat
from utils.file import File
import json

system = File("prompts/system.md")
knowledge = File("prompts/knowledge.md")
graph = File("interface/visualization.html")
graph_data = File("memory/graph_data.json")

# user_question = input("Question?")
# messages = [
#     {
#         "role": "system",
#         "content": [
#             {
#                 "type": "text",
#                 "text": system
#             }
#         ]
#     },
#     {
#         "role": "user",
#         "content": [
#             {
#                 "type": "text",
#                 "text": user_question
#             }
#         ]
#     }
# ]

def pipeline(messages):
    res = chat(messages=messages)
    response = res.choices[0].message.content
    return response

# if __name__ == "__main__":
#     res = chat(messages=messages)
#     response = res.choices[0].message.content
#     print(response)
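# %%
# Example usage sketch: build a messages list in the same shape as the commented-out
# example above and run it through pipeline(). This assumes a `File` object can be
# passed directly as the message text and that chat() returns an OpenAI-style
# completion object (the same assumption pipeline() already makes). Kept commented
# out so the module has no side effects when imported.
# example_messages = [
#     {"role": "system", "content": [{"type": "text", "text": system}]},
#     {"role": "user", "content": [{"type": "text", "text": "How do I recover my gov.br account?"}]},
# ]
# print(pipeline(example_messages))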
#%%
# from IPython.display import display, Markdown

# def build_tree_structure(tree, indent=0):
#     """
#     Recursively builds a string representation of the tree structure.
#
#     Args:
#         tree (dict): The hierarchical tree structure.
#         indent (int): The current level of indentation.
#
#     Returns:
#         str: A string representing the tree structure.
#     """
#     result = ""
#     for key, subtree in tree.items():
#         result += f"{' ' * indent} - {key}/\n"
#         if isinstance(subtree, dict):
#             result += build_tree_structure(subtree, indent + 1)
#     return result

# # Create and print the hierarchical structure
# tree_structure = build_tree(paths)
# obj = build_tree_structure(tree_structure)
# print(obj)
# display(Markdown(obj))
# # print(json.dumps(tree_structure, indent=2))

# #%%
# # Create tuples from paths and track visited/unvisited paths
# tuples_list, visited_paths, unvisited_paths = create_tuples_from_paths(base_url, 10)

# # Print the resulting list of tuples
# print("\nTuples:")
# for t in tuples_list:
#     print(t)

# # Print visited and unvisited paths
# print("\nVisited Paths:")
# for p in visited_paths:
#     print(f"{base_url}{p}")

# print("\nUnvisited Paths:")
# for p in unvisited_paths:
#     print(f"{base_url}{p}")

# # Print summary
# print(f"\nTotal links found: {len(tuples_list)}")
# print(f"Visited pages: {len(visited_paths)}")
# print(f"Unvisited pages: {len(unvisited_paths)}")

# # Create a dictionary to hold our graph data
# graph_data = {
#     "nodes": [],
#     "edges": []
# }

# import json

# # Create a set to keep track of nodes we've added
# added_nodes = set()

# # Process the tuples to create nodes and edges
# for from_url, to_url in tuples_list:
#     from_path = from_url[len(base_url):].strip("/") or "root"
#     to_path = to_url[len(base_url):].strip("/")
#     if from_path not in added_nodes:
#         graph_data["nodes"].append({"id": from_path, "label": from_path})
#         added_nodes.add(from_path)
#     if to_path not in added_nodes:
#         graph_data["nodes"].append({"id": to_path, "label": to_path})
#         added_nodes.add(to_path)
#     graph_data["edges"].append({"from": from_path, "to": to_path})

# # Save the graph data to a JSON file
# with open('graph_data.json', 'w') as f:
#     json.dump(graph_data, f)

# # Save the graph data to a JSON file
# with open('graph_data_tiplet.json', 'w') as f:
#     json.dump(tuples_list, f)

# print("Graph data saved to graph_data.json")
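# # Optional sanity-check sketch (assumes the crawl above has been run so that
# # graph_data.json exists next to this script): reload the file and report how many
# # nodes and edges were written before handing it to the visualization page.
# with open('graph_data.json', 'r', encoding='utf-8') as f:
#     saved_graph = json.load(f)
# print(f"{len(saved_graph['nodes'])} nodes, {len(saved_graph['edges'])} edges")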
# # %%
# import requests
# from bs4 import BeautifulSoup
# from markdownify import markdownify as md
# import os
# os.chdir("/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR")
# from Banco_de_Dados.Estruturado.data2json import format_for_markdown

# # URL of the web page
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/atendimento-presencial"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br/recuperar-conta-gov.br"

# # Fetch the page HTML
# response = requests.get(url)
# html_content = response.text

# # Parse the HTML with BeautifulSoup
# soup = BeautifulSoup(html_content, 'html.parser')

# # Extract the content of the div with id 'main'
# main_div = soup.find('div', id='main')
# a = format_for_markdown(main_div)
# print(a)

# if main_div:
#     # Convert the div content to Markdown
#     markdown_content = md(str(main_div))
#     # Remove extra line breaks (\n\n)
#     markdown_content = "\n".join([line for line in markdown_content.split("\n\n") if line.strip()])
#     print(markdown_content)
#     # Save the Markdown content to a file
#     with open("main_content.md", "w", encoding="utf-8") as file:
#         file.write(markdown_content)
#     print("Conversão concluída e salva em 'main_content.md'.")
# else:
#     print("Div com id 'main' não encontrada.")

# # %%
# import requests

# def pipeline():
#     # url = input("website: ")
#     url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
#     response = requests.get(url).text
#     print(response)

# import os

# def print_directory_structure(path, level=0):
#     if not os.path.isdir(path):
#         print(f"{path} is not a valid directory.")
#         return
#     prefix = ' ' * 4 * level + '|-- '
#     print(prefix + os.path.basename(path) + '/')
#     for item in os.listdir(path):
#         item_path = os.path.join(path, item)
#         if os.path.isdir(item_path):
#             print_directory_structure(item_path, level + 1)
#         else:
#             print(' ' * 4 * (level + 1) + '|-- ' + item)

# # Replace 'your_path_here' with the path you want to print
# your_path_here = '/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR/AI_agent'
# print_directory_structure(your_path_here)

# if __name__ == "__main__":
#     pipeline()