# Spaces: Paused
# (The line above is page-header text captured together with the file; it is not part of the script.)
import os
import subprocess
from urllib.parse import urlsplit

from dotenv import load_dotenv
from redis import Redis
# Let python-dotenv populate the process environment before REDIS_URL is read
# on the next statement.
load_dotenv()

# Connection URL of the target Redis server. Indexing os.environ directly
# means a missing variable raises KeyError as soon as this module is imported.
REDIS_URL = os.environ['REDIS_URL']

# Client object built from the same URL and exposed at module level.
redis_client = Redis.from_url(REDIS_URL)
def kill_all_redis_clients():
    """Kill every client connection reported by the Redis server at ``REDIS_URL``.

    The host and port are taken from the module-level ``REDIS_URL``.
    ``redis-cli ... CLIENT LIST`` is run to enumerate connections, then
    ``redis-cli ... CLIENT KILL ID <id>`` is run once per listed id.

    Returns:
        None. Success and failure are both reported with ``print``.

    Raises:
        Nothing derived from ``Exception``: every such error (bad URL,
        missing ``redis-cli`` binary, non-zero exit status, ...) is caught
        and printed, so the call is best-effort.
    """
    default_port = 6379  # Used when the URL does not name a port.

    try:
        if not REDIS_URL:
            raise ValueError("REDIS_URL is not set in the environment variables.")

        # urlsplit copes with URLs that have no credentials, no explicit
        # port, a trailing "/<db>" path, or an "@" inside the password --
        # all cases where manual splitting on "//", "@" and ":" goes wrong.
        parsed_url = urlsplit(REDIS_URL)
        redis_host = parsed_url.hostname
        if not redis_host:
            raise ValueError("REDIS_URL does not contain a host name.")
        redis_port = parsed_url.port or default_port

        # NOTE(review): credentials and the TLS scheme in the URL are not
        # forwarded to redis-cli; only host and port are passed, as before.
        # Confirm whether the target server needs authentication.
        #
        # Commands are passed as argument lists (no shell) so nothing taken
        # from the environment or from server output is shell-interpreted.
        redis_cli = ["redis-cli", "-h", redis_host, "-p", str(redis_port)]

        client_list_output = subprocess.check_output(
            redis_cli + ["CLIENT", "LIST"]
        ).decode('utf-8')

        # Each CLIENT LIST line is a run of space-separated key=value
        # fields; pick the numeric "id=" field instead of assuming it is
        # always the first token on the line.
        client_ids = []
        for line in client_list_output.splitlines():
            for field in line.split():
                if field.startswith("id=") and field[3:].isdigit():
                    client_ids.append(field[3:])
                    break

        for client_id in client_ids:
            subprocess.check_call(redis_cli + ["CLIENT", "KILL", "ID", client_id])

        print("Successfully killed all Redis clients.")
    except Exception as e:
        # Deliberate catch-all: this is the script's top-level boundary and
        # callers rely on failures being reported rather than raised.
        print(f"An error occurred: {e}")
# Run the cleanup only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    kill_all_redis_clients()