# xNeSy / knowledge.py
import os
import json
import pickle
from datetime import datetime
import rdflib
import re
import networkx as nx
# Storage file paths
KNOWLEDGE_FILE = "knowledge_graph.pkl"
BACKUP_FILE = "knowledge_backup.json"
# Global RDF graph
graph = rdflib.Graph()
# Mapping of fact IDs to triples for editing operations
fact_index = {}
def save_knowledge_graph():
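    """Pickle the global RDF graph to KNOWLEDGE_FILE and write a JSON backup to BACKUP_FILE."""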
try:
with open(KNOWLEDGE_FILE, 'wb') as f:
pickle.dump(graph, f)
backup_data = {
"timestamp": datetime.now().isoformat(),
"total_facts": len(graph),
"facts": []
}
for i, (s, p, o) in enumerate(graph):
backup_data["facts"].append({
"id": i+1,
"subject": str(s),
"predicate": str(p),
"object": str(o)
})
with open(BACKUP_FILE, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
return f" Saved {len(graph)} facts to storage"
except Exception as e:
return f" Error saving knowledge: {e}"
def load_knowledge_graph():
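    """Load the pickled RDF graph from KNOWLEDGE_FILE, replacing the in-memory graph."""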
global graph
try:
if os.path.exists(KNOWLEDGE_FILE):
with open(KNOWLEDGE_FILE, 'rb') as f:
graph = pickle.load(f)
return f"๐Ÿ“‚ Loaded {len(graph)} facts from storage"
else:
return "๐Ÿ“‚ No existing knowledge file found, starting fresh"
except Exception as e:
return f" Error loading knowledge: {e}"
def create_comprehensive_backup():
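    """Write a full JSON backup of every triple, with both short and fully qualified terms."""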
try:
backup_data = {
"metadata": {
"timestamp": datetime.now().isoformat(),
"total_facts": len(graph),
"backup_type": "comprehensive_knowledge_base",
"graph_size": len(graph)
},
"facts": []
}
for i, (s, p, o) in enumerate(graph):
subject = str(s).split(':')[-1] if ':' in str(s) else str(s)
predicate = str(p).split(':')[-1] if ':' in str(p) else str(p)
object_val = str(o)
backup_data["facts"].append({
"id": i + 1,
"subject": subject,
"predicate": predicate,
"object": object_val,
"full_subject": str(s),
"full_predicate": str(p),
"full_object": str(o)
})
with open(BACKUP_FILE, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
    except Exception as e:
        create_error_backup(str(e))
def create_error_backup(error_message):
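    """Write a minimal JSON backup recording the error when a full backup fails."""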
try:
backup_data = {
"metadata": {
"timestamp": datetime.now().isoformat(),
"total_facts": 0,
"backup_type": "error_backup",
"error": error_message
},
"facts": []
}
with open(BACKUP_FILE, 'w', encoding='utf-8') as f:
json.dump(backup_data, f, indent=2, ensure_ascii=False)
except Exception:
pass
def extract_entities(text):
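    """Heuristically pull candidate entities (proper names, organizations, locations, dates)
    from text. Returns at most 50 deduplicated strings longer than 3 characters."""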
entities = []
capitalized_words = re.findall(r'\b[A-Z][a-z]+\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?\b', text)
entities.extend(capitalized_words)
org_patterns = [
r'([A-Z][a-zA-Z\s]+)\s+(Inc|Ltd|LLC|Corp|Corporation|Company|Co\.|Ltd\.)',
r'([A-Z][a-zA-Z\s]+)\s+(University|Institute|Lab|Laboratory)',
]
for pattern in org_patterns:
matches = re.findall(pattern, text)
entities.extend([m[0].strip() for m in matches])
location_keywords = ['in ', 'at ', 'near ', 'from ']
for keyword in location_keywords:
        pattern = rf'{keyword}([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)'
matches = re.findall(pattern, text)
entities.extend(matches)
dates = re.findall(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}\b', text)
entities.extend(dates)
entities = list(set([e.strip() for e in entities if len(e.strip()) > 3]))
return entities[:50]
def extract_regular_triples_improved(text, entities):
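    """Extract (subject, predicate, object) triples by matching verb patterns in each sentence.

    Each verb family is normalized to a single predicate; relative clauses
    ("which/that/who ...") become 'has property' facts. `entities` is currently unused.
    """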
triples = []
sentences = re.split(r'[.!?\n]+', text)
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 15:
continue
improved_patterns = [
(r'([A-Z][a-zA-Z\s]+(?:,\s+[A-Z][a-zA-Z\s]+)*)\s+(is|are|was|were|becomes|represents|means|refers to|denotes)\s+(.+)', 'relates to'),
(r'([A-Z][a-zA-Z\s]+)\s+(uses|employs|utilizes|applies)\s+(.+)', 'uses'),
(r'([A-Z][a-zA-Z\s]+)\s+(develops|created|designed|implemented)\s+(.+)', 'creates'),
(r'([A-Z][a-zA-Z\s]+)\s+(requires|needs|demands)\s+(.+)', 'requires'),
(r'([A-Z][a-zA-Z\s]+)\s+(enables|allows|permits)\s+(.+)', 'enables'),
            (r'([A-Z][a-zA-Z\s]+)\s+(affects|impacts|influences)\s+(.+)', 'affects'),
(r'([A-Z][a-zA-Z\s]+)\s+(found|discovered|identified|observed|detected)\s+(.+)', 'discovered'),
(r'([A-Z][a-zA-Z\s]+)\s+(studies|analyzes|examines|investigates)\s+(.+)', 'studies'),
(r'([A-Z][a-zA-Z\s]+)\s+(proposes|suggests|recommends)\s+(.+)', 'proposes'),
(r'([A-Z][a-zA-Z\s]+)\s+(results in|leads to|causes)\s+(.+)', 'causes'),
(r'([A-Z][a-zA-Z\s]+)\s+(works with|collaborates with|partnered with)\s+(.+)', 'works with'),
(r'([A-Z][a-zA-Z\s]+)\s+(located in|based in|situated in)\s+(.+)', 'located in'),
]
for pattern, predicate in improved_patterns:
match = re.search(pattern, sentence, re.IGNORECASE)
if match:
groups = match.groups()
subject = groups[0].strip() if len(groups) > 0 else ''
object_val = groups[-1].strip() if len(groups) > 1 else ''
subject = re.sub(r'^(the|a|an)\s+', '', subject, flags=re.IGNORECASE).strip()
object_val = re.sub(r'^(the|a|an)\s+', '', object_val, flags=re.IGNORECASE).strip()
if subject and object_val and len(subject) > 3 and len(object_val) > 3:
triples.append((subject, predicate, object_val))
break
clause_patterns = [
r'([A-Z][a-zA-Z\s]+)\s+which\s+(.+)',
r'([A-Z][a-zA-Z\s]+)\s+that\s+(.+)',
r'([A-Z][a-zA-Z\s]+)\s+who\s+(.+)',
]
for pattern in clause_patterns:
match = re.search(pattern, sentence)
if match:
subject = match.group(1).strip()
description = match.group(2).strip()
if subject and description and len(subject) > 3 and len(description) > 3:
triples.append((subject, 'has property', description[:150]))
return triples
def extract_structured_triples(text):
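    """Extract triples from structured, line-oriented text such as forms and key: value pairs."""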
triples = []
lines = text.split('\n')
patterns = [
(r'date\s*:?\s*([0-9\/\-\.]+)', 'date', 'is'),
(r'time\s*:?\s*([0-9:]+)', 'time', 'is'),
(r'created\s*:?\s*([0-9\/\-\.]+)', 'created_date', 'is'),
(r'modified\s*:?\s*([0-9\/\-\.]+)', 'modified_date', 'is'),
(r'id\s*:?\s*([A-Z0-9\-]+)', 'id', 'is'),
(r'number\s*:?\s*([A-Z0-9\-]+)', 'number', 'is'),
(r'code\s*:?\s*([A-Z0-9\-]+)', 'code', 'is'),
(r'reference\s*:?\s*([A-Z0-9\-]+)', 'reference', 'is'),
(r'name\s*:?\s*([A-Za-z\s&.,]+)', 'name', 'is'),
(r'title\s*:?\s*([A-Za-z\s&.,]+)', 'title', 'is'),
(r'company\s*:?\s*([A-Za-z\s&.,]+)', 'company', 'is'),
(r'organization\s*:?\s*([A-Za-z\s&.,]+)', 'organization', 'is'),
(r'email\s*:?\s*([A-Za-z0-9@\.\-]+)', 'email', 'is'),
(r'phone\s*:?\s*([0-9\s\-\+\(\)]+)', 'phone', 'is'),
(r'address\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'address', 'is'),
(r'description\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'description', 'is'),
(r'type\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'type', 'is'),
(r'category\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'category', 'is'),
(r'status\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'status', 'is'),
(r'location\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'location', 'is'),
(r'department\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'department', 'is'),
(r'section\s*:?\s*([A-Za-z0-9\s\-\.,]+)', 'section', 'is'),
(r'amount\s*:?\s*\$?([0-9,]+\.?[0-9]*)', 'amount', 'is'),
(r'total\s*:?\s*\$?([0-9,]+\.?[0-9]*)', 'total', 'is'),
(r'price\s*:?\s*\$?([0-9,]+\.?[0-9]*)', 'price', 'is'),
(r'cost\s*:?\s*\$?([0-9,]+\.?[0-9]*)', 'cost', 'is'),
]
for line in lines:
line = line.strip()
if len(line) < 5:
continue
for pattern, subject, predicate in patterns:
match = re.search(pattern, line, re.IGNORECASE)
if match:
value = match.group(1).strip()
if value and len(value) > 1:
triples.append((subject, predicate, value))
break
kv_patterns = [
r'([A-Za-z\s]+):\s*([A-Za-z0-9\s\$\-\.\/,]+)',
r'([A-Za-z\s]+)\s*=\s*([A-Za-z0-9\s\$\-\.\/,]+)',
r'([A-Za-z\s]+)\s*-\s*([A-Za-z0-9\s\$\-\.\/,]+)',
]
for line in lines:
for pattern in kv_patterns:
match = re.search(pattern, line)
if match:
key = match.group(1).strip().lower().replace(' ', '_')
value = match.group(2).strip()
if len(key) > 2 and len(value) > 1:
triples.append((key, 'is', value))
return triples
def extract_regular_triples(text):
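    """Fallback extractor: split each sentence once on a known verb to form a rough triple."""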
triples = []
sentences = re.split(r"[.?!\n]", text)
patterns = [
r"\s+(is|are|was|were)\s+",
r"\s+(has|have|had)\s+",
r"\s+(uses|used|using)\s+",
r"\s+(creates|created|creating)\s+",
r"\s+(develops|developed|developing)\s+",
r"\s+(leads|led|leading)\s+",
r"\s+(affects|affected|affecting)\s+",
r"\s+(contains|contained|containing)\s+",
r"\s+(includes|included|including)\s+",
r"\s+(requires|required|requiring)\s+",
r"\s+(causes|caused|causing)\s+",
r"\s+(results|resulted|resulting)\s+",
r"\s+(enables|enabled|enabling)\s+",
r"\s+(provides|provided|providing)\s+",
r"\s+(supports|supported|supporting)\s+",
r"\s+(located|situated|found)\s+",
r"\s+(connects|links|relates)\s+",
r"\s+(depends|relies|based)\s+",
r"\s+(represents|symbolizes|stands)\s+",
r"\s+(describes|explains|defines)\s+",
r"\s+(refers|referring|referenced)\s+",
r"\s+(concerns|concerning|concerned)\s+",
r"\s+(relates|relating|related)\s+",
]
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 10:
continue
for pattern in patterns:
parts = re.split(pattern, sentence, maxsplit=1)
if len(parts) == 3:
subj, pred, obj = parts
subj = re.sub(r'^(the|a|an)\s+', '', subj.strip(), flags=re.IGNORECASE)
obj = re.sub(r'^(the|a|an)\s+', '', obj.strip(), flags=re.IGNORECASE)
if subj and pred and obj and len(subj) > 2 and len(obj) > 2:
triples.append((subj, pred.strip(), obj))
break
return triples
def extract_triples(text):
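    """Run all extractors over the text, then deduplicate and truncate the resulting triples."""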
triples = []
entities = extract_entities(text)
for entity in entities:
triples.append((entity, 'type', 'entity'))
triples.extend(extract_structured_triples(text))
triples.extend(extract_regular_triples_improved(text, entities))
triples.extend(extract_regular_triples(text))
unique_triples = []
for s, p, o in triples:
if s and p and o and len(s) > 2 and len(p) > 1 and len(o) > 2:
s = s.strip()[:100]
p = p.strip()[:50]
o = o.strip()[:200]
if (s, p, o) not in unique_triples:
unique_triples.append((s, p, o))
return unique_triples
def add_to_graph(text):
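    """Extract triples from text, add them to the RDF graph as urn: terms, and persist."""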
new_triples = extract_triples(text)
for s, p, o in new_triples:
graph.add((rdflib.URIRef(f"urn:{s}"), rdflib.URIRef(f"urn:{p}"), rdflib.Literal(o)))
save_knowledge_graph()
return f" Added {len(new_triples)} new triples. Total facts stored: {len(graph)}.\n Saved"
def retrieve_context(question, limit=10):
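    """Score every stored fact by keyword overlap with the question and return the top matches."""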
qwords = [w for w in question.lower().split() if w not in {
'the','a','an','and','or','but','in','on','at','to','for','of','with','by','is','are','was','were','be','been','have','has','had','do','does','did','will','would','could','should','may','might','can','what','how','when','where','why','who'
} and len(w) > 2]
scored_matches = []
for s, p, o in graph:
subject = str(s).split(':')[-1] if ':' in str(s) else str(s)
predicate = str(p).split(':')[-1] if ':' in str(p) else str(p)
object_val = str(o)
fact_text = f"{subject} {predicate} {object_val}".lower()
score = 0
for word in qwords:
if word in fact_text:
score += 1
if word == subject.lower() or word == predicate.lower():
score += 2
if score > 0:
scored_matches.append((score, f"{subject} {predicate} {object_val}"))
scored_matches.sort(key=lambda x: x[0], reverse=True)
matches = [m[1] for m in scored_matches[:limit]]
if matches:
result = "**Relevant Knowledge:**\n"
for i, match in enumerate(matches, 1):
result += f"{i}. {match}\n"
return result
return "**No directly relevant facts found.**\n\nTry asking about topics that might be in your knowledge base, or add more knowledge first!"
def show_graph_contents():
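    """Return a Markdown summary of the graph: totals, facts grouped by subject, and a fact list."""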
if len(graph) == 0:
return "**Knowledge Graph Status: EMPTY**\n\n**How to build your knowledge base:**\n1. **Add text directly** - Paste any text in the 'Add Knowledge from Text' box above\n2. **Upload documents** - Use the file upload to process PDF, DOCX, TXT, CSV files\n3. **Extract facts** - The system will automatically extract knowledge from your content\n4. **Build knowledge** - Add more text or files to expand your knowledge base\n5. **Save knowledge** - Use 'Save Knowledge' to persist your data\n\n**Start by adding some text or uploading a document!**"
facts_by_subject = {}
all_facts = []
for s, p, o in graph:
subject = str(s).split(':')[-1] if ':' in str(s) else str(s)
predicate = str(p).split(':')[-1] if ':' in str(p) else str(p)
object_val = str(o)
fact_text = f"{subject} {predicate} {object_val}"
all_facts.append(fact_text)
facts_by_subject.setdefault(subject, []).append(f"{predicate} {object_val}")
result = f"**Knowledge Graph Overview**\n"
result += f"**Total Facts:** {len(graph)}\n"
result += f"**Unique Subjects:** {len(facts_by_subject)}\n\n"
result += "## **Knowledge by Subject:**\n\n"
for i, (subject, facts) in enumerate(facts_by_subject.items()):
if i >= 10:
remaining = len(facts_by_subject) - 10
result += f"... and {remaining} more subjects\n"
break
result += f"**{subject}:**\n"
for fact in facts:
result += f" โ€ข {fact}\n"
result += "\n"
result += "## **All Facts:**\n\n"
for i, fact in enumerate(all_facts[:20]):
result += f"{i+1}. {fact}\n"
if len(all_facts) > 20:
result += f"\n... and {len(all_facts) - 20} more facts"
return result
def visualize_knowledge_graph():
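    """Render the graph as inline SVG using a networkx spring layout; returns an HTML string."""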
if len(graph) == 0:
return "<p>No knowledge in graph. Add some text or upload a document first!</p>"
try:
G = nx.Graph()
fact_data = {}
for s, p, o in graph:
subject = str(s).split(':')[-1] if ':' in str(s) else str(s)
predicate = str(p).split(':')[-1] if ':' in str(p) else str(p)
object_val = str(o)
subject_short = (subject[:30] + "...") if len(subject) > 30 else subject
object_short = (object_val[:30] + "...") if len(object_val) > 30 else object_val
if subject not in G:
G.add_node(subject, display=subject_short, node_type='subject')
if object_val not in G:
G.add_node(object_val, display=object_short, node_type='object')
G.add_edge(subject, object_val, label=predicate)
fact_data[(subject, object_val)] = f"{subject} {predicate} {object_val}"
pos = nx.spring_layout(G, k=2, iterations=100, seed=42)
x_positions = [pos[n][0] for n in G.nodes()]
y_positions = [pos[n][1] for n in G.nodes()]
x_min, x_max = min(x_positions), max(x_positions)
y_min, y_max = min(y_positions), max(y_positions)
scale = min(500 / (x_max - x_min), 400 / (y_max - y_min)) if (x_max - x_min) > 0 and (y_max - y_min) > 0 else 50
offset_x = 350
offset_y = 300
svg_elements = []
for edge in G.edges():
x1 = pos[edge[0]][0] * scale + offset_x
y1 = pos[edge[0]][1] * scale + offset_y
x2 = pos[edge[1]][0] * scale + offset_x
y2 = pos[edge[1]][1] * scale + offset_y
edge_data = G[edge[0]][edge[1]]
label = edge_data.get('label', 'has')
svg_elements.append(f"""
<line x1="{x1}" y1="{y1}" x2="{x2}" y2="{y2}"
stroke="#999" stroke-width="2" opacity="0.5">
<title>{label}</title>
</line>
""")
node_info = []
for i, node in enumerate(G.nodes()):
x = pos[node][0] * scale + offset_x
y = pos[node][1] * scale + offset_y
display_name = G.nodes[node].get('display', node)
node_type = G.nodes[node].get('node_type', 'unknown')
color = '#4CAF50' if node_type == 'subject' else ('#2196F3' if node_type == 'object' else '#546E7A')
neighbors = list(G.neighbors(node))
neighbor_count = len(neighbors)
node_info.append(f"""
<circle cx="{x}" cy="{y}" r="{max(40, min(30, neighbor_count * 2 + 20))}"
fill="{color}" stroke="#fff" stroke-width="2">
<title>{display_name} ({neighbor_count} connections)</title>
</circle>
<text x="{x}" y="{y+6}" text-anchor="middle" font-size="15" font-weight="bold" fill="#000"
pointer-events="none">{display_name[:15]}</text>
""")
svg_content = '\n'.join(svg_elements + node_info)
html = f"""
<div style="width: 100%; min-height: 700px; max-height: 800px; background: white; border: 2px solid #ddd; border-radius: 10px; padding: 20px; position: relative; overflow: auto;">
<svg width="100%" height="550" style="border: 1px solid #ddd; border-radius: 5px; background: #f9f9f9; display: block;">
{svg_content}
</svg>
</div>
"""
return html
except Exception as e:
return f"<p style='color: red; padding: 20px;'>Error creating visualization: {e}</p>"
def delete_all_knowledge():
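    """Replace the graph with an empty one and persist the (now empty) state."""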
global graph
count = len(graph)
graph = rdflib.Graph()
save_knowledge_graph()
return f"๐Ÿ—‘๏ธ Deleted all {count} facts from the knowledge graph. Graph is now empty."
def delete_knowledge_by_keyword(keyword):
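    """Remove every triple whose string form contains the given keyword (case-insensitive)."""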
global graph
if not keyword or keyword.strip() == "":
return "โš ๏ธ Please enter a keyword to search for."
keyword = keyword.strip().lower()
deleted_count = 0
facts_to_remove = []
for s, p, o in graph:
fact_text = f"{s} {p} {o}".lower()
if keyword in fact_text:
facts_to_remove.append((s, p, o))
for fact in facts_to_remove:
graph.remove(fact)
deleted_count += 1
if deleted_count > 0:
save_knowledge_graph()
return f"๐Ÿ—‘๏ธ Deleted {deleted_count} facts containing '{keyword}'"
else:
return f"โ„น๏ธ No facts found containing '{keyword}'"
def delete_recent_knowledge(count=5):
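    """Remove up to `count` facts from the end of the graph's iteration order.

    Note: rdflib stores triples as a set, so iteration order is not guaranteed
    to match insertion order; "most recent" is best-effort.
    """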
global graph
if len(graph) == 0:
return "โ„น๏ธ Knowledge graph is already empty."
facts = list(graph)
facts_to_remove = facts[-count:] if count < len(facts) else facts
for fact in facts_to_remove:
graph.remove(fact)
save_knowledge_graph()
return f"๐Ÿ—‘๏ธ Deleted {len(facts_to_remove)} most recent facts"
def list_facts_for_editing():
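    """Build numbered fact labels for display and refresh the id -> triple fact_index."""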
global fact_index
fact_index = {}
options = []
for i, (s, p, o) in enumerate(list(graph), start=1):
subject = str(s).split(':')[-1] if ':' in str(s) else str(s)
predicate = str(p).split(':')[-1] if ':' in str(p) else str(p)
object_val = str(o)
label = f"{i}. {subject} {predicate} {object_val}"
options.append(label)
fact_index[i] = (s, p, o)
return options
def load_fact_by_label(fact_label):
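    """Resolve a "<id>. subject predicate object" label back to its triple via fact_index."""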
if not fact_label:
return None
try:
fact_id = int(fact_label.split('.', 1)[0].strip())
return fact_index.get(fact_id)
except Exception:
return None
def import_knowledge_from_json_file(file):
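    """Import facts from a JSON backup file (an object with a 'facts' list, or a bare list)."""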
try:
if file is None:
return "โš ๏ธ No file selected."
file_path = file.name if hasattr(file, 'name') else str(file)
if not os.path.exists(file_path):
return f"โš ๏ธ File not found: {file_path}"
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict) and 'facts' in data:
facts = data['facts']
elif isinstance(data, list):
facts = data
else:
return "โŒ Unsupported JSON structure. Expect an object with 'facts' or a list of facts."
added = 0
skipped = 0
for fact in facts:
try:
subject = fact.get('subject') or fact.get('full_subject')
predicate = fact.get('predicate') or fact.get('full_predicate')
obj = fact.get('object') or fact.get('full_object')
if not subject or not predicate or obj is None:
skipped += 1
continue
s_ref = rdflib.URIRef(subject if str(subject).startswith('urn:') else f"urn:{subject}")
p_ref = rdflib.URIRef(predicate if str(predicate).startswith('urn:') else f"urn:{predicate}")
o_lit = rdflib.Literal(obj)
graph.add((s_ref, p_ref, o_lit))
added += 1
except Exception:
skipped += 1
save_knowledge_graph()
return f"โœ… Imported {added} facts. Skipped {skipped}. Total facts: {len(graph)}."
except Exception as e:
return f"โŒ Import failed: {e}"