Spaces:
Running
Running
import random | |
import requests | |
import time | |
import threading | |
def fetch_proxies(): | |
"""Fetch a list of proxy servers from proxyscrape.com. | |
Returns: | |
list: A list of proxy servers in the format "IP:Port". | |
""" | |
url = "https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all" | |
response = requests.get(url) | |
if response.status_code == 200: | |
return response.text.split("\r\n")[:-1] | |
print(f"Error fetching proxies: {response.status_code}") | |
return [] | |
def test_proxy(proxy, prompt, timeout): | |
"""Test the given proxy server with a specified prompt and timeout. | |
Args: | |
proxy (str): The proxy server in the format "IP:Port". | |
prompt (str): The test prompt to be used for testing. | |
timeout (int): The maximum time in seconds allowed for the test. | |
""" | |
try: | |
start_time = time.time() | |
# res = gpt3.Completion.create(prompt=prompt, proxy=proxy) | |
end_time = time.time() | |
response_time = end_time - start_time | |
if response_time < timeout: | |
response_time = int(response_time*1000) | |
print(f'proxy: {proxy} [{response_time}ms] β ') | |
add_working_proxy((proxy)) | |
except Exception as e: | |
pass | |
def add_working_proxy(proxy): | |
"""Add a working proxy server to the global working_proxies list. | |
Args: | |
proxy (str): The proxy server in the format "IP:Port". | |
""" | |
global working_proxies | |
working_proxies.append(proxy) | |
def remove_proxy(proxy): | |
"""Remove a proxy server from the global working_proxies list. | |
Args: | |
proxy (str): The proxy server in the format "IP:Port". | |
""" | |
global working_proxies | |
if proxy in working_proxies: | |
working_proxies.remove(proxy) | |
def get_working_proxies(prompt, timeout=5): | |
"""Fetch and test proxy servers, adding working proxies to the global working_proxies list. | |
Args: | |
prompt (str): The test prompt to be used for testing. | |
timeout (int, optional): The maximum time in seconds allowed for testing. Defaults to 5. | |
""" | |
proxy_list = fetch_proxies() | |
threads = [] | |
for proxy in proxy_list: | |
thread = threading.Thread(target=test_proxy, args=( | |
proxy, prompt, timeout)) | |
threads.append(thread) | |
thread.start() | |
for t in threads: | |
t.join(timeout) | |
def update_working_proxies(): | |
"""Continuously update the global working_proxies list with working proxy servers.""" | |
global working_proxies | |
test_prompt = "What is the capital of France?" | |
while True: | |
working_proxies = [] # Clear the list before updating | |
get_working_proxies(test_prompt) | |
print('proxies updated') | |
time.sleep(1800) # Update proxies list every 30 minutes | |
def get_random_proxy(): | |
"""Get a random working proxy server from the global working_proxies list. | |
Returns: | |
str: A random working proxy server in the format "IP:Port". | |
""" | |
global working_proxies | |
return random.choice(working_proxies) | |