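"""Website monitor and autoposter (Gradio Space).

Connects to a MySQL database via async SQLAlchemy (aiomysql driver), periodically
scrapes headlines from a source URL, stores them in a `feeds` table, and reposts
them as JSON to a target URL, all controlled from a small Gradio UI.
"""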
import asyncio
import gradio as gr
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
import logging
import threading
import time
import requests
from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
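# Expected dependencies (not pinned here): gradio, sqlalchemy, aiomysql,
# requests, beautifulsoup4.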
# Global variables: db_session holds the async session factory, engine the async engine
db_session = None
engine = None
# Function for dynamically setting the database connection
async def set_db_connection(host, port, user, password, db_name):
    global db_session, engine
    try:
        # NullPool avoids reusing pooled connections across the separate event
        # loops created by the asyncio.run() calls elsewhere in this app.
        engine = create_async_engine(
            f"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}",
            echo=False,
            poolclass=NullPool,
        )
        # Keep the session factory (not a session instance) so callers can open
        # short-lived sessions with `async with db_session() as session:`.
        db_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
        logger.info("Database connection established.")
        return "Database connection established."
    except Exception as e:
        logger.error(f"Failed to establish database connection: {e}")
        return f"Failed to connect to database: {e}"
# Function to update database status
def update_db_status():
    global db_session
    try:
        if db_session is None:
            return "Not connected"

        async def _ping():
            # Open a short-lived session and run a trivial query to check connectivity.
            async with db_session() as session:
                await session.execute(select(1))

        asyncio.run(_ping())
        return "Connected"
    except SQLAlchemyError as e:
        logger.error(f"SQLAlchemyError: {e}")
        return "Disconnected"
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return "Disconnected"
# Background task that periodically logs the database status.
# Note: calling .update() on a Gradio component from a background thread does not
# refresh the UI; the textbox is refreshed by the polling event registered in main().
def background_update():
    while True:
        status = update_db_status()
        logger.info(f"Database status: {status}")
        time.sleep(60)
# Function to scrape data from a URL
def scrape_data(url):
    try:
        response = requests.get(url, timeout=30)  # timeout so the monitoring loop cannot hang
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Extract data as needed, e.g. headlines, articles, etc.
        data = [element.text for element in soup.find_all('h1')]  # Example: all <h1> tags
        return data
    except requests.RequestException as e:
        logger.error(f"Failed to scrape data from {url}: {e}")
        return []
# Function to store scraped data in the database
async def store_data(data):
    global db_session
    if db_session is None:
        logger.error("No database connection; data not stored.")
        return
    try:
        async with db_session() as session:
            async with session.begin():
                # Example: assuming a table 'feeds' with a column 'content'.
                for item in data:
                    await session.execute(
                        text("INSERT INTO feeds (content) VALUES (:content)"),
                        {"content": item},
                    )
            # session.begin() commits automatically when the block exits.
        logger.info("Data stored in the database.")
    except SQLAlchemyError as e:
        logger.error(f"Failed to store data in the database: {e}")
# Function to serve data to a target URL
def serve_data(target_url, data):
    try:
        response = requests.post(target_url, json={"data": data}, timeout=30)
        response.raise_for_status()
        logger.info(f"Data served to {target_url} successfully.")
        return "Data served successfully."
    except requests.RequestException as e:
        logger.error(f"Failed to serve data to {target_url}: {e}")
        return f"Failed to serve data: {e}"
# Function to monitor and autopost data
def monitor_and_autopost(scrape_url, target_url):
    while True:
        data = scrape_data(scrape_url)
        asyncio.run(store_data(data))
        serve_data(target_url, data)
        time.sleep(3600)  # Run every hour
# Main application that runs Gradio UI and background tasks
def main():
    with gr.Blocks(css=".gradio-container {background: linear-gradient(135deg, #6a0dad, #ffd700);}") as demo:
        gr.Markdown("# Website Monitor and Chatbot", elem_id="title")
        with gr.Row():
            with gr.Column():
                gr.Markdown("## Database Settings", elem_id="subtitle")
                db_host = gr.Textbox(label="Database Host", placeholder="localhost", value="localhost")
                db_port = gr.Textbox(label="Database Port", placeholder="3306", value="3306")
                db_user = gr.Textbox(label="Database User", placeholder="username", value="")
                db_pass = gr.Textbox(label="Database Password", placeholder="password", type="password", value="")
                db_name = gr.Textbox(label="Database Name", placeholder="database_name", value="monitoring")
                db_status_textbox = gr.Textbox(label="Database Status", interactive=False)
                status_text = gr.Textbox(label="Status", interactive=False)
                connect_button = gr.Button("Connect to Database")
                gr.Markdown("## Scraping and Serving Settings", elem_id="subtitle")
                scrape_url = gr.Textbox(label="Scrape URL", placeholder="https://example.com")
                target_url = gr.Textbox(label="Target URL", placeholder="https://target.com/api")
                start_monitoring_button = gr.Button("Start Monitoring and Autoposting")
        # Connect button click event
        def on_connect_click(host, port, user, password, db_name):
            # Run the async connection setup on a fresh event loop inside the handler thread.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(set_db_connection(host, port, user, password, db_name))
            finally:
                loop.close()

        connect_button.click(
            on_connect_click,
            inputs=[db_host, db_port, db_user, db_pass, db_name],
            outputs=[status_text],
        )
        # Start monitoring and autoposting button click event
        def on_start_monitoring_click(scrape_url, target_url):
            threading.Thread(target=monitor_and_autopost, args=(scrape_url, target_url), daemon=True).start()
            return "Monitoring and autoposting started."

        start_monitoring_button.click(
            on_start_monitoring_click,
            inputs=[scrape_url, target_url],
            outputs=[status_text],  # show the confirmation message in the status box
        )
        # Poll the database status every 60 seconds and show it in the UI
        # (assumes a Gradio version that supports the `every` argument on load events).
        demo.load(update_db_status, inputs=None, outputs=db_status_textbox, every=60)

    # Background task that also logs the status to the server console
    threading.Thread(target=background_update, daemon=True).start()
    # Launch the Gradio interface without blocking, then keep the process alive
    logger.info("Launching Gradio interface...")
    demo.launch(prevent_thread_lock=True)
    logger.info("Gradio interface launched successfully.")
    demo.block_thread()  # keep the server (and daemon threads) alive after launch returns
if __name__ == "__main__":
    main()
    logger.info("Main function completed. App is running.")