| | from django.shortcuts import render |
| | from django.http import JsonResponse |
| | from .forms import ImageUploadForm, ClassificationForm, RegisterFaceForm,TranscribeForm, YouTubeURLForm |
| | import shutil |
| | from django.conf import settings |
| | import torch |
| | import json |
| | import os |
| | from PIL import Image as PILImage |
| | import io |
| | import tempfile |
| | from django.core.cache import cache |
| | import numpy as numpy_lib |
| | import pickle |
| | from deepface import DeepFace |
| | import cv2 |
| | import base64 |
| | from io import BytesIO |
| | from . import globals |
| | import tempfile |
| | import mimetypes |
| | import subprocess |
| | import logging |
| | import uuid |
| | import yt_dlp |
| | import time |
| | import re |
| | from pydub import AudioSegment |
| | import pandas as pd |
| | import csv |
| | |
| |
|
| | |
logger = logging.getLogger(__name__)


# Shared app-wide handles, loaded once at import time from ai_api.globals.
model = globals.model                # claim-classification model (fed tokenized text, argmax over logits below)
tokenizer = globals.tokenizer        # tokenizer paired with `model` (called with return_tensors='pt' below)
devlab_image = globals.devlab_image  # image helper: EXIF, metadata, BLIP captions, face vector search

# label_map.json maps label name -> class index (presumably produced when the
# model was trained/saved under globals.save_path — TODO confirm).
with open(f"{globals.save_path}/label_map.json", "r") as f:
    label_map = json.load(f)

# Inverse mapping: class index -> label name, used to decode model predictions.
index_to_label = {v: k for k, v in label_map.items()}
| |
|
| |
|
| | |
def home(request):
    """Render the static landing page."""
    return render(request, 'home.html')
| |
|
| |
|
def classification(request):
    """Run the full claim-verification pipeline for a POSTed claim.

    POST: classifies the claim with the shared transformer model, extracts
    keywords, scrapes TikTok/web/Lowyat for related posts, runs sentiment
    analysis and priority indexing, and returns a JSON summary
    (classification, keywords, sentiment_data, priority_data, verdict).
    GET: renders the empty classification form.
    """
    # Heavy project helpers are imported lazily so the module imports fast.
    from .library import simple_keyword_extraction, apify_scraper, priority_indexer, websearch, lowyat_crawler, sentiment_analyzer

    if request.method == 'POST':
        # Progress is reported to the polling client through the cache under this key.
        progress_key = request.POST.get("progress_key", str(uuid.uuid4()))
        cache.set(progress_key, {'stage': 'starting', 'percent': 0})

        text = request.POST.get("claim", "")
        if not text:
            return JsonResponse({"error": "No text provided"}, status=400)

        # Short id used to namespace every artifact file for this request.
        claim_id = str(uuid.uuid4())[:8]

        try:
            # 1) Classify the claim text.
            cache.set(progress_key, {'stage': 'classifying', 'percent': 10})
            inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
            with torch.no_grad():
                outputs = model(**inputs)
            prediction = torch.argmax(outputs.logits, dim=-1).item()
            classification_result = index_to_label.get(prediction, "Unknown")

            # 2) Keyword extraction drives all downstream scraping.
            cache.set(progress_key, {'stage': 'extracting_keywords', 'percent': 20})
            keywords = simple_keyword_extraction.extract_keywords(text)

            output_path = os.path.join(settings.BASE_DIR, 'ai_api', 'library', 'output')
            report_path = os.path.join(settings.BASE_DIR, 'ai_api', 'library', 'reports')
            raw_data_path = os.path.join(output_path, f'{claim_id}.csv')

            # 3) Scrape TikTok via Apify into the raw-data CSV.
            cache.set(progress_key, {'stage': 'scraping_tiktok', 'percent': 30})
            apify_scraper.run(
                keywords,
                output_path=raw_data_path,
            )

            # 4) Web search; results go to a JSON sidecar file (return value unused).
            cache.set(progress_key, {'stage': 'searching_web', 'percent': 50})
            websearch.run(
                keywords,
                output_path=os.path.join(output_path, f"{claim_id}_web.json"),
                full_claim=text
            )

            # 5) Crawl Lowyat forum sections for related threads.
            cache.set(progress_key, {'stage': 'crawling_forum', 'percent': 60})
            lowyat_path = os.path.join(output_path, f"{claim_id}_lowyat.csv")
            lowyat_sections = ["Kopitiam", "SeriousKopitiam"]
            lowyat_crawler.run(
                keywords,
                sections=lowyat_sections,
                output_path=lowyat_path,
                full_claim=text
            )

            # 6) Merge the forum rows into the main raw-data CSV.
            cache.set(progress_key, {'stage': 'combining_data', 'percent': 70})
            if os.path.exists(lowyat_path):
                lowyat_df = pd.read_csv(lowyat_path)
                if os.path.exists(raw_data_path):
                    main_df = pd.read_csv(raw_data_path)
                    combined_df = pd.concat([main_df, lowyat_df], ignore_index=True)
                    combined_df.to_csv(raw_data_path, index=False)
                else:
                    lowyat_df.to_csv(raw_data_path, index=False)

            # 7) Sentiment analysis over everything scraped so far.
            cache.set(progress_key, {'stage': 'analyzing_sentiment', 'percent': 80})
            sentiment_csv = os.path.join(output_path, f"{claim_id}_sentiment.csv")
            sentiment_data = {}

            if os.path.exists(raw_data_path):
                sentiment_analyzer.run(raw_data_path, sentiment_csv)

            if os.path.exists(sentiment_csv):
                sentiment_df = pd.read_csv(sentiment_csv)
                sentiment_counts = sentiment_df['sentiment'].value_counts().to_dict()
                # Numeric labels emitted by the analyzer.
                sentiment_map = {0: "neutral", 1: "positive", 2: "negative"}
                text_counts = {sentiment_map.get(k, k): v for k, v in sentiment_counts.items()}
                sentiment_data = {
                    'counts': text_counts,
                    'table_html': csv_to_html_table(sentiment_csv)
                }

            # 8) Priority indexing and final verdict.
            cache.set(progress_key, {'stage': 'indexing_priority', 'percent': 90})
            priority_json = os.path.join(report_path, f"{claim_id}_priority.json")
            priority_data = {}
            # FIX: initialise the verdict up front instead of probing
            # `'verdict' in locals()` at response-building time.
            verdict = "UNVERIFIED"

            if os.path.exists(sentiment_csv):
                priority_indexer.run(
                    claim=text,
                    claim_id=claim_id,
                    keywords=keywords,
                    sentiment_csv=sentiment_csv,
                    output_path=priority_json
                )

            if os.path.exists(priority_json):
                with open(priority_json, 'r') as f:
                    priority_data = json.load(f)
                verdict = determine_verdict(priority_data)

            cache.set(progress_key, {'stage': 'complete', 'percent': 100})

            return JsonResponse({
                'classification': classification_result,
                'keywords': keywords,
                'sentiment_data': sentiment_data,
                'priority_data': priority_data,
                'verdict': verdict,
                'progress_key': progress_key
            })

        except Exception as e:
            # Any stage failure is reported with the progress key so the
            # front end can stop polling.
            logger.error(f"Error in classification: {str(e)}")
            return JsonResponse({
                'error': str(e),
                'progress_key': progress_key
            }, status=500)

    else:
        form = ClassificationForm()
        return render(request, 'classification.html', {
            'form': form,
            'result': {}
        })
| |
|
def determine_verdict(priority_data):
    """Derive a verdict string from a priority-indexer report.

    Args:
        priority_data: dict produced by the priority indexer. Recognised keys:
            "priority_flags" (0/1 flags; older reports put the flags at the
            top level), "sentiment_counts", "priority_score", "claim".

    Returns:
        One of "TRUE", "FALSE", "PARTIALLY_TRUE", "UNVERIFIED".
    """
    if not isinstance(priority_data, dict):
        return "UNVERIFIED"

    # Flags may be nested under "priority_flags" or be the dict itself
    # (older report shape).
    if "priority_flags" in priority_data:
        priority_flags = priority_data["priority_flags"]
    else:
        priority_flags = priority_data

    # Normalise sentiment-count keys to strings so both {"1": n} and {1: n}
    # shapes are handled by the lookups below.
    sentiment_counts = priority_data.get("sentiment_counts", {})
    if any(not isinstance(k, str) for k in sentiment_counts.keys()):
        sentiment_counts = {str(k): v for k, v in sentiment_counts.items()}

    # Fall back to the number of raised flags when no explicit score exists.
    priority_score = priority_data.get("priority_score", sum(priority_flags.values()))

    claim = priority_data.get("claim", "").lower()
    # (FIX: dropped the unused `keywords` / `keywords_lower` computation.)

    # Heuristic topic detection (Malay keywords around the azan/mosque topic).
    is_azan_claim = any(word in claim for word in ["azan", "larang", "masjid", "pembesar suara"])
    is_religious_claim = any(word in claim for word in ["islam", "agama", "masjid", "surau", "sembahyang", "solat", "zakat"])

    economic_related = priority_flags.get("economic_impact", 0) == 1
    government_related = priority_flags.get("affects_government", 0) == 1
    law_related = priority_flags.get("law_related", 0) == 1
    causes_confusion = priority_flags.get("cause_confusion", 0) == 1

    # Sentiment labels: 0=neutral, 1=positive, 2=negative (numeric fallbacks).
    negative_dominant = False
    if sentiment_counts:
        pos = int(sentiment_counts.get("positive", sentiment_counts.get("1", 0)))
        neg = int(sentiment_counts.get("negative", sentiment_counts.get("2", 0)))
        neu = int(sentiment_counts.get("neutral", sentiment_counts.get("0", 0)))
        negative_dominant = neg > pos and neg > neu

    # Hard-coded rule for the known-false "azan ban" claim family.
    if is_azan_claim and is_religious_claim and "larangan" in claim:
        return "FALSE"

    if priority_score >= 7.0 and negative_dominant and (government_related or law_related):
        return "FALSE"
    elif priority_score >= 5.0 and causes_confusion:
        return "PARTIALLY_TRUE"
    elif priority_score <= 3.0 and not negative_dominant:
        return "TRUE"
    elif economic_related and government_related:
        # High-impact combination: let sentiment/confusion decide.
        if negative_dominant:
            return "FALSE"
        elif causes_confusion:
            return "PARTIALLY_TRUE"
        else:
            return "TRUE"
    else:
        return "UNVERIFIED"
| | |
def image_profiling(request):
    """Profile an uploaded image: EXIF/metadata, BLIP caption, OCR text, and
    face recognition against the registered-face vector store.

    Renders image_profiling.html with the annotated image, cropped face
    thumbnails, extracted text and metadata in the template context.
    """
    from django.http import HttpResponse  # needed for the no-face early exit

    result = None
    image_with_labels = None
    cropped_faces_base64 = []
    texts = None
    proccessed = False      # template context key; misspelling kept for compatibility
    uploded_base64 = None   # template value; misspelled name kept local
    exifs = None
    metadata = None
    description = None
    reverse_images = None

    if request.method == 'POST':
        form = ImageUploadForm(request.POST, request.FILES)
        if form.is_valid():
            proccessed = True
            uploaded_image = request.FILES['image']

            # The EXIF/metadata/BLIP helpers need a real file on disk.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".jpg") as tmp:
                for chunk in uploaded_image.chunks():
                    tmp.write(chunk)
                tmp_path = tmp.name

            try:
                image = PILImage.open(uploaded_image)
                image_np = numpy_lib.array(image.convert('RGB'))
                exifs = devlab_image.extract_exif(tmp_path)
                metadata = devlab_image.extract_metadata_exiftool(tmp_path)
                description = devlab_image.generate_description_blip(tmp_path)
            finally:
                # FIX: the temp copy was never deleted (one leaked file per upload).
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

            # Base64 copy of the original upload for display in the template.
            buffered = io.BytesIO()
            image.save(buffered, format="PNG")
            img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
            uploded_base64 = f"data:image/png;base64,{img_str}"

            texts = devlab_image.extract_text_numpy(image_np)

            face_embeddings = DeepFace.represent(image_np, model_name="Facenet", enforce_detection=False)

            if not face_embeddings:
                # FIX: was `return "❌ ..."` — a bare str is not a valid
                # Django response object.
                return HttpResponse("❌ No faces detected in the image.")

            recognized_faces = {}
            cropped_faces = []

            for face_data in face_embeddings:
                query_embedding = numpy_lib.array(face_data["embedding"], dtype=numpy_lib.float32).reshape(1, -1)

                # Nearest neighbour (top-1) in the registered-face vector store.
                results = devlab_image.query_embedding(query_embedding, 1)
                if results and len(results) > 0 and len(results[0]) > 0:
                    entity = results[0][0].entity
                    print(f"Entity: {entity}")

                    face_name = entity.get('name') if entity else 'Unknown'
                    fdescription = entity.get('short_description') if entity else ''
                    if fdescription is None:
                        fdescription = ''

                    distance = round(results[0][0].distance, 4)

                    # NOTE(review): this treats `distance` as a similarity in
                    # [0, 1] (> 0.95 => closest match) — confirm the metric
                    # returned by query_embedding.
                    if distance * 100 > 95:
                        face_name = f"{face_name} (CLOSEST)"

                    recognized_faces[f"clip_{len(recognized_faces) + 1}"] = {
                        "name": face_name,
                        "distance": distance,
                        "description": fdescription,
                    }

                    # Draw a green box on the full image and keep the crop.
                    face_location = face_data["facial_area"]
                    x, y, w, h = face_location["x"], face_location["y"], face_location["w"], face_location["h"]
                    cv2.rectangle(image_np, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    cropped_face = image_np[y:y + h, x:x + w]
                    cropped_faces.append([cropped_face, face_name, distance, fdescription])
                else:
                    print('No result found')

            # Encode the annotated image and each face crop for the template.
            _, buffer = cv2.imencode('.png', image_np)
            image_base64 = base64.b64encode(buffer).decode('utf-8')

            cropped_faces_base64 = []
            for face, face_name, distance, fdescription in cropped_faces:
                _, buffer = cv2.imencode('.png', face)
                face_base64 = base64.b64encode(buffer).decode('utf-8')
                cropped_faces_base64.append([f"data:image/png;base64,{face_base64}", face_name, distance, fdescription])

            result = recognized_faces  # NOTE(review): not passed to the template
            image_with_labels = f"data:image/png;base64,{image_base64}"

    else:
        form = ImageUploadForm()

    return render(request, 'image_profiling.html', {
        'form': form,
        'proccessed': proccessed,
        'uploaded_base64': uploded_base64,
        'image_with_labels': image_with_labels,
        'cropped_faces': cropped_faces_base64,
        'texts': texts,
        'exifs': exifs,
        'metadata': metadata,
        'description': description,
        'reverse_images': reverse_images
    })
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
def register_face(request):
    """Register a person's face(s) in the vector store.

    With uploaded images: saves them under <project>/people/<PERSON>/ and
    extracts face embeddings from them.  Without uploads: falls back to
    DevLabImage.register_person, which fetches reference photos itself.
    """
    from ai_api.library.devlab_image import DevLabImage
    import os
    from django.core.files.storage import FileSystemStorage
    from django.conf import settings

    result = None  # never assigned below; the template always receives None
    if request.method == 'POST':
        # NOTE(review): the bound form is never validated (no is_valid());
        # inputs are read straight from request.POST.
        form = RegisterFaceForm(request.POST)
        person = request.POST.get("person", "").upper()
        keywords = request.POST.get("keywords", "")
        files = request.FILES.getlist('images')

        devlab_image = DevLabImage()

        if files:
            # Manual-upload path: persist the images for this person on disk.
            print('Upload manual')
            project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            upload_dir = os.path.join(project_root, 'people', person)

            print(f"Saving to: {upload_dir}")
            os.makedirs(upload_dir, exist_ok=True)

            fs = FileSystemStorage(location=upload_dir)

            for file in files:
                filename = fs.save(file.name, file)
                file_url = fs.url(filename)
                print(f"Saved: {file_url}")
            # Build embeddings from everything saved for this person.
            devlab_image.extract_face( person, keywords)
        else:
            # No uploads: let the helper fetch reference images automatically.
            print('Download from Google')
            devlab_image.register_person(person, keywords)

    else:
        form = RegisterFaceForm()

    return render(request, 'register_face.html', {
        'form': form,
        'result': result,
    })
| |
|
def check_progress(request, key):
    """Report the cached progress state for a long-running job as JSON."""
    fallback = {'stage': 'downloading', 'percent': 0}
    state = cache.get(key, fallback)
    return JsonResponse(state)
| |
|
def handle_uploaded_file(file):
    """Persist an uploaded file as a WAV temp file and return its path.

    Video uploads are run through ffmpeg (mono, 16 kHz PCM); anything else
    is copied byte-for-byte into the .wav-named temp file.

    Raises:
        Exception: when ffmpeg fails to extract audio from a video upload.
    """
    mime_type, _ = mimetypes.guess_type(file.name)

    # Reserve a unique .wav path (the handle is closed immediately; only the
    # name is used).
    with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_audio_file:
        output_audio_file = temp_audio_file.name

    if mime_type and mime_type.startswith('video'):
        # Spool the upload to disk so ffmpeg can read it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.name)[-1]) as temp_video_file:
            for chunk in file.chunks():
                temp_video_file.write(chunk)
            video_path = temp_video_file.name

        command = [
            'ffmpeg',
            '-y',
            '-i', video_path,
            '-vn',
            '-acodec', 'pcm_s16le',
            '-ar', '16000',
            '-ac', '1',
            output_audio_file
        ]

        try:
            result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
            print("FFmpeg stderr:", result.stderr.decode())
        except subprocess.CalledProcessError as e:
            logger.error(f"ffmpeg failed with error: {e.stderr.decode()}")
            raise Exception(f"Audio extraction failed: {e.stderr.decode()}")
        finally:
            # FIX: previously the temp video was removed only on success,
            # leaking a file on every ffmpeg failure.
            if os.path.exists(video_path):
                os.remove(video_path)

    else:
        # Non-video upload: copy the raw bytes into the .wav-named temp file.
        with open(output_audio_file, 'wb') as f:
            for chunk in file.chunks():
                f.write(chunk)

    return output_audio_file
| |
|
def format_time(seconds):
    """Format a second offset (float) as a WebVTT timestamp HH:MM:SS.mmm."""
    minutes, secs = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    millis = int((secs - int(secs)) * 1000)
    return f"{int(hours):02}:{int(minutes):02}:{int(secs):02}.{millis:03}"
| |
|
def generate_vtt(segments):
    """Build a WebVTT document from Whisper-style segments.

    Args:
        segments: iterable of dicts with 'start', 'end' (seconds) and 'text'.

    Returns:
        The full WebVTT document as a string.
    """
    # FIX: build via join instead of repeated string += (quadratic on long
    # transcripts). Output is byte-identical to the old concatenation.
    cues = [
        f"{format_time(segment['start'])} --> {format_time(segment['end'])}\n{segment['text']}\n\n"
        for segment in segments
    ]
    return "WEBVTT\n\n" + "".join(cues)
| |
|
def save_vtt(output_audio_file, vtt):
    """Write VTT text to MEDIA_ROOT/vtt/<audio basename>.vtt and return the path."""
    audio_stem = os.path.splitext(os.path.basename(output_audio_file))[0]
    target_path = os.path.join(settings.MEDIA_ROOT, 'vtt', audio_stem + ".vtt")
    os.makedirs(os.path.dirname(target_path), exist_ok=True)

    with open(target_path, "w", encoding="utf-8") as handle:
        handle.write(vtt)

    return target_path
| |
|
def transcription(request):
    """Transcribe an uploaded audio/video file or a YouTube URL with Whisper.

    POST returns JSON with the transcript text, segments, and an <audio> tag
    pointing at the saved media file; GET renders the transcription form.
    Download/transcription progress is published to the cache under the
    client-supplied progress key.
    """
    transcription = None
    error = None
    progress_key = str(uuid.uuid4())

    if request.method == "POST":
        # Reuse the client's key so its progress polling matches this job.
        progress_key = request.POST.get("progress_key", progress_key)

        model = globals.whisper_model
        form = YouTubeURLForm(request.POST)  # NOTE(review): bound but never validated or used

        file = request.FILES.get('file')
        if file:
            # Direct upload: convert/copy to a temp audio file first.
            output_audio_file = handle_uploaded_file(file)
            if os.path.getsize(output_audio_file) == 0:
                # Propagates as a server error: ffmpeg produced no audio.
                raise RuntimeError("FFmpeg produced an empty audio file.")

            print(f"transcribing : {output_audio_file}")
            cache.set(progress_key, {'stage': 'transcribing', 'percent': 100})
            result = model.transcribe(output_audio_file, verbose=False)
            vtt = generate_vtt(result['segments'])
            save_vtt(output_audio_file, vtt)

        else:
            # YouTube URL: download best audio as mp3 with yt-dlp.
            cache.set(progress_key, {'stage': 'downloading', 'percent': 0})
            # yt-dlp's percent strings can carry ANSI colour codes; strip them.
            ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

            def progress_hook(d):
                # Push yt-dlp's download percentage into the progress cache.
                if d['status'] == 'downloading':
                    percent_str = d.get('_percent_str', '0%').strip()
                    clean_str = ansi_escape.sub('', percent_str).strip()

                    try:
                        match = re.search(r'(\d+(?:\.\d+)?)', clean_str)
                        if match:
                            percent = float(match.group(1))
                        else:
                            print("❌ Regex didn't match!")
                            percent = 0
                    except Exception as e:
                        print(f"❌ Error parsing percent: {e}")
                        percent = 0

                    cache.set(progress_key, {'stage': 'downloading', 'percent': percent})

            url = request.POST.get('url')
            unique_id = str(uuid.uuid4())
            temp_dir = tempfile.gettempdir()
            base_filename = f"temp_{unique_id}"
            download_path = f"{temp_dir}/{base_filename}.%(ext)s"
            # The FFmpegExtractAudio post-processor writes this mp3 path.
            output_audio_file = f"{temp_dir}/{base_filename}.mp3"

            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': download_path,
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192',
                }],
                'progress_hooks': [progress_hook],
                'quiet': True,
                'no_warnings': True,
                'noplaylist': True,
            }
            print(f"downloading : {url}")
            try:
                with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                    ydl.download([url])
                print(f"transcribing : {output_audio_file}")
                cache.set(progress_key, {'stage': 'transcribing', 'percent': 100})
                result = model.transcribe(output_audio_file, verbose=False)
                vtt = generate_vtt(result['segments'])
                save_vtt(output_audio_file, vtt)
            except Exception as e:
                error = str(e)

        # FIX: on a failed download/transcription the old code fell through and
        # raised NameError on the undefined `result`; report the error instead.
        if error:
            cache.set(progress_key, {'stage': 'error', 'percent': 100})
            return JsonResponse({'error': error, 'progress_key': progress_key}, status=500)

        cache.set(progress_key, {'stage': 'done', 'percent': 100})

        # Move the audio into MEDIA_ROOT/uploads so the browser can stream it.
        filename = os.path.basename(output_audio_file)
        final_path = os.path.join(settings.MEDIA_ROOT, 'uploads', filename)
        os.makedirs(os.path.dirname(final_path), exist_ok=True)
        shutil.move(output_audio_file, final_path)

        file_url = settings.MEDIA_URL + 'uploads/' + filename
        audio_html = f'<audio controls><source src="{file_url}" type="audio/wav">Your browser does not support the audio element.</audio>'

        return JsonResponse({'text': result['text'], 'segments': result['segments'], 'audio_file': audio_html })

    else:
        form = TranscribeForm()

    return render(request, 'transcription.html', {
        'form': form,
        'transcription': transcription,
        'error': error,
        'progress_key': progress_key,
    })
| | |
def csv_to_html_table(filepath):
    """Render a CSV file as a Bootstrap HTML table string.

    Cells that look like http(s) URLs become target=_blank links. All cell
    text is HTML-escaped — the CSVs come from scraped, untrusted content, so
    interpolating them raw into HTML (as the old code did) was an XSS vector.
    """
    from html import escape  # stdlib; neutralises scraped cell text

    # Compiled once per call instead of once per cell (old code recompiled
    # inside is_valid_url on every invocation).
    url_pattern = re.compile(
        r'^https?://'
        r'([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
        r'[a-zA-Z]{2,}'
        r'(/[a-zA-Z0-9-._~:/?#[\]@!$&\'()*+,;=]*)?$'
    )

    def is_valid_url(url):
        return bool(url_pattern.match(url))

    parts = ['<table id="dataset" class="table table-bordered mt-2 smaller">']
    with open(filepath, newline='') as csvfile:
        reader = csv.reader(csvfile)
        for i, row in enumerate(reader):
            if i == 0:
                # First row is the header.
                parts.append('<thead>')
                parts.append("<tr>" + "".join(f"<th>{escape(col)}</th>" for col in row) + "</tr>")
                parts.append('</thead>')
            else:
                cells = []
                for col in row:
                    if is_valid_url(col):
                        safe = escape(col, quote=True)  # quote=True keeps the href attribute intact
                        cells.append(f'<td><a href="{safe}" target="_blank" rel="noopener noreferrer">{escape(col)}</a></td>')
                    else:
                        cells.append(f"<td>{escape(col)}</td>")
                parts.append("<tr>" + "".join(cells) + "</tr>")
    parts.append("</table>")
    return "".join(parts)