import csv import asyncio import time from telethon import TelegramClient from tqdm import tqdm # Import tqdm for progress bar from telethon.tl.functions.channels import JoinChannelRequest from telethon.tl.functions.messages import ImportChatInviteRequest from telethon.errors.rpcerrorlist import InviteHashExpiredError from flask import Flask, jsonify, send_from_directory # Directory for storing files from flask import Flask, render_template, send_from_directory from telethon.tl.functions.channels import GetParticipantsRequest from telethon.tl.types import ChannelParticipantsSearch from telethon.errors import FloodWaitError, UserAdminInvalidError import json import asyncio import nest_asyncio import logging from telethon import TelegramClient, events from supabase import create_client, Client from flask import Flask, jsonify from threading import Thread from multiprocessing import Process, Queue import unicodedata from telegram.helpers import escape_markdown import re import os from telethon.tl.functions.channels import JoinChannelRequest, InviteToChannelRequest from telethon.tl.functions.channels import EditBannedRequest from telethon.tl.types import ChatBannedRights from telethon.errors.rpcerrorlist import UserAdminInvalidError, UserNotParticipantError from telethon.errors.rpcerrorlist import InviteHashExpiredError, UserAlreadyParticipantError from telethon.tl.types import Channel, Chat logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()] ) class ClientConfig: """Individual client configuration""" def __init__(self, data): self.session = data['session'] self.api_id = int(data['api_id']) self.api_hash = data['api_hash'] self.phone = data['phone'] self.sleep_time = data.get('sleep_time', 5) self.batch_size = data.get('batch_size', 30) class Config: """Main configuration loader""" def __init__(self): with open('config.json') as f: self.data = json.load(f) self.clients = [ClientConfig(c) for c in self.data['clients']] self.csv_filename = self.data.get('csv_filename', 'groups.csv') self.output_csv = self.data.get('output_csv', 'results.csv') self.max_users = self.data.get('max_users', 200) class TelegramClientManager: """Manage multiple Telegram clients with separate credentials""" def __init__(self, config): self.config = config self.clients = [] self.current_index = 0 async def __aenter__(self): self.clients = [] for client_config in self.config.clients: client = TelegramClient( client_config.session, client_config.api_id, client_config.api_hash ) await client.start(client_config.phone) self.clients.append({ 'client': client, 'config': client_config }) return self async def __aexit__(self, exc_type, exc_val, exc_tb): await asyncio.gather(*[c['client'].disconnect() for c in self.clients]) def get_client(self): """Get next client with round-robin distribution""" client_data = self.clients[self.current_index] self.current_index = (self.current_index + 1) % len(self.clients) return client_data class TelegramOperations: @staticmethod async def join_groups(client_manager): """Join groups using multiple clients""" config = Config() active_client = client_manager.get_client() joined = await TelegramOperations._get_joined_groups(active_client['client']) with open(config.csv_filename) as f, open(config.output_csv, 'a') as out: reader = csv.reader(f) writer = csv.writer(out) headers = next(reader) writer.writerow(headers + ["status", "client_session"]) for row in reader: client_data = client_manager.get_client() status = await TelegramOperations._process_group( client_data['client'], row, joined, client_data['config'] ) writer.writerow(row + [status, client_data['config'].session]) await asyncio.sleep(client_data['config'].sleep_time) @staticmethod async def fetch_users(client_manager, source_group, output_csv=None): """Fetch users from group with client rotation""" config = Config() client_data = client_manager.get_client() output_csv = output_csv or f"{source_group}_users.csv" entity = await client_data['client'].get_entity(source_group) existing_users = await TelegramOperations._get_existing_users(client_data['client'], entity) with open(output_csv, 'w') as f: writer = csv.writer(f) writer.writerow(["user_id", "username", "first_name", "last_name", "collected_by"]) offset = 0 while True: try: participants = await client_data['client'](GetParticipantsRequest( entity, ChannelParticipantsSearch(''), offset, client_data['config'].batch_size )) if not participants.users: break for user in participants.users: if user.id not in existing_users: writer.writerow([ user.id, user.username or "", user.first_name or "", user.last_name or "", client_data['config'].session ]) offset += len(participants.users) await asyncio.sleep(client_data['config'].sleep_time) except FloodWaitError as e: logging.warning(f"Flood wait: Sleeping {e.seconds} seconds") await asyncio.sleep(e.seconds) @staticmethod async def add_users(client_manager, destination_group, user_csv): """Add users with client rotation""" config = Config() client_data = client_manager.get_client() entity = await client_data['client'].get_entity(destination_group) existing_users = await TelegramOperations._get_existing_users(client_data['client'], entity) with open(user_csv) as f: reader = csv.reader(f) next(reader) users = [row[0] for row in reader if row[0].isdigit()] for index, user_id in enumerate(users): try: if int(user_id) in existing_users: continue await client_data['client'](InviteToChannelRequest(entity, [int(user_id)])) logging.info(f"Client {client_data['config'].session} added {user_id}") if (index+1) % client_data['config'].batch_size == 0: await asyncio.sleep(client_data['config'].sleep_time) except Exception as e: logging.error(f"Client {client_data['config'].session} failed: {str(e)}") @staticmethod async def _get_joined_groups(client): dialogs = await client.get_dialogs() return { f"t.me/{d.entity.username}" if d.entity.username else "private_group" for d in dialogs if d.is_group or d.is_channel } @staticmethod async def _get_existing_users(client, entity): return {user.id async for user in client.iter_participants(entity)} @staticmethod async def _process_group(client, row, joined_groups, client_config): _, name, username, group_id = row if username in joined_groups: return "Already member" try: if username != "private_group": await client(JoinChannelRequest(username)) else: await client(ImportChatInviteRequest(group_id)) return "Success" except UserAlreadyParticipantError: return "Already member" except InviteHashExpiredError: return "Expired invite" except Exception as e: return f"Error: {str(e)}" class FlaskApp: def __init__(self): self.app = Flask(__name__) self._setup_routes() def _setup_routes(self): @self.app.route('/') def index(): files = os.listdir(os.getcwd()) return render_template('index.html', files=files) @self.app.route('/download/') def download(filename): return send_from_directory(os.getcwd(), filename, as_attachment=True) def run(self): self.app.run(host='0.0.0.0', port=7860) async def main_operations(): config = Config() async with TelegramClientManager(config) as client_manager: # await TelegramOperations.join_groups(client_manager) # await TelegramOperations.fetch_users(client_manager, "source_group") await TelegramOperations.add_users(client_manager, "@searchai090", "user_list.csv") def run_flask(): FlaskApp().run() if __name__ == "__main__": flask_process = Process(target=run_flask) telegram_process = Process(target=asyncio.run, args=(main_operations(),)) flask_process.start() telegram_process.start() flask_process.join() telegram_process.join()