File size: 3,286 Bytes
70422d8
 
 
 
 
 
5684fd5
70422d8
 
 
 
 
5684fd5
 
 
 
 
 
70422d8
 
 
 
 
 
 
 
 
 
 
 
79bf545
70422d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import random  
import requests  
import time  
import threading  
  
  
def fetch_proxies():
    """Fetch a list of proxy servers from proxyscrape.com.  
  
    Returns:  
        list: A list of proxy servers in the format "IP:Port".  
    """  
    url = "https://api.proxyscrape.com/v2/?request=displayproxies&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all"
    response = requests.get(url)
    if response.status_code == 200:
        return response.text.split("\r\n")[:-1]
    print(f"Error fetching proxies: {response.status_code}")
    return []  
  
  
def test_proxy(proxy, prompt, timeout):  
    """Test the given proxy server with a specified prompt and timeout.  
  
    Args:  
        proxy (str): The proxy server in the format "IP:Port".  
        prompt (str): The test prompt to be used for testing.  
        timeout (int): The maximum time in seconds allowed for the test.  
    """  
    try:  
        start_time = time.time()  
        # res = gpt3.Completion.create(prompt=prompt, proxy=proxy)  
        end_time = time.time()  
        response_time = end_time - start_time  
  
        if response_time < timeout:  
            response_time = int(response_time*1000)  
            print(f'proxy: {proxy} [{response_time}ms] ✅')  
            add_working_proxy((proxy))  
    except Exception as e:  
        pass  
  
  
def add_working_proxy(proxy):  
    """Add a working proxy server to the global working_proxies list.  
  
    Args:  
        proxy (str): The proxy server in the format "IP:Port". 
    """  
    global working_proxies  
    working_proxies.append(proxy)  
  
  
def remove_proxy(proxy):  
    """Remove a proxy server from the global working_proxies list.  
  
    Args:  
        proxy (str): The proxy server in the format "IP:Port".  
    """  
    global working_proxies  
    if proxy in working_proxies:  
        working_proxies.remove(proxy)  
  
  
def get_working_proxies(prompt, timeout=5):  
    """Fetch and test proxy servers, adding working proxies to the global working_proxies list.  
  
    Args:  
        prompt (str): The test prompt to be used for testing.  
        timeout (int, optional): The maximum time in seconds allowed for testing. Defaults to 5.  
    """  
    proxy_list = fetch_proxies()  
    threads = []  
  
    for proxy in proxy_list:  
        thread = threading.Thread(target=test_proxy, args=(  
            proxy, prompt, timeout))  
        threads.append(thread)  
        thread.start()  
  
    for t in threads:  
        t.join(timeout)  
  
  
def update_working_proxies():  
    """Continuously update the global working_proxies list with working proxy servers."""  
    global working_proxies  
    test_prompt = "What is the capital of France?"  
  
    while True:  
        working_proxies = []  # Clear the list before updating  
        get_working_proxies(test_prompt)  
        print('proxies updated')  
        time.sleep(1800)  # Update proxies list every 30 minutes  
  
  
def get_random_proxy():  
    """Get a random working proxy server from the global working_proxies list.  
  
    Returns:  
        str: A random working proxy server in the format "IP:Port".  
    """  
    global working_proxies  
    return random.choice(working_proxies)