import csv
import asyncio
import pandas as pd
import re
from telethon import TelegramClient
from telethon.tl.types import MessageEntityUrl, MessageEntityTextUrl
from telethon.errors import UsernameInvalidError, UsernameNotOccupiedError, FloodWaitError
from tqdm import tqdm
from flask import Flask, jsonify, render_template, send_from_directory
from threading import Thread
from multiprocessing import Process, Queue
import os
from dataclasses import dataclass
from datetime import datetime

# Directory for storing files
FILE_DIRECTORY = os.getcwd()  # Current working directory

# Flask App
app = Flask(__name__)

# Shared progress state exposed through the Flask endpoints
@dataclass
class Progress:
    current: int = 0
    total: int = 0
    current_group: str = ""
    status: str = "Not started"
    last_update: str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    batch_current: int = 0
    batch_total: int = 0
    batch_percentage: float = 0

progress = Progress()

# 🔹 Flask API Endpoints
@app.route('/')
def index():
    """Show available files for download and current progress as an HTML page."""
    files = os.listdir(FILE_DIRECTORY)
    return render_template("index.html", files=files, progress=progress)

@app.route('/download/<filename>')
def download_file(filename):
    """Allow downloading any file from the directory."""
    return send_from_directory(FILE_DIRECTORY, filename, as_attachment=True)

@app.route('/progress')
def get_progress():
    """Return current progress as JSON for AJAX updates."""
    return jsonify({
        'current': progress.current,
        'total': progress.total,
        'current_group': progress.current_group,
        'status': progress.status,
        'last_update': progress.last_update,
        'batch_current': progress.batch_current,
        'batch_total': progress.batch_total,
        'batch_percentage': progress.batch_percentage
    })

def run_flask():
    app.run(host='0.0.0.0', port=7860)

# Telegram API credentials
API_ID = 25216912
API_HASH = 'f65f6050fe9b342a4996c59e4283ab5e'
PHONE_NUMBER = "+967730426743"  # "+967735201519"

# File names
GROUPS_CSV = "my_telegram_groups.csv"
LINKS_CSV = "telegram_links.csv"
CLEANED_LINKS_CSV = "telegram_links (7).csv"
FINAL_RESULTS_CSV = "final_results.csv"

PHONE_NUMBERS = [
    "+967730426743",
    "+967730446721",
    "+967730436848"
]  # Your phone numbers with country code

SESSION_DIRS = [
    "session/mbot1",
    "session/mbot2",
    "session/mbot3"
]

async def get_my_groups(client):
    """Step 1: Get all groups the user is a member of."""
    print("\n📱 Fetching all your Telegram groups...")

    dialogs = await client.get_dialogs()
    groups = [d for d in dialogs if d.is_group or d.is_channel]

    # Append so results from multiple accounts accumulate; only write the
    # header when the file is new or empty.
    file_has_data = os.path.exists(GROUPS_CSV) and os.path.getsize(GROUPS_CSV) > 0
    with open(GROUPS_CSV, mode="a", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        if not file_has_data:
            writer.writerow(["phone_number", "group_name", "link", "group_id", "member_count"])

        for group in tqdm(groups, desc="Processing groups"):
            group_name = group.title
            group_id = group.entity.id
            username = (f"https://t.me/{group.entity.username}"
                        if hasattr(group.entity, "username") and group.entity.username
                        else "private_group")
            try:
                entity = await client.get_entity(group_id)
                member_count = (entity.participants_count
                                if hasattr(entity, "participants_count")
                                else "unknown")
            except Exception:
                member_count = "unknown"

            writer.writerow([PHONE_NUMBER, group_name, username, group_id, member_count])

    print(f"✅ Groups data saved to {GROUPS_CSV}")
    return groups

async def process_all_groups(client):
    """Step 2: Extract all Telegram links from the groups."""
    global progress
    progress.status = "Starting"
    progress.last_update = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    print("\n🔗 Extracting Telegram links from all groups...")
    # Read groups from CSV file
    groups_df = pd.read_csv(GROUPS_CSV)
    progress.total = len(groups_df)
    progress.current = 0

    # Track unique (link, group) combinations in memory
    seen_links = set()
    MESSAGE_LIMIT = 300000  # Maximum messages to process per group

    # Open CSV file in write mode first to create/clear it
    with open(LINKS_CSV, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['link', 'source_group'])

    def clean_url(url):
        """Clean and validate a single URL."""
        if isinstance(url, str):
            url = url.strip()
            if url.startswith('tps://'):
                url = url.replace('tps://', 'https://')
            if '"' in url:
                url = url.split('"')[0]
            if ')' in url:
                url = url.split(')')[0]
            if ' ' in url or '\n' in url:
                url = url.split()[0]
            if not url.startswith('https://t.me/'):
                return None
            if url.count('/') > 3:
                return None
            return url
        return None

    def save_links_batch(links_batch):
        """Save a batch of links, skipping pairs already written."""
        with open(LINKS_CSV, mode='a', newline='', encoding='utf-8') as file:
            writer = csv.writer(file)
            for url, group_id in links_batch:
                if (url, group_id) not in seen_links:
                    seen_links.add((url, group_id))
                    writer.writerow([url, group_id])

    async def process_message(message, group_id):
        """Process a single message and return found links."""
        links = []
        if not message.text:
            return links

        # Extract links from message entities
        if message.entities:
            for entity in message.entities:
                if isinstance(entity, (MessageEntityUrl, MessageEntityTextUrl)):
                    url = None
                    if isinstance(entity, MessageEntityUrl):
                        url = message.text[entity.offset:entity.offset + entity.length]
                    elif isinstance(entity, MessageEntityTextUrl):
                        url = entity.url
                    if url and ('t.me/' in url or 'telegram.me/' in url):
                        cleaned_url = clean_url(url)
                        if cleaned_url:
                            links.append((cleaned_url, group_id))

        # Use regex to find Telegram links in the plain text as well
        matches = re.finditer(r'(https?://)?(www\.)?(t|telegram)\.me/\S+', message.text)
        for match in matches:
            url = match.group(0)
            if not url.startswith('http'):
                url = f'https://{url}'
            cleaned_url = clean_url(url)
            if cleaned_url:
                links.append((cleaned_url, group_id))

        return links

    total_rows = len(groups_df)
    BATCH_SIZE = 100  # Number of messages to process before saving

    # Iterate over the DataFrame with a 1-based index for progress reporting
    for index, (_, row) in enumerate(groups_df.iterrows(), start=1):
        group_id = row['link']  # group link/username column written in step 1
        progress.current = index
        progress.current_group = row['group_name']
        progress.last_update = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        try:
            group_entity = await client.get_entity(group_id)
            total_messages = (await client.get_messages(group_entity, limit=1)).total

            # Determine how many messages to process
            messages_to_process = min(total_messages, MESSAGE_LIMIT)
            if total_messages > MESSAGE_LIMIT:
                print(f"\n⚠️ Group {row['group_name']} has {total_messages} messages. "
                      f"Processing only first {MESSAGE_LIMIT}.")
            # Show a progress bar with the current group index and total rows
            progress_bar = tqdm(total=messages_to_process,
                                desc=f"({index}/{total_rows}) from {row['group_name']}")
            links_batch = []
            tasks = []
            processed_messages = 0

            async for message in client.iter_messages(group_entity):
                if processed_messages >= MESSAGE_LIMIT:
                    break

                # Update batch progress
                progress.batch_current = processed_messages
                progress.batch_total = messages_to_process
                progress.batch_percentage = (processed_messages / messages_to_process) * 100
                progress.last_update = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Create task for message processing
                task = asyncio.create_task(process_message(message, group_id))
                tasks.append(task)
                processed_messages += 1

                # Process in batches
                if len(tasks) >= BATCH_SIZE:
                    # Wait for all tasks in the batch to complete
                    batch_results = await asyncio.gather(*tasks)
                    for links in batch_results:
                        links_batch.extend(links)

                    # Save batch of links
                    if links_batch:
                        save_links_batch(links_batch)
                        links_batch = []
                    tasks = []
                    progress_bar.update(BATCH_SIZE)

            # Process remaining messages
            if tasks:
                batch_results = await asyncio.gather(*tasks)
                for links in batch_results:
                    links_batch.extend(links)
                if links_batch:
                    save_links_batch(links_batch)
                progress_bar.update(len(tasks))

            progress_bar.close()
            print(f"\nFinished fetching messages from {group_id}")

        except Exception as e:
            progress.status = f"Error in group: {str(e)}"
            print(f"\nError processing group {group_id}: {str(e)}")
            continue

    progress.status = "Completed"
    progress.last_update = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"✅ Found {len(seen_links)} unique links saved to {LINKS_CSV}")

def clean_and_validate_links():
    """Step 3: Clean and validate the collected links."""
    print("\n🧹 Cleaning and validating links...")

    df = pd.read_csv(LINKS_CSV)
    df.drop_duplicates(subset=['link'], inplace=True)

    def clean_url(url):
        if isinstance(url, str):
            url = url.strip()
            if url.startswith('tps://'):
                url = url.replace('tps://', 'https://')
            if '"' in url:
                url = url.split('"')[0]
            if ')' in url:
                url = url.split(')')[0]
            if ' ' in url or '\n' in url:
                url = url.split()[0]
            if not url.startswith('https://t.me/'):
                return None
            if url.count('/') > 3:
                return None
            return url
        return None

    df['link'] = df['link'].apply(clean_url)
    df.dropna(subset=['link'], inplace=True)

    df.to_csv(CLEANED_LINKS_CSV, index=False)
    print(f"✅ Cleaned links saved to {CLEANED_LINKS_CSV}")
    return df

async def check_telegram_link(client, link):
    """Check if a Telegram link is valid and get its information."""
    if not link.startswith("https://t.me/"):
        return {"status": False, "type": None, "title": None, "message": "Invalid URL format"}

    username = link.split("/")[-1]

    while True:
        try:
            entity = await client.get_entity(username)
            # Use getattr so user entities (no broadcast/megagroup/title attrs) don't raise
            if getattr(entity, "broadcast", False):
                return {"status": True, "type": "Channel", "title": entity.title}
            elif getattr(entity, "megagroup", False):
                return {"status": True, "type": "Group", "title": entity.title}
            else:
                return {"status": True, "type": "Unknown", "title": getattr(entity, "title", None)}
        except FloodWaitError as e:
            wait_time = e.seconds
            print(f"\n⚠️ FloodWaitError: Waiting for {wait_time} seconds...")
            await asyncio.sleep(wait_time)
            print("Resuming after flood wait...")
            continue  # Retry after waiting
        except UsernameInvalidError:
            return {"status": False, "type": None, "title": None, "message": "Invalid username format"}
        except UsernameNotOccupiedError:
            return {"status": False, "type": None, "title": None, "message": "Username does not exist"}
        except Exception as e:
            return {"status": False, "type": None, "title": None, "message": str(e)}
"message": str(e)} async def process_links_from_csv(client, input_file, output_file, delay_seconds: int = 2): """Step 4: Validate links and get their information""" print("\nš Checking Telegram links...") print(f"ā° Using default delay of {delay_seconds} seconds between requests") # Read input links input_df = pd.read_csv(input_file) # Read existing output file if it exists and get processed links try: output_df = pd.read_csv(output_file) processed_links = set(output_df['link']) print(f"Found {len(processed_links)} already processed links") except FileNotFoundError: processed_links = set() # Create output file with headers with open(output_file, 'w', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['link', 'status', 'type', 'title', 'message']) writer.writeheader() # Filter out already processed links links_to_process = [link for link in input_df['link'] if link not in processed_links] print(f"Processing {len(links_to_process)} new links") # Process remaining links and save incrementally for i, link in enumerate(tqdm(links_to_process, desc="Checking links")): while True: try: # Add delay after first request if i > 0: # print(f"\nā³ Waiting {delay_seconds} seconds before next request...") await asyncio.sleep(delay_seconds) result = await check_telegram_link(client, link) row = { "link": link, "status": "1" if result["status"] else "0", "type": result.get("type", "N/A"), "title": result.get("title", "N/A"), "message": result.get("message", "") } # Append the result to the CSV file with open(output_file, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['link', 'status', 'type', 'title', 'message']) writer.writerow(row) break # Exit the while loop if successful except Exception as e: print(f"\nā Error processing link {link}: {str(e)}") # Save error result row = { "link": link, "status": "0", "type": "N/A", "title": "N/A", "message": f"Error: {str(e)}" } with open(output_file, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=['link', 'status', 'type', 'title', 'message']) writer.writerow(row) break # Continue to next link print(f"ā Results saved to {output_file}") return pd.read_csv(output_file) async def main(): async with TelegramClient(SESSION_DIRS[1], API_ID, API_HASH) as client: # Step 1: Get all groups # await get_my_groups(client) # Step 2: Extract links from all groups using CSV file # await process_all_groups(client) # Step 3: Clean and validate links # cleaned_df = clean_and_validate_links() # # Step 4: Check Telegram links and get their information await process_links_from_csv(client, "2.csv", FINAL_RESULTS_CSV) def run_main(): asyncio.run(main()) if __name__ == "__main__": run_main() # p1 = Process(target=run_flask) # p1.start() # p2 = Process(target=run_main) # p2.start() # p1.join() # p2.join()