Spaces:
Sleeping
Sleeping
| import json | |
| from pathlib import Path | |
| from sentence_transformers import SentenceTransformer, util | |
| import re | |
| # ----------------------------- | |
| # normalize subject name (for better grouping) | |
| # ----------------------------- | |
| def normalize_subject(subject: str) -> str: | |
| if not subject: | |
| return "" | |
| subject = subject.strip().lower() | |
| subject = re.sub(r"\s+", " ", subject) # collapse spaces | |
| subject = re.sub(r"[^\w\s]", "", subject) # remove symbols | |
| return subject | |
| # ----------------------------- | |
| # Load all questions from multiple JSONs | |
| # ----------------------------- | |
| def load_all_questions(json_files): | |
| part_a = [] | |
| part_b = [] | |
| subject = None | |
| print("from all loaded question papers spliting into 3 parts subject, part a and part b") | |
| for file in json_files: | |
| data = json.loads(Path(file).read_text(encoding="utf-8")) | |
| # subject = subject or data.get("subject") | |
| # π SUBJECT NORMALIZATION & CHECK (ADD HERE) | |
| # current_subject_raw = data.get("subject") or "" | |
| # current_subject = normalize_subject(current_subject_raw) | |
| # if subject is None: | |
| # subject = current_subject | |
| # display_subject = current_subject_raw.strip() | |
| # elif subject != current_subject: | |
| # raise ValueError( | |
| # f"β Mixed subjects detected: '{subject}' vs '{current_subject}'" | |
| # ) | |
| current_subject_raw = data.get("subject") or "" | |
| current_subject = normalize_subject(current_subject_raw) | |
| # If current JSON has no subject, ignore it | |
| if not current_subject: | |
| pass | |
| # First valid subject wins | |
| elif subject is None: | |
| subject = current_subject | |
| display_subject = current_subject_raw.strip() | |
| # Conflict only if BOTH are non-empty and different | |
| elif subject != current_subject: | |
| raise ValueError( | |
| f"β Mixed subjects detected: '{subject}' vs '{current_subject}'" | |
| ) | |
| # PART A | |
| for sq in data.get("PART_A", []): | |
| question = sq.get("question") | |
| # β skip if None, empty, or not string | |
| if not isinstance(question, str): | |
| continue | |
| question = question.strip() | |
| if not question: | |
| continue | |
| part_a.append({ | |
| "text": question, | |
| "images": [sq.get("image")] if sq.get("image") else [] | |
| }) | |
| # for q in data["PART_A"]: | |
| # if not q["question"].strip(): | |
| # continue | |
| # part_a.append({ | |
| # "text": q["question"].strip(), | |
| # "images": [q["image"]] if q["image"] else [] | |
| # }) | |
| # PART B | |
| for block in data["PART_B"]: | |
| # for sq in block["subquestions"]: | |
| # if not sq["question"].strip(): | |
| # continue | |
| # part_b.append({ | |
| # "text": sq["question"].strip(), | |
| # "images": [sq["image"]] if sq["image"] else [] | |
| # }) | |
| for sq in block.get("subquestions", []): | |
| question = sq.get("question") | |
| # β skip if None, empty, or not string | |
| if not isinstance(question, str): | |
| continue | |
| question = question.strip() | |
| if not question: | |
| continue | |
| part_b.append({ | |
| "text": question, | |
| "images": [sq.get("image")] if sq.get("image") else [] | |
| }) | |
| print("data splited into 3 parts subject, part a and part b successfully") | |
| return subject, part_a, part_b | |
| # ----------------------------- | |
| # Semantic clustering + frequency | |
| # ----------------------------- | |
| def semantic_frequency(questions, threshold=0.70): | |
| print("clustring started for part using all-mpnet-base-v2 model") | |
| model = SentenceTransformer("all-mpnet-base-v2") #SentenceTransformer("all-MiniLM-L6-v2") | |
| texts = [q["text"] for q in questions] | |
| embeddings = model.encode(texts, convert_to_tensor=True, normalize_embeddings=True) | |
| visited = set() | |
| results = [] | |
| for i in range(len(texts)): | |
| if i in visited: | |
| continue | |
| cluster = [i] | |
| visited.add(i) | |
| sims = util.cos_sim(embeddings[i], embeddings)[0] | |
| for j in range(len(texts)): | |
| if j not in visited and sims[j] >= threshold: | |
| cluster.append(j) | |
| visited.add(j) | |
| # Representative question (NO rewriting) | |
| rep_question = texts[cluster[0]] | |
| # Merge all images | |
| images = set() | |
| for idx in cluster: | |
| images.update(questions[idx]["images"]) | |
| results.append({ | |
| "question": rep_question, | |
| "frequency": len(cluster), | |
| "images": list(images) if images else None | |
| }) | |
| return results | |
| # ----------------------------- | |
| # Main runner | |
| # ----------------------------- | |
| def run_semantic_frequency_multiple(input_jsons, output_json): | |
| print("in semantic frequency multiple function") | |
| subject, part_a, part_b = load_all_questions(input_jsons) | |
| output = { | |
| "subject": subject, | |
| "PART_A": semantic_frequency(part_a), | |
| "PART_B": semantic_frequency(part_b) | |
| } | |
| print("β Questions clustered and frequency calculated") | |
| Path(output_json).write_text( | |
| json.dumps(output, indent=2), | |
| encoding="utf-8" | |
| ) | |
| print("β Semantic Frequency Analysis Completed") | |
| print(f"π Output saved at: {output_json}") | |
| return output | |
| # ----------------------------- | |
| # Entry point | |
| # ----------------------------- | |
| # if __name__ == "__main__": | |
| # INPUT_JSONS = [ | |
| # "vl_output_bro\\query_3\\final_document.json", | |
| # "vl_output_bro\\query_2\\final_document.json", | |
| # ] | |
| # OUTPUT_JSON = "frequency_output.json" | |
| # run_semantic_frequency_multiple(INPUT_JSONS, OUTPUT_JSON) | |