#file utils.py
import os
from modules import app_constants, app_to_vectorstore, app_page_definitions, common_utils
from modules import app_logger
import json
import requests
import hashlib
import re, csv

# Use the logger from app_config
app_logger = app_logger.app_logger
work_dir = app_constants.WORKSPACE_DIRECTORY
system_content_file = app_constants.SYSTEM_CONTENT_DATA

def download_file(url):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        sanitized_filename = sanitize_filename(url.split('/')[-1])
        sanitized_local_path = os.path.join(app_constants.WORKSPACE_DIRECTORY, "docs", sanitized_filename)
        with open(sanitized_local_path, 'wb') as f:
            f.write(response.content)
        app_logger.info(f"File downloaded successfully: {sanitized_local_path}")
        return True
    except Exception as e:
        app_logger.error(f"Failed to download file from {url}. Error: {e}")
        return False

def index_file(local_path, module):
    status = False
    try:
        status = app_to_vectorstore.get_chroma_index(local_path, module, True)
        app_logger.info(f"File indexed successfully: {local_path}")
    except Exception as e:
        app_logger.error(f"Failed to index file. Error: {e}")
    return status
    
def compute_md5(file_path):
    hash_md5 = hashlib.md5()
    try:
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except Exception as e:
        app_logger.error(f"Error computing MD5 for {file_path}: {e}")
        return None
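
# Example (illustrative): compute_md5 streams the file in 4 KiB chunks, so large
# files are hashed without being loaded into memory. Assuming a file exists at
# ./workspace/docs/sample.pdf (a hypothetical path), a call would look like:
#
#     checksum = compute_md5("./workspace/docs/sample.pdf")
#     if checksum is not None:
#         app_logger.info(f"MD5: {checksum}")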

def sanitize_filename(filename):
    """Sanitize the filename by removing or replacing invalid characters and handling URLs."""

    # Extract filename from URL or file path
    filename = os.path.basename(filename)

    # Make the filename lowercase and replace spaces with underscores
    sanitized = filename.lower().replace(' ', '_')

    # Replace invalid characters with underscores
    sanitized = re.sub(r'[^\w\-_\.]', '_', sanitized)

    # Shorten the filename if it's too long
    max_length = 255  # Max length can be adjusted
    if len(sanitized) > max_length:
        # Keep the file extension if present
        file_parts = os.path.splitext(sanitized)
        ext = file_parts[1]
        sanitized = sanitized[:max_length - len(ext)] + ext
    return sanitized
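
# Illustrative examples of the rules above (derived from the regex and the
# space/underscore substitution; example.com is a placeholder domain):
#   sanitize_filename("My Report (v2).PDF")              -> "my_report__v2_.pdf"
#   sanitize_filename("https://example.com/A File.txt")  -> "a_file.txt"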

def delete_files(work_dir=work_dir):
    for root, dirs, files in os.walk(work_dir, topdown=False):
        for name in files:
            file_path = os.path.join(root, name)
            try:
                os.unlink(file_path)
                app_logger.info(f"Deleted file: {file_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {file_path}. Reason: {e}")

        for name in dirs:
            dir_path = os.path.join(root, name)
            try:
                os.rmdir(dir_path)
                app_logger.info(f"Deleted directory: {dir_path}")
            except Exception as e:
                app_logger.error(f"Failed to delete {dir_path}. Reason: {e}")
    remove_local_uploads()

def save_uploaded_file(uploaded_file, uploads_path, sanitized_filename=None):
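    # Note: uploaded_file is assumed to expose .name and .getbuffer(), matching
    # Streamlit's UploadedFile interface; any object with those attributes works.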
    if sanitized_filename is None:
        sanitized_filename = sanitize_filename(uploaded_file.name)
    file_path = os.path.join(uploads_path, sanitized_filename)

    with open(file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    app_logger.info(f"File '{sanitized_filename}' uploaded to {uploads_path}")
    return file_path

def perform_file_operation(resource, operation):
    url = resource.get("url", "")
    content_type = resource.get("content_type", "")
    file_name = os.path.join(work_dir, "docs", sanitize_filename(url))
    if operation == "download":
        if url:
            download_success = download_file(url)
            if download_success:
                app_logger.info(f"File {resource['name']} downloaded successfully.")
            else:
                app_logger.error(f"Failed to download file {resource['name']}.")
    elif operation == "learn":
        module = common_utils.get_content_mapping_to_module(content_type)
        index_file(file_name, module)
    else:
        app_logger.error(f"Unknown operation: {operation}")


def get_indexed_files_for_page(page_id):
    try:
        filtered_files = []

        # Open and read the CSV file
        with open(os.path.join(work_dir, app_constants.PROCESSED_DOCS), mode='r', newline='', encoding='utf-8') as file:
            csv_reader = csv.reader(file)
            for row in csv_reader:
                # Check if the second item in the row matches the page_id
                if len(row) > 2 and row[1].lower() == page_id.lower():
                    # Extract just the file name from the full path (third item in the row)
                    file_name = os.path.basename(row[2])
                    filtered_files.append(file_name)

        return filtered_files
    except Exception as e:
        app_logger.error(f"Failed to read indexed files for page {page_id}: {e}")
        return []

def update_json_file(data, file_path):
    with open(file_path, "w") as file:
        json.dump(data, file, indent=4)

def load_json_data(file_path):
    with open(file_path, "r") as file:
        return json.load(file)
    
def handle_content_update(uploaded_file=None, manual_name="", manual_url="", selected_content_type=""):
    uploads_directory = os.path.join(work_dir, "docs")
    file_data = load_json_data(system_content_file)

    if uploaded_file:
        filename = sanitize_filename(uploaded_file.name)
        file_path = save_file(uploaded_file, filename, uploads_directory)
    else:
        # No upload: register the manual URL directly; nothing is saved locally.
        filename = sanitize_filename(manual_url)
        file_path = manual_url

    new_entry = {"name": filename, "url": file_path, "content_type": selected_content_type}
    file_data.append(new_entry)
    update_json_file(file_data, system_content_file)

def save_file(uploaded_file, filename, directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
    file_path = os.path.join(directory, filename)
    with open(file_path, "wb") as file:
        file.write(uploaded_file.getbuffer())
    return file_path

def remove_local_uploads(file_path=app_constants.SYSTEM_CONTENT_DATA):
    # Read the JSON data from the file
    with open(file_path, 'r') as file:
        data = json.load(file)
    # Filter out entries whose 'url' points to a local file
    filtered_data = [entry for entry in data if not entry.get('url', '').startswith('./')]
    # Write the filtered data back to the file
    with open(file_path, 'w') as file:
        json.dump(filtered_data, file, indent=4)
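
# Minimal end-to-end sketch. It assumes app_constants.WORKSPACE_DIRECTORY and
# its "docs" subdirectory already exist and that the sample URL is reachable;
# both are illustrative placeholders, not part of this module's contract.
if __name__ == "__main__":
    sample_url = "https://example.com/sample.pdf"  # hypothetical URL
    if download_file(sample_url):
        local_path = os.path.join(work_dir, "docs", sanitize_filename(sample_url))
        app_logger.info(f"MD5 of downloaded file: {compute_md5(local_path)}")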