import base64
import json
import logging
import secrets
from typing import Optional

import click
from flask import current_app
from werkzeug.exceptions import NotFound

from configs import dify_config
from constants.languages import languages
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.models.document import Document
from events.app_event import app_was_created
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from libs.helper import email as email_validate
from libs.password import hash_password, password_pattern, valid_password
from libs.rsa import generate_key_pair
from models import Tenant
from models.dataset import Dataset, DatasetCollectionBinding, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import Account, App, AppAnnotationSetting, AppMode, Conversation, MessageAnnotation
from models.provider import Provider, ProviderModel
from services.account_service import RegisterService, TenantService
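
# NOTE: register_commands() at the bottom of this file registers each of the
# functions below as a Flask CLI command, so each needs click decorators.
# The decorators were missing from this copy of the file; the command and
# option names used here are assumptions inferred from the function
# signatures, not confirmed source.
@click.command("reset-password", help="Reset the account password.")
@click.option("--email", prompt=True, help="Account email whose password should be reset.")
@click.option("--new-password", prompt=True, help="New password.")
@click.option("--password-confirm", prompt=True, help="Confirm the new password.")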
def reset_password(email, new_password, password_confirm):
    """
    Reset the password of an account.
    Only available in SELF_HOSTED mode.
    """
    if str(new_password).strip() != str(password_confirm).strip():
        click.echo(click.style("Passwords do not match.", fg="red"))
        return

    account = db.session.query(Account).filter(Account.email == email).one_or_none()

    if not account:
        click.echo(click.style("Account not found for email: {}".format(email), fg="red"))
        return

    try:
        valid_password(new_password)
    except Exception:
        click.echo(click.style("Invalid password. Must match {}".format(password_pattern), fg="red"))
        return

    # generate a random password salt
    salt = secrets.token_bytes(16)
    base64_salt = base64.b64encode(salt).decode()

    # hash the password with the salt (passwords are hashed, not encrypted)
    password_hashed = hash_password(new_password, salt)
    base64_password_hashed = base64.b64encode(password_hashed).decode()
    account.password = base64_password_hashed
    account.password_salt = base64_salt
    db.session.commit()
    click.echo(click.style("Password reset successfully.", fg="green"))
def reset_email(email, new_email, email_confirm):
    """
    Replace the email of an account.
    """
    if str(new_email).strip() != str(email_confirm).strip():
        click.echo(click.style("New emails do not match.", fg="red"))
        return

    account = db.session.query(Account).filter(Account.email == email).one_or_none()

    if not account:
        click.echo(click.style("Account not found for email: {}".format(email), fg="red"))
        return

    try:
        email_validate(new_email)
    except Exception:
        click.echo(click.style("Invalid email: {}".format(new_email), fg="red"))
        return

    account.email = new_email
    db.session.commit()
    click.echo(click.style("Email updated successfully.", fg="green"))
def reset_encrypt_key_pair():
    """
    Reset the asymmetric key pair used by each workspace to encrypt LLM credentials.
    After the reset, all stored LLM credentials become invalid and must be re-entered.
    Only supported in SELF_HOSTED mode.
    """
    if dify_config.EDITION != "SELF_HOSTED":
        click.echo(click.style("This command is only for SELF_HOSTED installations.", fg="red"))
        return

    tenants = db.session.query(Tenant).all()
    for tenant in tenants:
        if not tenant:
            click.echo(click.style("No workspaces found. Run /install first.", fg="red"))
            return

        tenant.encrypt_public_key = generate_key_pair(tenant.id)

        # provider credentials were encrypted with the old key pair, so they must be removed
        db.session.query(Provider).filter(Provider.provider_type == "custom", Provider.tenant_id == tenant.id).delete()
        db.session.query(ProviderModel).filter(ProviderModel.tenant_id == tenant.id).delete()
        db.session.commit()

        click.echo(
            click.style(
                "Congratulations! The asymmetric key pair of workspace {} has been reset.".format(tenant.id),
                fg="green",
            )
        )
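
# Assumed decorator; --scope selects which data to migrate
# ("knowledge", "annotation", or "all").
@click.command("vdb-migrate", help="Migrate vector data to the configured vector store.")
@click.option("--scope", default="all", help="Scope of the migration: knowledge, annotation, or all.")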
def vdb_migrate(scope: str):
    """Migrate vector data to the vector store configured in VECTOR_STORE."""
    if scope in {"knowledge", "all"}:
        migrate_knowledge_vector_database()
    if scope in {"annotation", "all"}:
        migrate_annotation_vector_database()
def migrate_annotation_vector_database():
    """
    Migrate annotation data to the target vector database.
    """
    click.echo(click.style("Starting annotation data migration.", fg="green"))
    create_count = 0
    skipped_count = 0
    total_count = 0
    page = 1
    while True:
        try:
            # fetch apps in pages of 50; paginate() raises NotFound past the last page
            apps = (
                db.session.query(App)
                .filter(App.status == "normal")
                .order_by(App.created_at.desc())
                .paginate(page=page, per_page=50)
            )
        except NotFound:
            break

        page += 1
        for app in apps:
            total_count = total_count + 1
            click.echo(
                f"Processing app #{total_count}: {app.id}. {create_count} created, {skipped_count} skipped."
            )
            try:
                click.echo("Creating app annotation index: {}".format(app.id))
                app_annotation_setting = (
                    db.session.query(AppAnnotationSetting).filter(AppAnnotationSetting.app_id == app.id).first()
                )

                if not app_annotation_setting:
                    skipped_count = skipped_count + 1
                    click.echo("App annotation setting disabled: {}".format(app.id))
                    continue

                # get dataset_collection_binding info
                dataset_collection_binding = (
                    db.session.query(DatasetCollectionBinding)
                    .filter(DatasetCollectionBinding.id == app_annotation_setting.collection_binding_id)
                    .first()
                )
                if not dataset_collection_binding:
                    click.echo("App annotation collection binding not found: {}".format(app.id))
                    continue

                annotations = db.session.query(MessageAnnotation).filter(MessageAnnotation.app_id == app.id).all()
                dataset = Dataset(
                    id=app.id,
                    tenant_id=app.tenant_id,
                    indexing_technique="high_quality",
                    embedding_model_provider=dataset_collection_binding.provider_name,
                    embedding_model=dataset_collection_binding.model_name,
                    collection_binding_id=dataset_collection_binding.id,
                )
                documents = []
                if annotations:
                    for annotation in annotations:
                        document = Document(
                            page_content=annotation.question,
                            metadata={"annotation_id": annotation.id, "app_id": app.id, "doc_id": annotation.id},
                        )
                        documents.append(document)

                vector = Vector(dataset, attributes=["doc_id", "annotation_id", "app_id"])
                click.echo(f"Migrating annotations for app: {app.id}.")

                try:
                    vector.delete()
                    click.echo(click.style(f"Deleted vector index for app {app.id}.", fg="green"))
                except Exception as e:
                    click.echo(click.style(f"Failed to delete vector index for app {app.id}.", fg="red"))
                    raise e

                if documents:
                    try:
                        click.echo(
                            click.style(
                                f"Creating vector index with {len(documents)} annotations for app {app.id}.",
                                fg="green",
                            )
                        )
                        vector.create(documents)
                        click.echo(click.style(f"Created vector index for app {app.id}.", fg="green"))
                    except Exception as e:
                        click.echo(click.style(f"Failed to create vector index for app {app.id}.", fg="red"))
                        raise e

                click.echo(f"Successfully migrated app annotation {app.id}.")
                create_count += 1
            except Exception as e:
                click.echo(
                    click.style(
                        "Error creating app annotation index: {} {}".format(e.__class__.__name__, str(e)), fg="red"
                    )
                )
                continue

    click.echo(
        click.style(
            f"Migration complete. Created {create_count} app annotation indexes. Skipped {skipped_count} apps.",
            fg="green",
        )
    )
def migrate_knowledge_vector_database():
    """
    Migrate knowledge vector data to the target vector database.
    """
    click.echo(click.style("Starting vector database migration.", fg="green"))
    create_count = 0
    skipped_count = 0
    total_count = 0
    vector_type = dify_config.VECTOR_STORE

    # vector stores whose collection names keep the generated casing
    upper_collection_vector_types = {
        VectorType.MILVUS,
        VectorType.PGVECTOR,
        VectorType.RELYT,
        VectorType.WEAVIATE,
        VectorType.ORACLE,
        VectorType.ELASTICSEARCH,
    }
    # vector stores that require lower-cased collection names
    lower_collection_vector_types = {
        VectorType.ANALYTICDB,
        VectorType.CHROMA,
        VectorType.MYSCALE,
        VectorType.PGVECTO_RS,
        VectorType.TIDB_VECTOR,
        VectorType.OPENSEARCH,
        VectorType.TENCENT,
        VectorType.BAIDU,
        VectorType.VIKINGDB,
        VectorType.UPSTASH,
        VectorType.COUCHBASE,
        VectorType.OCEANBASE,
    }
    page = 1
    while True:
        try:
            datasets = (
                db.session.query(Dataset)
                .filter(Dataset.indexing_technique == "high_quality")
                .order_by(Dataset.created_at.desc())
                .paginate(page=page, per_page=50)
            )
        except NotFound:
            break

        page += 1
        for dataset in datasets:
            total_count = total_count + 1
            click.echo(
                f"Processing dataset #{total_count}: {dataset.id}. {create_count} created, {skipped_count} skipped."
            )
            try:
                click.echo("Creating dataset vector database index: {}".format(dataset.id))
                if dataset.index_struct_dict:
                    # skip datasets that already use the target vector store
                    if dataset.index_struct_dict["type"] == vector_type:
                        skipped_count = skipped_count + 1
                        continue
                collection_name = ""
                dataset_id = dataset.id
                if vector_type in upper_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id)
                elif vector_type == VectorType.QDRANT:
                    if dataset.collection_binding_id:
                        dataset_collection_binding = (
                            db.session.query(DatasetCollectionBinding)
                            .filter(DatasetCollectionBinding.id == dataset.collection_binding_id)
                            .one_or_none()
                        )
                        if dataset_collection_binding:
                            collection_name = dataset_collection_binding.collection_name
                        else:
                            raise ValueError("Dataset Collection Binding not found")
                    else:
                        collection_name = Dataset.gen_collection_name_by_id(dataset_id)
                elif vector_type in lower_collection_vector_types:
                    collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
                else:
                    raise ValueError(f"Vector store {vector_type} is not supported.")

                index_struct_dict = {"type": vector_type, "vector_store": {"class_prefix": collection_name}}
                dataset.index_struct = json.dumps(index_struct_dict)
                vector = Vector(dataset)
                click.echo(f"Migrating dataset {dataset.id}.")

                try:
                    vector.delete()
                    click.echo(
                        click.style(f"Deleted vector index {collection_name} for dataset {dataset.id}.", fg="green")
                    )
                except Exception as e:
                    click.echo(
                        click.style(
                            f"Failed to delete vector index {collection_name} for dataset {dataset.id}.", fg="red"
                        )
                    )
                    raise e

                dataset_documents = (
                    db.session.query(DatasetDocument)
                    .filter(
                        DatasetDocument.dataset_id == dataset.id,
                        DatasetDocument.indexing_status == "completed",
                        DatasetDocument.enabled == True,  # noqa: E712
                        DatasetDocument.archived == False,  # noqa: E712
                    )
                    .all()
                )

                documents = []
                segments_count = 0
                for dataset_document in dataset_documents:
                    segments = (
                        db.session.query(DocumentSegment)
                        .filter(
                            DocumentSegment.document_id == dataset_document.id,
                            DocumentSegment.status == "completed",
                            DocumentSegment.enabled == True,  # noqa: E712
                        )
                        .all()
                    )

                    for segment in segments:
                        document = Document(
                            page_content=segment.content,
                            metadata={
                                "doc_id": segment.index_node_id,
                                "doc_hash": segment.index_node_hash,
                                "document_id": segment.document_id,
                                "dataset_id": segment.dataset_id,
                            },
                        )

                        documents.append(document)
                        segments_count = segments_count + 1

                if documents:
                    try:
                        click.echo(
                            click.style(
                                f"Creating vector index with {len(documents)} documents of {segments_count}"
                                f" segments for dataset {dataset.id}.",
                                fg="green",
                            )
                        )
                        vector.create(documents)
                        click.echo(click.style(f"Created vector index for dataset {dataset.id}.", fg="green"))
                    except Exception as e:
                        click.echo(click.style(f"Failed to create vector index for dataset {dataset.id}.", fg="red"))
                        raise e

                db.session.add(dataset)
                db.session.commit()
                click.echo(f"Successfully migrated dataset {dataset.id}.")
                create_count += 1
            except Exception as e:
                db.session.rollback()
                click.echo(
                    click.style("Error creating dataset index: {} {}".format(e.__class__.__name__, str(e)), fg="red")
                )
                continue

    click.echo(
        click.style(
            f"Migration complete. Created {create_count} dataset indexes. Skipped {skipped_count} datasets.", fg="green"
        )
    )
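
# Assumed decorator; this command takes no options.
@click.command("convert-to-agent-apps", help="Convert Agent Assistant apps to Agent apps.")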
def convert_to_agent_apps():
    """
    Convert Agent Assistant apps to Agent apps.
    """
    click.echo(click.style("Starting convert to agent apps.", fg="green"))

    proceeded_app_ids = []
    while True:
        # fetch the first 1000 matching apps not yet processed
        sql_query = """SELECT a.id AS id FROM apps a
            INNER JOIN app_model_configs am ON a.app_model_config_id=am.id
            WHERE a.mode = 'chat'
            AND am.agent_mode is not null
            AND (
                am.agent_mode like '%"strategy": "function_call"%'
                OR am.agent_mode like '%"strategy": "react"%'
            )
            AND (
                am.agent_mode like '{"enabled": true%'
                OR am.agent_mode like '{"max_iteration": %'
            ) ORDER BY a.created_at DESC LIMIT 1000
        """

        with db.engine.begin() as conn:
            rs = conn.execute(db.text(sql_query))

            apps = []
            for i in rs:
                app_id = str(i.id)
                if app_id not in proceeded_app_ids:
                    proceeded_app_ids.append(app_id)
                    app = db.session.query(App).filter(App.id == app_id).first()
                    # guard against apps deleted between the raw query and the ORM lookup
                    if app is not None:
                        apps.append(app)

            if len(apps) == 0:
                break

        for app in apps:
            click.echo("Converting app: {}".format(app.id))

            try:
                app.mode = AppMode.AGENT_CHAT.value
                db.session.commit()

                # update conversation mode to agent
                db.session.query(Conversation).filter(Conversation.app_id == app.id).update(
                    {Conversation.mode: AppMode.AGENT_CHAT.value}
                )

                db.session.commit()
                click.echo(click.style("Converted app: {}".format(app.id), fg="green"))
            except Exception as e:
                click.echo(click.style("Convert app error: {} {}".format(e.__class__.__name__, str(e)), fg="red"))

    click.echo(click.style("Conversion complete. Converted {} agent apps.".format(len(proceeded_app_ids)), fg="green"))
def add_qdrant_doc_id_index(field: str):
    """Create a keyword payload index on the given field for every Qdrant collection binding."""
    click.echo(click.style("Starting Qdrant doc_id index creation.", fg="green"))
    vector_type = dify_config.VECTOR_STORE
    if vector_type != "qdrant":
        click.echo(click.style("This command only supports Qdrant vector store.", fg="red"))
        return
    create_count = 0

    try:
        bindings = db.session.query(DatasetCollectionBinding).all()
        if not bindings:
            click.echo(click.style("No dataset collection bindings found.", fg="red"))
            return
        import qdrant_client
        from qdrant_client.http.exceptions import UnexpectedResponse
        from qdrant_client.http.models import PayloadSchemaType

        from core.rag.datasource.vdb.qdrant.qdrant_vector import QdrantConfig

        for binding in bindings:
            if dify_config.QDRANT_URL is None:
                raise ValueError("Qdrant URL is required.")
            qdrant_config = QdrantConfig(
                endpoint=dify_config.QDRANT_URL,
                api_key=dify_config.QDRANT_API_KEY,
                root_path=current_app.root_path,
                timeout=dify_config.QDRANT_CLIENT_TIMEOUT,
                grpc_port=dify_config.QDRANT_GRPC_PORT,
                prefer_grpc=dify_config.QDRANT_GRPC_ENABLED,
            )
            try:
                client = qdrant_client.QdrantClient(**qdrant_config.to_qdrant_params())
                # create a keyword payload index on the given field
                client.create_payload_index(binding.collection_name, field, field_schema=PayloadSchemaType.KEYWORD)
                create_count += 1
            except UnexpectedResponse as e:
                # the collection does not exist, so skip it
                if e.status_code == 404:
                    click.echo(click.style(f"Collection not found: {binding.collection_name}.", fg="red"))
                    continue
                # some other error occurred; report it and move on
                else:
                    click.echo(
                        click.style(
                            f"Failed to create Qdrant index for collection: {binding.collection_name}.", fg="red"
                        )
                    )
    except Exception:
        click.echo(click.style("Failed to create Qdrant client.", fg="red"))

    click.echo(click.style(f"Index creation complete. Created {create_count} collection indexes.", fg="green"))
def create_tenant(email: str, language: Optional[str] = None, name: Optional[str] = None):
    """
    Create a tenant account.
    """
    if not email:
        click.echo(click.style("Email is required.", fg="red"))
        return

    # Create account
    email = email.strip()

    if "@" not in email:
        click.echo(click.style("Invalid email address.", fg="red"))
        return

    account_name = email.split("@")[0]

    if language not in languages:
        language = "en-US"

    # guard against None before stripping the workspace name
    name = name.strip() if name else None

    # generate a random password for the new account
    new_password = secrets.token_urlsafe(16)

    # register account
    account = RegisterService.register(email=email, name=account_name, password=new_password, language=language)

    TenantService.create_owner_tenant_if_not_exist(account, name)

    click.echo(
        click.style(
            "Account and tenant created.\nAccount: {}\nPassword: {}".format(email, new_password),
            fg="green",
        )
    )
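
# Assumed decorator; this command takes no options.
@click.command("upgrade-db", help="Upgrade the database.")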
def upgrade_db():
    """Run database migrations, guarded by a Redis lock so only one worker upgrades."""
    click.echo("Preparing database migration...")
    lock = redis_client.lock(name="db_upgrade_lock", timeout=60)
    if lock.acquire(blocking=False):
        try:
            click.echo(click.style("Starting database migration.", fg="green"))

            # run db migration
            import flask_migrate

            flask_migrate.upgrade()

            click.echo(click.style("Database migration successful!", fg="green"))
        except Exception:
            logging.exception("Database migration failed")
        finally:
            lock.release()
    else:
        click.echo("Database migration skipped")
def fix_app_site_missing():
    """
    Fix missing site records for apps.
    """
    click.echo(click.style("Starting fix for missing app-related sites.", fg="green"))
    failed_app_ids = []
    while True:
        sql = """select apps.id as id from apps left join sites on sites.app_id=apps.id
where sites.id is null limit 1000"""
        with db.engine.begin() as conn:
            rs = conn.execute(db.text(sql))

            processed_count = 0
            for i in rs:
                app_id = str(i.id)

                # skip apps that already failed, without counting them, so the
                # loop terminates once only failed apps remain
                if app_id in failed_app_ids:
                    continue

                processed_count += 1

                try:
                    app = db.session.query(App).filter(App.id == app_id).first()
                    if not app:
                        continue
                    tenant = app.tenant
                    if tenant:
                        accounts = tenant.get_accounts()
                        if not accounts:
                            click.echo("Fix failed for app {}".format(app.id))
                            continue

                        account = accounts[0]
                        click.echo("Fixing missing site for app {}".format(app.id))
                        # sending this signal triggers the site-creation handler
                        app_was_created.send(app, account=account)
                except Exception:
                    failed_app_ids.append(app_id)
                    click.echo(click.style("Failed to fix missing site for app {}".format(app_id), fg="red"))
                    logging.exception(f"Failed to fix missing site for app {app_id}")
                    continue

            if not processed_count:
                break

    click.echo(click.style("Fix for missing app-related sites completed successfully!", fg="green"))
def register_commands(app):
    app.cli.add_command(reset_password)
    app.cli.add_command(reset_email)
    app.cli.add_command(reset_encrypt_key_pair)
    app.cli.add_command(vdb_migrate)
    app.cli.add_command(convert_to_agent_apps)
    app.cli.add_command(add_qdrant_doc_id_index)
    app.cli.add_command(create_tenant)
    app.cli.add_command(upgrade_db)
    app.cli.add_command(fix_app_site_missing)
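
# With the decorators above, these commands are invoked through the Flask CLI,
# e.g. (command names are the assumed ones used in the decorators):
#   flask reset-password --email user@example.com
#   flask vdb-migrate --scope knowledge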