|
import random |
|
import requests |
|
import time |
|
import threading |
|
import socket |
|
|
|
|
|
def fetch_proxies(): |
|
"""Fetch a list of proxy servers from proxyscrape.com. |
|
|
|
Returns: |
|
list: A list of proxy servers in the format "IP:Port". |
|
""" |
|
url = "https://www.proxy-list.download/api/v1/get?type=https" |
|
response = requests.get(url) |
|
if response.status_code == 200: |
|
return response.text.split("\r\n")[:-1] |
|
print(f"Error fetching proxies: {response.status_code}") |
|
return [] |
|
|
|
|
|
def test_proxy(proxy, prompt, timeout): |
|
"""Test the given proxy server with a specified prompt and timeout. |
|
|
|
Args: |
|
proxy (str): The proxy server in the format "IP:Port". |
|
prompt (str): The test prompt to be used for testing. |
|
timeout (int): The maximum time in seconds allowed for the test. |
|
""" |
|
try: |
|
|
|
ip, port = proxy.split(':') |
|
|
|
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
|
|
|
|
|
start_time = time.time() |
|
|
|
|
|
sock.connect((ip, int(port))) |
|
|
|
|
|
end_time = time.time() |
|
elapsed_time = end_time - start_time |
|
|
|
|
|
|
|
|
|
|
|
sock.close() |
|
|
|
|
|
if elapsed_time < timeout: |
|
print(f"proxy: {proxy} ✅ | Elapsed time: {elapsed_time} seconds") |
|
add_working_proxy(proxy) |
|
except Exception as e: |
|
pass |
|
|
|
|
|
def add_working_proxy(proxy): |
|
"""Add a working proxy server to the global working_proxies list. |
|
|
|
Args: |
|
proxy (str): The proxy server in the format "IP:Port". |
|
""" |
|
global working_proxies |
|
working_proxies.append(proxy) |
|
|
|
|
|
def remove_proxy(proxy): |
|
"""Remove a proxy server from the global working_proxies list. |
|
|
|
Args: |
|
proxy (str): The proxy server in the format "IP:Port". |
|
""" |
|
global working_proxies |
|
if proxy in working_proxies: |
|
working_proxies.remove(proxy) |
|
|
|
|
|
def get_working_proxies(prompt, timeout=5): |
|
"""Fetch and test proxy servers, adding working proxies to the global working_proxies list. |
|
|
|
Args: |
|
prompt (str): The test prompt to be used for testing. |
|
timeout (int, optional): The maximum time in seconds allowed for testing. Defaults to 5. |
|
""" |
|
proxy_list = fetch_proxies() |
|
threads = [] |
|
|
|
for proxy in proxy_list: |
|
thread = threading.Thread(target=test_proxy, args=( |
|
proxy, prompt, timeout)) |
|
threads.append(thread) |
|
thread.start() |
|
|
|
for t in threads: |
|
t.join(timeout) |
|
|
|
|
|
def update_working_proxies(): |
|
"""Continuously update the global working_proxies list with working proxy servers.""" |
|
global working_proxies |
|
test_prompt = "What is the capital of France?" |
|
|
|
while True: |
|
working_proxies = [] |
|
get_working_proxies(test_prompt) |
|
print('proxies updated') |
|
time.sleep(1800) |
|
|
|
|
|
def get_random_proxy(): |
|
"""Get a random working proxy server from the global working_proxies list. |
|
|
|
Returns: |
|
str: A random working proxy server in the format "IP:Port". |
|
""" |
|
global working_proxies |
|
return random.choice(working_proxies) |