| from __future__ import annotations |
|
|
| import re |
| from typing import Any, Iterable, Mapping, Sequence |
|
|
|
|
| PROFILE_CATEGORIES = {"profile", "preference", "goal", "constraint", "stage_state", "status"} |
| PROFILE_TYPES = {"setup", "preference", "constraint", "goal", "avoid", "usage_context"} |
| PROFILE_SEMANTIC_SLOTS = {"identity", "research_topic", "education", "occupation"} |
| PROFILE_AGGREGATE_SOURCE_KIND = "public_dialog_profile" |
| PROFILE_AGGREGATE_CATEGORY = "profile" |
| PROFILE_CLUSTER_SOURCE_KIND = "public_dialog_profile_cluster" |
| PROFILE_CLUSTER_CATEGORY = "profile" |
| PROFILE_CONSOLIDATOR_VERSION = "profile_consolidator_v1_structured_summary" |
|
|
| _PROFILE_QUERY_MARKERS = ( |
| "preference", |
| "prefer", |
| "like", |
| "dislike", |
| "recommend", |
| "suggest", |
| "advice", |
| "advise", |
| "any advice", |
| "any suggestions", |
| "any tips", |
| "tips", |
| "trouble with", |
| "struggling with", |
| "what do you think", |
| "learn more", |
| "resources", |
| "suited", |
| "fit me", |
| "for me", |
| "based on me", |
| "based on my", |
| "my setup", |
| "my profile", |
| "my goal", |
| "my constraint", |
| "my occupation", |
| "my previous occupation", |
| "my role", |
| "my previous role", |
| "my job", |
| "my previous job", |
| "what was my", |
| "where did i work", |
| "worked as", |
| "occupation", |
| "previous occupation", |
| "role", |
| "previous role", |
| "job", |
| "career", |
| "background", |
| "identity", |
| "experience", |
| "avoid", |
| "what should i", |
| "should i", |
| "what should", |
| "serve", |
| "dinner", |
| "homegrown", |
| "ingredients", |
| "battery life", |
| "getting around", |
| "偏好", |
| "喜欢", |
| "不喜欢", |
| "推荐", |
| "建议", |
| "适合我", |
| "根据我", |
| "我的情况", |
| "我的配置", |
| "我的目标", |
| "我的约束", |
| "画像", |
| "避免", |
| ) |
|
|
|
|
| def _clean_text(value: Any) -> str: |
| return re.sub(r"\s+", " ", str(value or "").strip()) |
|
|
|
|
| def _normalize(value: Any) -> str: |
| return _clean_text(value).lower() |
|
|
|
|
| def _tokens(value: Any) -> list[str]: |
| text = _normalize(value) |
| english = re.findall(r"[a-z0-9_]+", text) |
| cjk = [char for char in text if "\u4e00" <= char <= "\u9fff"] |
| return _dedupe([*english, *cjk]) |
|
|
|
|
| def _slug(value: Any, *, fallback: str = "general") -> str: |
| text = _normalize(value) |
| parts = re.findall(r"[a-z0-9]+|[\u4e00-\u9fff]", text) |
| slug = "_".join(parts[:10]).strip("_") |
| return slug or fallback |
|
|
|
|
| def _dedupe(items: Iterable[Any], *, max_items: int | None = None) -> list[str]: |
| values: list[str] = [] |
| seen: set[str] = set() |
| for item in items: |
| text = _clean_text(item) |
| if not text: |
| continue |
| key = _normalize(text) |
| if key in seen: |
| continue |
| seen.add(key) |
| values.append(text) |
| if max_items is not None and len(values) >= max_items: |
| break |
| return values |
|
|
|
|
| def _first_nonempty(*values: Any) -> str: |
| for value in values: |
| text = _clean_text(value) |
| if text: |
| return text |
| return "" |
|
|
|
|
| def _bounded_union(*groups: Iterable[Any], max_items: int) -> list[str]: |
| return _dedupe([item for group in groups for item in group], max_items=max_items) |
|
|
|
|
| def _bounded_int_union(*groups: Iterable[Any], max_items: int) -> list[int]: |
| values: list[int] = [] |
| seen: set[int] = set() |
| for group in groups: |
| for item in group: |
| try: |
| value = int(item) |
| except Exception: |
| continue |
| if value in seen: |
| continue |
| seen.add(value) |
| values.append(value) |
| if len(values) >= max_items: |
| return sorted(values) |
| return sorted(values) |
|
|
|
|
| def is_profile_layer_record(*, category: Any, source_kind: Any = "", semantic_slot: Any = "", metadata: Mapping[str, Any] | None = None) -> bool: |
| data = dict(metadata or {}) |
| if bool(data.get("profile_layer")): |
| return True |
| category_text = _normalize(category) |
| slot_text = _normalize(semantic_slot or data.get("semantic_slot", "")) |
| source_text = _normalize(source_kind) |
| if category_text in PROFILE_CATEGORIES: |
| return True |
| if slot_text in PROFILE_SEMANTIC_SLOTS or slot_text.startswith("profile_"): |
| return True |
| return source_text in { |
| "public_dialog_profile", |
| "public_dialog_profile_cluster", |
| "public_dialog_preference", |
| "public_dialog_goal", |
| "public_dialog_constraint", |
| } |
|
|
|
|
| def infer_profile_type(*, category: Any, semantic_slot: Any = "", relation: Any = "", value: Any = "", metadata: Mapping[str, Any] | None = None) -> str: |
| data = dict(metadata or {}) |
| explicit = _normalize(data.get("profile_type", "")) |
| if explicit in PROFILE_TYPES: |
| return explicit |
| category_text = _normalize(category) |
| combined = _normalize(f"{semantic_slot} {relation} {value}") |
| if category_text == "preference" or any(marker in combined for marker in ("prefer", "like", "default", "偏好", "喜欢", "默认")): |
| return "preference" |
| if category_text == "constraint" or any(marker in combined for marker in ("must", "cannot", "forbid", "constraint", "必须", "不能", "约束")): |
| return "constraint" |
| if category_text == "goal" or any(marker in combined for marker in ("goal", "target", "objective", "目标")): |
| return "goal" |
| if any(marker in combined for marker in ("avoid", "dislike", "do not", "don't", "避免", "不喜欢", "不要")): |
| return "avoid" |
| if any(marker in combined for marker in ("setup", "environment", "workflow", "current", "配置", "环境", "流程", "当前")): |
| return "setup" |
| return "usage_context" |
|
|
|
|
| def infer_profile_domain( |
| *, |
| category: Any, |
| semantic_slot: Any = "", |
| slot_key: Any = "", |
| anchors: Sequence[Any] = (), |
| value: Any = "", |
| metadata: Mapping[str, Any] | None = None, |
| ) -> tuple[str, str]: |
| data = dict(metadata or {}) |
| explicit = _first_nonempty(data.get("profile_domain", ""), data.get("profile_domain_label", ""), data.get("domain", "")) |
| if explicit: |
| return _slug(explicit), explicit |
| subject = _first_nonempty(data.get("subject", ""), data.get("extracted_subject", "")) |
| if subject: |
| return _slug(subject), subject |
| slot = _normalize(semantic_slot) |
| if slot and slot not in {"profile", "preference", "goal", "constraint", "event", "fact", "status"}: |
| return _slug(slot), slot.replace("_", " ") |
| for anchor in anchors: |
| anchor_text = _clean_text(anchor) |
| if not anchor_text: |
| continue |
| if re.search(r"^\d{4}|\d{1,2}\s+[A-Za-z]+|^[A-Z][a-z]+$", anchor_text): |
| continue |
| return _slug(anchor_text), anchor_text |
| slot_key_text = _clean_text(slot_key) |
| if slot_key_text: |
| tail = slot_key_text.split(".")[-1].replace("_", " ") |
| if tail: |
| return _slug(tail), tail |
| value_tokens = _tokens(value) |
| if value_tokens: |
| label = " ".join(value_tokens[:4]) |
| return _slug(label), label |
| return _slug(category), _normalize(category) or "general" |
|
|
|
|
| def profile_candidate_metadata( |
| *, |
| category: Any, |
| semantic_slot: Any = "", |
| relation: Any = "", |
| value: Any = "", |
| source_span: Any = "", |
| slot_key: Any = "", |
| anchors: Sequence[Any] = (), |
| subject: Any = "", |
| subject_signature: Any = "", |
| proposal: Mapping[str, Any] | None = None, |
| ) -> dict[str, Any]: |
| proposal_data = dict(proposal or {}) |
| base_metadata = { |
| "profile_type": proposal_data.get("profile_type", ""), |
| "profile_domain": proposal_data.get("profile_domain", ""), |
| "profile_domain_label": proposal_data.get("profile_domain_label", ""), |
| "subject": subject, |
| "extracted_subject": proposal_data.get("extracted_subject", ""), |
| } |
| if not is_profile_layer_record(category=category, semantic_slot=semantic_slot, metadata=base_metadata): |
| return {} |
| profile_type = infer_profile_type( |
| category=category, |
| semantic_slot=semantic_slot, |
| relation=relation, |
| value=value or source_span, |
| metadata=proposal_data, |
| ) |
| domain, domain_label = infer_profile_domain( |
| category=category, |
| semantic_slot=semantic_slot, |
| slot_key=slot_key, |
| anchors=anchors, |
| value=value or source_span, |
| metadata={**proposal_data, "subject": subject}, |
| ) |
| normalized_subject_signature = _slug(subject_signature or subject or domain, fallback=domain) |
| route_terms = _dedupe( |
| [ |
| profile_type, |
| domain_label, |
| semantic_slot, |
| subject, |
| *list(anchors or []), |
| ], |
| max_items=12, |
| ) |
| return { |
| "profile_layer": True, |
| "profile_candidate_status": _clean_text(proposal_data.get("profile_candidate_status", "")) or "writer_candidate", |
| "profile_consolidation_stage": _clean_text(proposal_data.get("profile_consolidation_stage", "")) or "pre_consolidation", |
| "profile_type": profile_type, |
| "profile_domain": domain, |
| "profile_domain_label": domain_label, |
| "profile_subject_signature": normalized_subject_signature, |
| "profile_support_key": f"{profile_type}:{domain}:{normalized_subject_signature}", |
| "profile_route_terms": route_terms, |
| } |
|
|
|
|
| def profile_aggregate_slot_key(metadata: Mapping[str, Any]) -> str: |
| data = dict(metadata or {}) |
| support_key = _clean_text(data.get("profile_support_key", "")) |
| if support_key: |
| return f"tmcra.profile.aggregate.{_slug(support_key)}" |
| profile_type = _normalize(data.get("profile_type", "")) or "usage_context" |
| domain = _normalize(data.get("profile_domain", "")) or _slug(data.get("profile_domain_label", "general")) |
| subject = _normalize(data.get("profile_subject_signature", "")) or domain |
| return f"tmcra.profile.aggregate.{_slug(f'{profile_type}:{domain}:{subject}')}" |
|
|
|
|
| def profile_aggregate_value( |
| *, |
| profile_type: Any, |
| domain_label: Any, |
| support_values: Sequence[Any], |
| ) -> str: |
| typed = _clean_text(profile_type) or "usage_context" |
| domain = _clean_text(domain_label) or "general" |
| values = _dedupe(support_values, max_items=5) |
| if not values: |
| return f"User {typed} profile for {domain}." |
| return f"User {typed} profile for {domain}: " + "; ".join(values) |
|
|
|
|
| def _profile_output_kind(profile_types: Sequence[Any]) -> str: |
| normalized = {_normalize(item) for item in profile_types if _clean_text(item)} |
| if "constraint" in normalized: |
| return "constraint_profile" |
| if "goal" in normalized: |
| return "goal_profile" |
| if normalized.intersection({"preference", "avoid"}): |
| return "preference_profile" |
| if "setup" in normalized: |
| return "setup_profile" |
| return "usage_context_profile" |
|
|
|
|
| def _profile_update_policy(profile_types: Sequence[Any]) -> str: |
| normalized = {_normalize(item) for item in profile_types if _clean_text(item)} |
| if "constraint" in normalized: |
| return "preserve_until_explicitly_changed" |
| if normalized.intersection({"preference", "avoid", "goal", "setup"}): |
| return "update_on_newer_user_evidence" |
| return "background_context" |
|
|
|
|
| def _profile_memory_type(profile_types: Sequence[Any]) -> str: |
| normalized = {_normalize(item) for item in profile_types if _clean_text(item)} |
| if "constraint" in normalized: |
| return "hard_constraint" |
| if normalized.intersection({"preference", "avoid"}): |
| return "durable_preference" |
| return "profile_context" |
|
|
|
|
| def profile_summary( |
| *, |
| profile_types: Sequence[Any], |
| domain_label: Any, |
| support_values: Sequence[Any], |
| stage: str, |
| ) -> str: |
| kind = _profile_output_kind(profile_types) |
| domain = _clean_text(domain_label) or "general" |
| values = _dedupe(support_values, max_items=6 if stage == "cluster" else 4) |
| prefix = { |
| "constraint_profile": "User constraint profile", |
| "goal_profile": "User goal profile", |
| "preference_profile": "User preference profile", |
| "setup_profile": "User setup profile", |
| "usage_context_profile": "User usage-context profile", |
| }.get(kind, "User profile") |
| if not values: |
| return f"{prefix} for {domain}." |
| return f"{prefix} for {domain}: " + "; ".join(values) |
|
|
|
|
| def build_profile_aggregate_metadata( |
| *, |
| support_record_id: Any, |
| support_turn_index: int, |
| support_value: Any, |
| support_anchors: Sequence[Any], |
| support_metadata: Mapping[str, Any], |
| existing_metadata: Mapping[str, Any] | None = None, |
| ) -> dict[str, Any]: |
| support = dict(support_metadata or {}) |
| existing = dict(existing_metadata or {}) |
| profile_type = _normalize(support.get("profile_type", "")) or "usage_context" |
| domain = _normalize(support.get("profile_domain", "")) or _slug(support.get("profile_domain_label", "general")) |
| domain_label = _clean_text(support.get("profile_domain_label", "")) or domain.replace("_", " ") |
| subject_signature = _clean_text(support.get("profile_subject_signature", "")) or domain |
| support_ids = _bounded_union(existing.get("profile_support_ids", []) or [], [support_record_id], max_items=64) |
| support_turns = _bounded_int_union(existing.get("profile_support_turns", []) or [], [support_turn_index], max_items=64) |
| support_values = _bounded_union(existing.get("profile_support_values", []) or [], [support_value], max_items=12) |
| support_route_terms = _bounded_union( |
| existing.get("profile_route_terms", []) or [], |
| support.get("profile_route_terms", []) or [], |
| support_anchors, |
| max_items=24, |
| ) |
| profile_types = [profile_type] |
| output_kind = _profile_output_kind(profile_types) |
| memory_type = _profile_memory_type(profile_types) |
| summary = profile_summary( |
| profile_types=profile_types, |
| domain_label=domain_label, |
| support_values=support_values, |
| stage="aggregate", |
| ) |
| value = profile_aggregate_value( |
| profile_type=profile_type, |
| domain_label=domain_label, |
| support_values=support_values, |
| ) |
| return { |
| **existing, |
| "profile_layer": True, |
| "profile_candidate_status": "consolidated", |
| "profile_consolidation_stage": "aggregate", |
| "profile_consolidator_version": PROFILE_CONSOLIDATOR_VERSION, |
| "profile_aggregate_node": True, |
| "profile_type": profile_type, |
| "profile_domain": domain, |
| "profile_domain_label": domain_label, |
| "profile_subject_signature": subject_signature, |
| "profile_support_key": f"{profile_type}:{domain}:{subject_signature}", |
| "profile_support_ids": support_ids, |
| "profile_support_turns": support_turns, |
| "profile_support_values": support_values, |
| "profile_support_count": len(support_ids), |
| "profile_route_terms": support_route_terms, |
| "profile_value": value, |
| "profile_summary": summary, |
| "profile_output_kind": output_kind, |
| "profile_update_policy": _profile_update_policy(profile_types), |
| "profile_conflict_policy": "latest_active_support_only", |
| "profile_evidence_count": len(support_ids), |
| "memory_type": memory_type, |
| "durable_memory_type": memory_type, |
| "memory_chain_depth_layer": "profile", |
| "depth_layer": "profile", |
| } |
|
|
|
|
| _PROFILE_CLUSTER_STOPWORDS = { |
| "user", |
| "profile", |
| "preference", |
| "preferences", |
| "constraint", |
| "constraints", |
| "goal", |
| "goals", |
| "avoid", |
| "usage", |
| "context", |
| "general", |
| "default", |
| "should", |
| "would", |
| "could", |
| "want", |
| "wants", |
| "need", |
| "needs", |
| "when", |
| "then", |
| "than", |
| "with", |
| "that", |
| "this", |
| "from", |
| "into", |
| "instead", |
| } |
|
|
|
|
| def profile_cluster_tokens(*values: Any) -> list[str]: |
| tokens: list[str] = [] |
| for value in values: |
| if isinstance(value, (list, tuple, set)): |
| for item in value: |
| tokens.extend(_tokens(item)) |
| else: |
| tokens.extend(_tokens(value)) |
| filtered = [ |
| token |
| for token in tokens |
| if token |
| and token not in _PROFILE_CLUSTER_STOPWORDS |
| and not token.isdigit() |
| and (len(token) > 2 or any("\u4e00" <= char <= "\u9fff" for char in token)) |
| ] |
| return _dedupe(filtered, max_items=32) |
|
|
|
|
| def profile_cluster_similarity(source_metadata: Mapping[str, Any], target_metadata: Mapping[str, Any]) -> float: |
| source = dict(source_metadata or {}) |
| target = dict(target_metadata or {}) |
| source_tokens = set( |
| profile_cluster_tokens( |
| source.get("profile_domain_label", ""), |
| source.get("profile_domain", ""), |
| source.get("profile_route_terms", []) or [], |
| source.get("profile_support_values", []) or [], |
| source.get("profile_value", ""), |
| ) |
| ) |
| target_tokens = set( |
| profile_cluster_tokens( |
| target.get("profile_domain_label", ""), |
| target.get("profile_domain", ""), |
| target.get("profile_route_terms", []) or [], |
| target.get("profile_support_values", []) or [], |
| target.get("profile_value", ""), |
| ) |
| ) |
| if not source_tokens or not target_tokens: |
| return 0.0 |
| overlap = len(source_tokens & target_tokens) / max(1, len(source_tokens | target_tokens)) |
| containment = len(source_tokens & target_tokens) / max(1, min(len(source_tokens), len(target_tokens))) |
| type_bonus = 0.06 if _normalize(source.get("profile_type", "")) == _normalize(target.get("profile_type", "")) else 0.0 |
| return round(min(1.0, (0.62 * overlap) + (0.38 * containment) + type_bonus), 6) |
|
|
|
|
| def profile_cluster_slot_key(metadata: Mapping[str, Any]) -> str: |
| data = dict(metadata or {}) |
| tokens = profile_cluster_tokens( |
| data.get("profile_domain_label", ""), |
| data.get("profile_domain", ""), |
| data.get("profile_route_terms", []) or [], |
| data.get("profile_support_values", []) or [], |
| data.get("profile_value", ""), |
| ) |
| if tokens: |
| return f"tmcra.profile.cluster.{_slug('_'.join(tokens[:5]))}" |
| support_profiles = data.get("profile_support_profile_ids", []) or [] |
| support_seed = _clean_text(support_profiles[0] if support_profiles else "") |
| seed = support_seed or _clean_text(data.get("profile_support_key", "general")) |
| return f"tmcra.profile.cluster.{_slug(seed)}" |
|
|
|
|
| def profile_cluster_value(*, support_values: Sequence[Any]) -> str: |
| values = _dedupe(support_values, max_items=8) |
| if not values: |
| return "User profile cluster." |
| return "User profile cluster: " + "; ".join(values) |
|
|
|
|
| def build_profile_cluster_metadata( |
| *, |
| support_profile_id: Any, |
| support_metadata: Mapping[str, Any], |
| existing_metadata: Mapping[str, Any] | None = None, |
| ) -> dict[str, Any]: |
| support = dict(support_metadata or {}) |
| existing = dict(existing_metadata or {}) |
| support_profile_ids = _bounded_union( |
| existing.get("profile_support_profile_ids", []) or [], |
| [support_profile_id], |
| max_items=32, |
| ) |
| support_ids = _bounded_union( |
| existing.get("profile_support_ids", []) or [], |
| support.get("profile_support_ids", []) or [], |
| max_items=96, |
| ) |
| support_turns = _bounded_int_union( |
| existing.get("profile_support_turns", []) or [], |
| support.get("profile_support_turns", []) or [], |
| max_items=96, |
| ) |
| raw_support_values = support.get("profile_support_values", []) or [] |
| support_values = _bounded_union( |
| existing.get("profile_support_values", []) or [], |
| raw_support_values, |
| [] if raw_support_values else [support.get("profile_value", "")], |
| max_items=16, |
| ) |
| route_terms = _bounded_union( |
| existing.get("profile_route_terms", []) or [], |
| existing.get("profile_cluster_route_terms", []) or [], |
| support.get("profile_route_terms", []) or [], |
| profile_cluster_tokens( |
| support.get("profile_domain_label", ""), |
| support.get("profile_domain", ""), |
| support.get("profile_value", ""), |
| support.get("profile_support_values", []) or [], |
| ), |
| max_items=32, |
| ) |
| profile_types = _bounded_union( |
| existing.get("profile_cluster_types", []) or [], |
| [support.get("profile_type", "")], |
| max_items=8, |
| ) |
| domains = _bounded_union( |
| existing.get("profile_cluster_domains", []) or [], |
| [support.get("profile_domain", ""), support.get("profile_domain_label", "")], |
| max_items=16, |
| ) |
| primary_type = _clean_text(profile_types[0] if profile_types else support.get("profile_type", "")) or "usage_context" |
| primary_domain = _clean_text(domains[0] if domains else support.get("profile_domain", "")) or "general" |
| output_kind = _profile_output_kind(profile_types) |
| memory_type = _profile_memory_type(profile_types) |
| summary = profile_summary( |
| profile_types=profile_types, |
| domain_label=primary_domain, |
| support_values=support_values, |
| stage="cluster", |
| ) |
| value = profile_cluster_value(support_values=support_values) |
| return { |
| **existing, |
| "profile_layer": True, |
| "profile_candidate_status": "consolidated", |
| "profile_consolidation_stage": "cluster", |
| "profile_consolidator_version": PROFILE_CONSOLIDATOR_VERSION, |
| "profile_cluster_node": True, |
| "profile_type": primary_type, |
| "profile_domain": _slug(primary_domain), |
| "profile_domain_label": primary_domain.replace("_", " "), |
| "profile_subject_signature": _clean_text(existing.get("profile_subject_signature", "")) or _slug(primary_domain), |
| "profile_support_key": _clean_text(existing.get("profile_support_key", "")) or f"cluster:{_slug(primary_domain)}", |
| "profile_support_profile_ids": support_profile_ids, |
| "profile_support_ids": support_ids, |
| "profile_support_turns": support_turns, |
| "profile_support_values": support_values, |
| "profile_support_count": len(support_ids), |
| "profile_cluster_profile_count": len(support_profile_ids), |
| "profile_cluster_types": profile_types, |
| "profile_cluster_domains": domains, |
| "profile_cluster_route_terms": route_terms, |
| "profile_route_terms": route_terms, |
| "profile_value": value, |
| "profile_summary": summary, |
| "profile_output_kind": output_kind, |
| "profile_update_policy": _profile_update_policy(profile_types), |
| "profile_conflict_policy": "latest_active_support_only", |
| "profile_evidence_count": len(support_ids), |
| "memory_type": memory_type, |
| "durable_memory_type": memory_type, |
| "memory_chain_depth_layer": "profile", |
| "depth_layer": "profile", |
| } |
|
|
|
|
| def infer_profile_query_intent(query: Any) -> dict[str, Any]: |
| query_text = _clean_text(query) |
| lowered = _normalize(query_text) |
| enabled = any(marker in lowered for marker in _PROFILE_QUERY_MARKERS) |
| types: list[str] = [] |
| if any(marker in lowered for marker in ("prefer", "preference", "like", "dislike", "偏好", "喜欢", "不喜欢")): |
| types.append("preference") |
| if any(marker in lowered for marker in ("constraint", "must", "cannot", "policy", "约束", "限制", "必须", "不能")): |
| types.append("constraint") |
| if any(marker in lowered for marker in ("goal", "target", "objective", "目标")): |
| types.append("goal") |
| if any(marker in lowered for marker in ("avoid", "dislike", "避免", "不喜欢")): |
| types.append("avoid") |
| if any(marker in lowered for marker in ("setup", "profile", "current", "occupation", "role", "job", "career", "background", "identity", "experience", "配置", "画像", "当前")): |
| types.append("setup") |
| if enabled and any( |
| marker in lowered |
| for marker in ( |
| "recommend", |
| "suggest", |
| "advice", |
| "tips", |
| "trouble with", |
| "struggling with", |
| "what do you think", |
| "should i", |
| "what should", |
| "serve", |
| "dinner", |
| "battery life", |
| "getting around", |
| "resources", |
| "learn more", |
| "推荐", |
| "建议", |
| ) |
| ): |
| types.append("usage_context") |
| if not types and enabled: |
| types.append("usage_context") |
| return { |
| "enabled": enabled, |
| "types": _dedupe(types), |
| "tokens": _tokens(query_text), |
| } |
|
|
|
|
| def profile_query_score_delta( |
| *, |
| query: Any, |
| query_tokens: set[str], |
| category: Any, |
| source_kind: Any, |
| semantic_slot: Any, |
| value: Any, |
| anchors: Sequence[Any], |
| metadata: Mapping[str, Any] | None = None, |
| ) -> tuple[float, str]: |
| data = dict(metadata or {}) |
| intent = infer_profile_query_intent(query) |
| if not intent.get("enabled"): |
| return 0.0, "" |
| if not is_profile_layer_record(category=category, source_kind=source_kind, semantic_slot=semantic_slot, metadata=data): |
| return 0.0, "" |
| profile_type = _normalize(data.get("profile_type", "")) or infer_profile_type( |
| category=category, |
| semantic_slot=semantic_slot, |
| value=value, |
| metadata=data, |
| ) |
| domain = _normalize(data.get("profile_domain_label", "") or data.get("profile_domain", "")) |
| route_terms = " ".join(str(item) for item in data.get("profile_route_terms", []) or []) |
| record_tokens = set(_tokens(f"{profile_type} {domain} {semantic_slot} {value} {' '.join(str(item) for item in anchors)} {route_terms}")) |
| overlap = len(set(query_tokens) & record_tokens) / max(1, len(set(query_tokens) | record_tokens)) if query_tokens or record_tokens else 0.0 |
| type_match = profile_type in set(intent.get("types", []) or []) |
| delta = 0.20 + (0.16 if type_match else 0.0) + (0.18 * overlap) |
| if data.get("profile_candidate_status") == "consolidated": |
| delta += 0.06 |
| if data.get("profile_cluster_node") or data.get("profile_consolidation_stage") == "cluster": |
| delta += 0.08 |
| return round(min(delta, 0.54), 6), "profile_route" |
|
|
|
|
| def profile_edge_score(source_metadata: Mapping[str, Any], target_metadata: Mapping[str, Any], *, source_value: Any = "", target_value: Any = "") -> tuple[float, str]: |
| source = dict(source_metadata or {}) |
| target = dict(target_metadata or {}) |
| if not source.get("profile_layer") or not target.get("profile_layer"): |
| return 0.0, "" |
| source_key = _normalize(source.get("profile_support_key", "")) |
| target_key = _normalize(target.get("profile_support_key", "")) |
| source_domain = _normalize(source.get("profile_domain", "")) |
| target_domain = _normalize(target.get("profile_domain", "")) |
| source_type = _normalize(source.get("profile_type", "")) |
| target_type = _normalize(target.get("profile_type", "")) |
| source_tokens = set(_tokens(f"{source_value} {source.get('profile_domain_label', '')} {' '.join(source.get('profile_route_terms', []) or [])}")) |
| target_tokens = set(_tokens(f"{target_value} {target.get('profile_domain_label', '')} {' '.join(target.get('profile_route_terms', []) or [])}")) |
| overlap = len(source_tokens & target_tokens) / max(1, len(source_tokens | target_tokens)) if source_tokens or target_tokens else 0.0 |
| if source_key and source_key == target_key: |
| return round(0.72 + (0.14 * overlap), 6), "profile_support" |
| if source_domain and source_domain == target_domain: |
| return round(0.58 + (0.12 if source_type == target_type else 0.04) + (0.12 * overlap), 6), "profile_tunnel" |
| if source_type and source_type == target_type and overlap >= 0.22: |
| return round(0.44 + (0.18 * overlap), 6), "profile_soft_tunnel" |
| return 0.0, "" |
|
|