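"""Web scraping orchestrator: loads a page, extracts structured data, analyzes
the DOM, and stores the results in a shape ready for LLM consumption.

Neo4j relationship storage is stubbed out (commented) throughout this module;
only the MongoDB path is currently active.
"""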
import asyncio
from typing import Dict, List, Optional
from html_loader import HTMLLoader
from data_extractor import DataExtractor
from dom_analyzer import DOMAnalyzer
from mongo_storage import MongoStorage
# from storage.neo4j_storage import Neo4jStorage
from settings import settings
class WebScrapingOrchestrator:
def __init__(self):
self.data_extractor = DataExtractor()
self.dom_analyzer = DOMAnalyzer()
self.mongo_storage = MongoStorage()
# self.neo4j_storage = Neo4jStorage()
async def process_url(self, url: str) -> Dict:
"""Complete pipeline to process a URL for LLM consumption"""
try:
print(f"Processing URL: {url}")
# Step 1: Load HTML content
async with HTMLLoader() as loader:
html_data = await loader.load_page(url)
if not html_data:
return {"error": "Failed to load page"}
print("β HTML loaded successfully")
# Step 2: Extract structured data
extracted_data = self.data_extractor.extract_structured_data(
html_data["html"],
html_data["url"]
)
print("β Data extracted successfully")
# Step 3: Analyze DOM structure
dom_structure = self.dom_analyzer.analyze_structure(html_data["html"])
print("β DOM structure analyzed")
# Step 4: Store in MongoDB
mongo_id = self.mongo_storage.store_page_data(
html_data["url"],
extracted_data,
dom_structure
)
print("β Data stored in MongoDB")
# Step 5: Store relationships in Neo4j
# self.neo4j_storage.store_relationships(
# html_data["url"],
# extracted_data,
# dom_structure
# )
print("β Relationships stored in Neo4j")
# Return LLM-ready summary
return {
"success": True,
"url": html_data["url"],
"title": html_data["title"],
"mongo_id": mongo_id,
"summary": {
"content_blocks": len(extracted_data["content"]),
"text_length": len(extracted_data["text_summary"]),
"links_found": len(extracted_data["links"]),
"images_found": len(extracted_data["images"]),
"dom_depth": dom_structure["statistics"]["max_depth"],
"content_type": self._identify_content_type(extracted_data)
},
"llm_ready_data": {
"text_summary": extracted_data["text_summary"],
"key_headings": [h["text"] for h in extracted_data["metadata"]["headings"][:5]],
"main_topics": self._extract_main_topics(extracted_data),
"study_hints": self._generate_study_hints(extracted_data, dom_structure)
}
}
except Exception as e:
print(f"β Error processing {url}: {str(e)}")
return {"error": str(e), "url": url}
    def agent_for_chat(self):
        """Placeholder for a future chat-agent entry point; not yet implemented."""
def get_page_for_llm(self, url: str) -> Optional[Dict]:
"""Retrieve page data optimized for LLM consumption"""
# Get from MongoDB
mongo_data = self.mongo_storage.get_page_data(url)
if not mongo_data:
return None
# Get relationships from Neo4j
# neo4j_data = self.neo4j_storage.get_page_relationships(url)
# Combine for LLM
return {
"content": mongo_data["content"]["text_summary"],
"title": mongo_data["title"],
"headings": [h["text"] for h in mongo_data["content"]["headings"]],
"structure": mongo_data["study_metadata"],
"relationships": {
"related_pages": mongo_data.get("internal_links", [])[:5],
"external_references": mongo_data.get("external_links", [])[:3]
},
"study_metadata": mongo_data["study_metadata"]
}
def search_for_llm(self, query: str, limit: int = 5) -> List[Dict]:
"""Search content for LLM context"""
results = self.mongo_storage.search_pages(query, limit)
llm_ready_results = []
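        # Trim each hit to a compact, token-friendly record (summary capped at 500 chars).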
for result in results:
llm_ready_results.append({
"url": result["url"],
"title": result["title"],
"summary": result["content"]["text_summary"][:500],
"content_type": result["study_metadata"]["content_type"],
"complexity": result["study_metadata"]["complexity_score"],
"key_topics": result["study_metadata"]["key_topics"][:5]
})
return llm_ready_results
def _identify_content_type(self, data: Dict) -> str:
"""Identify content type for processing hints"""
title = data["metadata"]["title"].lower()
text = data["text_summary"].lower()
if any(word in title for word in ["tutorial", "guide", "how to"]):
return "tutorial"
elif any(word in title for word in ["documentation", "docs", "api"]):
return "documentation"
elif any(word in title for word in ["blog", "article", "news"]):
return "article"
elif any(word in text for word in ["research", "study", "analysis"]):
return "research"
return "general"
def _extract_main_topics(self, data: Dict) -> List[str]:
"""Extract main topics for LLM understanding"""
topics = set()
# From title
title_words = [word for word in data["metadata"]["title"].split() if len(word) > 3]
topics.update(title_words[:3])
# From headings
for heading in data["metadata"]["headings"][:3]:
heading_words = [word for word in heading["text"].split() if len(word) > 3]
topics.update(heading_words[:2])
return list(topics)[:5]
def _generate_study_hints(self, extracted_data: Dict, dom_structure: Dict) -> Dict:
"""Generate study hints for LLM processing"""
return {
"difficulty_level": "beginner" if len(extracted_data["text_summary"]) < 2000 else "intermediate",
"estimated_study_time": f"{len(extracted_data['text_summary'].split()) // 250} minutes",
"content_structure": "well_structured" if len(extracted_data["metadata"]["headings"]) > 3 else "basic",
"has_examples": "code" in extracted_data["text_summary"].lower(),
"interactive_elements": dom_structure["statistics"]["tag_distribution"].get("form", 0) > 0
}
def close_connections(self):
"""Close all database connections"""
        # self.neo4j_storage.close()  # re-enable when Neo4jStorage is wired back in
# Main execution function
async def main():
orchestrator = WebScrapingOrchestrator()
# Example usage
test_url = "https://en.wikipedia.org/wiki/Virat_Kohli"
result = await orchestrator.process_url(test_url)
print(f"Processing result: {result}")
# Clean up
orchestrator.close_connections()
if __name__ == "__main__":
    asyncio.run(main())