DeepGit / app.py
zamalali
Updated background agents latency checks
ae2c609
import html
import logging
import threading
import time

import gradio as gr
from gradio.themes.utils import sizes

from main import run_repository_ranking
import agent
# ---------------------------
# Global Logging Buffer Setup
# ---------------------------
# Formatted log lines captured for the UI; every access goes through
# LOG_BUFFER_LOCK because records are appended from worker threads while the
# UI generator reads and clears the list.
LOG_BUFFER = []
LOG_BUFFER_LOCK = threading.Lock()


class BufferLogHandler(logging.Handler):
    """Logging handler that appends each formatted record to LOG_BUFFER."""

    def emit(self, record):
        """
        Format ``record`` with this handler's formatter and store the text.

        A record that cannot be formatted (for example mismatched %-style
        arguments) is reported through ``handleError`` instead of raising
        into the code that issued the logging call; nothing is appended to
        the buffer in that case.
        """
        try:
            log_entry = self.format(record)
        except Exception:
            # Standard logging contract: handlers report their own failures
            # via handleError rather than breaking the caller.
            self.handleError(record)
            return
        with LOG_BUFFER_LOCK:
            LOG_BUFFER.append(log_entry)
# Install a single BufferLogHandler on the root logger. The membership check
# keeps the handler from being added twice if this module-level code runs
# again against a root logger that already has one (presumably on reload).
root_logger = logging.getLogger()
if all(not isinstance(existing, BufferLogHandler) for existing in root_logger.handlers):
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler = BufferLogHandler()
    handler.setFormatter(formatter)
    root_logger.addHandler(handler)
def filter_logs(logs):
    """
    Condense a sequence of log lines for display.

    Every run of consecutive lines containing "HTTP Request:" is replaced by
    a single "Fetching repositories..." placeholder; all other lines are kept
    unchanged and in order. Returns a new list.
    """
    placeholder = "Fetching repositories..."
    condensed = []
    inside_http_run = False
    for entry in logs:
        is_http = "HTTP Request:" in entry
        if is_http and inside_http_run:
            # Still inside a run that already produced its placeholder.
            continue
        condensed.append(placeholder if is_http else entry)
        inside_http_run = is_http
    return condensed
def parse_result_to_html(raw_result: str, num_results: int) -> "tuple[str, list[str]]":
    """
    Parses the raw string output from run_repository_ranking to an HTML table.

    The text is split on the "Final Rank:" marker. Each entry is expected to
    start with the rank, followed by "Key: value" lines; the keys read here
    are "Title", "Link" and "Combined Score". Only the top N results are
    displayed.

    All values are HTML-escaped before being placed in the table, because
    they originate from repository metadata and are rendered as raw HTML.

    Args:
        raw_result: Text produced by the ranking step.
        num_results: Maximum number of entries to render. It is coerced with
            ``int()`` so a float such as 10.0 is accepted.

    Returns:
        (html, repo_names): the table markup (or a "no repositories" paragraph
        when no entry is found) and, for every rendered entry whose link
        contains "github.com/", the text after that marker with surrounding
        slashes and spaces removed.
    """
    entries = raw_result.strip().split("Final Rank:")
    # Element 0 is whatever preceded the first marker, so it is skipped.
    entries = entries[1:int(num_results) + 1]
    if not entries:
        return ("<p>No repositories found for your query.</p>", [])

    table_parts = [
        "\n"
        '<table border="1" style="width:80%; margin: auto; border-collapse: collapse;">\n'
        "<thead>\n"
        "<tr>\n"
        "<th>Rank</th>\n"
        "<th>Title</th>\n"
        "<th>Link</th>\n"
        "<th>Combined Score</th>\n"
        "</tr>\n"
        "</thead>\n"
        "<tbody>\n"
    ]
    repo_names = []
    for entry in entries:
        lines = entry.strip().split("\n")
        # str.split always yields at least one element, so lines[0] exists.
        data = {"Final Rank": lines[0].strip()}
        for line in lines[1:]:
            key, separator, val = line.partition(": ")
            if separator:
                data[key.strip()] = val.strip()

        # Try to extract repo name from the Link (github.com/user/repo).
        # The raw (unescaped) link is used: repo names are data, not markup.
        link = data.get("Link", "")
        if "github.com/" in link:
            repo_name = link.split("github.com/")[-1].strip("/ ")
            if repo_name:
                repo_names.append(repo_name)

        rank = html.escape(data.get("Final Rank", ""))
        title = html.escape(data.get("Title", ""))
        href = html.escape(data.get("Link", "#"))
        score = html.escape(data.get("Combined Score", ""))
        table_parts.append(
            "\n<tr>\n"
            f"<td>{rank}</td>\n"
            f"<td>{title}</td>\n"
            f'<td><a href="{href}" target="_blank">GitHub</a></td>\n'
            f"<td>{score}</td>\n"
            "</tr>\n"
        )
    table_parts.append("</tbody></table>")
    return "".join(table_parts), repo_names
# ---------------------------
# GPU-enabled Wrapper for Repository Ranking
# ---------------------------
def gpu_run_repo(topic: str, num_results: int):
    """
    Run the repository ranking for ``topic`` and return its result unchanged.

    This is a plain pass-through to run_repository_ranking; no GPU-specific
    handling is visible in this function itself.
    """
    ranking_output = run_repository_ranking(topic, num_results)
    return ranking_output
def run_lite_workflow(topic, num_results, result_container):
    """
    Thread target: run the ranking and hand the result back to the caller.

    The output of gpu_run_repo is stored under the "raw_result" key of
    ``result_container``, a dict owned by the code that started the thread.
    If the ranking raises, the key is left unset.
    """
    result_container["raw_result"] = gpu_run_repo(topic, num_results)
def stream_lite_workflow(topic, num_results):
    """
    Run the ranking in a background thread and stream its log output.

    Yields (status, details, repos) tuples:
      * while the worker runs: the last filtered log line of the newest batch,
        that batch joined with "<br/>", and an empty list;
      * once at the end: an empty status, the HTML produced by
        parse_result_to_html, and the repository names it extracted.

    LOG_BUFFER is module-global, so concurrent searches share (and clear) the
    same buffer.
    """
    # NOTE(review): this message is logged before the buffer is cleared just
    # below, so any buffered copy of it never reaches the streamed output.
    logging.info("[UI] User started a new search for topic: %s", topic)
    with LOG_BUFFER_LOCK:
        LOG_BUFFER.clear()

    result_container = {}
    workflow_thread = threading.Thread(
        target=run_lite_workflow,
        args=(topic, num_results, result_container),
    )
    workflow_thread.start()

    last_index = 0
    while True:
        # Sample liveness BEFORE draining: if the worker had already finished
        # here, the drain below is guaranteed to see everything it logged, so
        # the loop can stop without another poll. Unrelated log traffic can
        # therefore no longer keep the loop alive.
        worker_running = workflow_thread.is_alive()
        with LOG_BUFFER_LOCK:
            new_logs = LOG_BUFFER[last_index:]
            last_index = len(LOG_BUFFER)
        if new_logs:
            filtered_logs = filter_logs(new_logs)
            yield filtered_logs[-1], "<br/>".join(filtered_logs), []
        if not worker_running:
            break
        time.sleep(0.5)
    workflow_thread.join()

    # The key is missing when the worker raised; the fallback text contains
    # no "Final Rank:" marker, so the parser reports that nothing was found.
    raw_result = result_container.get("raw_result", "No results returned.")
    html_result, repo_names = parse_result_to_html(raw_result, num_results)
    yield "", html_result, repo_names
def lite_runner(topic, num_results):
    """
    Generator used as the search event handler.

    Emits an immediate "Workflow started" placeholder update, then relays
    every (status, details, repos) update produced by stream_lite_workflow.
    """
    logging.info("[UI] Running lite_runner for topic: %s", topic)
    yield "Workflow started", "<p>Processing your request. Please wait...</p>", []
    yield from stream_lite_workflow(topic, num_results)
# ---------------------------
# App UI Setup Using Gradio Soft Theme with Centered Layout
# ---------------------------
# Build the Blocks app: Soft theme with medium text size, page title
# "DeepGit Lite", and custom CSS targeting the element ids "header",
# "main-container" and "footer".
# NOTE(review): indentation is not visible in this copy of the file, so which
# statements below are nested inside which `with` block cannot be confirmed
# here.
with gr.Blocks(
theme=gr.themes.Soft(text_size=sizes.text_md),
title="DeepGit Lite",
css="""
/* Center header and footer */
#header { text-align: center; margin-bottom: 20px; }
#main-container { max-width: 800px; margin: auto; }
#footer { text-align: center; margin-top: 20px; }
"""
) as demo:
# Static page header (logo, title, product description), rendered from inline
# HTML and tagged with elem_id "header".
gr.Markdown(
"""
<div style="padding-top: 60px;">
<div style="display: flex; align-items: center; justify-content: center;">
<img src="https://img.icons8.com/?size=100&id=118557&format=png&color=000000"
style="width: 60px; height: 60px; margin-right: 12px;">
<h1 style="margin: 0; font-size: 2.5em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
DeepGit Lite
</h1>
</div>
<div style="text-align: center; margin-top: 20px; font-size: 1.1em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
<p>
✨ DeepGit Lite is the lightweight pro version of <strong>DeepGit</strong>.<br>
It harnesses advanced deep semantic search to explore GitHub repositories and deliver curated results.<br>
Under the hood, it leverages a hybrid ranking approach combining dense retrieval, BM25 scoring, and cross-encoder re-ranking for optimal discovery.<br>
</p>
<p>
<strong>New!</strong> 🗨️ After searching, you can <b>chat with the agent about any repository you find</b>.<br>
The conversation agent runs in the background and is ready to answer your questions about setup, usage, or details for each repo.<br>
Just click "Go to Chat" after your search, select a repository, and start your conversation!
</p>
<p>
🚀 Check out the full DeepGit version on
<a href="https://github.com/zamalali/DeepGit" target="_blank">GitHub</a> and ⭐
<strong>Star DeepGit</strong> on GitHub!
</p>
</div>
</div>
""",
elem_id="header"
)
# --- Search UI ---
# Column shown on start-up (visible=True); its handle `search_ui` is used by
# the show/hide callbacks further down.
with gr.Column(elem_id="main-container", visible=True) as search_ui:
# Free-text query box (3 lines tall).
research_input = gr.Textbox(
label="Research Query",
placeholder="Enter your research topic here, e.g., Looking for a low code/no code tool to augment images and annotations?",
lines=3
)
# Number of rows to render: 5 to 25 in steps of 1, default 10.
num_results_slider = gr.Slider(
minimum=5, maximum=25, value=10, step=1,
label="Number of Results to Display",
info="Choose how many top repositories to show (sorted by score)"
)
run_button = gr.Button("Run DeepGit Lite", variant="primary")
# Targets of the streamed search updates: a status line and the HTML details
# (progress log lines first, the results table at the end).
status_display = gr.Markdown(label="Status")
detail_display = gr.HTML(label="Results")
# Repository names extracted from the latest search; starts as an empty list.
repo_state = gr.State([])
# Created hidden; update_chat_button toggles its visibility after a search.
go_to_chat_btn = gr.Button("Go to Chat", visible=False)
# --- Chat UI ---
# Column created hidden (visible=False); its handle `chat_ui` is used by the
# show/hide callbacks further down.
with gr.Column(visible=False) as chat_ui:
# Starts with no choices; show_chat_ui fills them from repo_state.
repo_choice = gr.Radio(choices=[], label="Select a repository", interactive=True)
chat_history = gr.Chatbot(label="Chat with GitHub Agent")
user_input = gr.Textbox(label="Your question", placeholder="Ask about the selected repo...e.g., tell me a bit more and guide me to set this up and running?")
send_btn = gr.Button("Send")
# Conversation history kept as state alongside the Chatbot display; starts
# as an empty list.
chat_state = gr.State([])
back_btn = gr.Button("Back to Search")
def update_chat_button(status, details, repos):
    """
    Decide whether the "Go to Chat" button is shown after a search.

    Returns a visibility update that is true exactly when ``repos`` is
    non-empty, together with ``repos`` unchanged. ``status`` and ``details``
    are accepted because they are wired as inputs but are not used.
    """
    has_repos = bool(repos)
    logging.info("[UI] Search complete. Showing Go to Chat button: %s", has_repos)
    return gr.update(visible=has_repos), repos
def show_chat_ui(repos):
    """
    Switch from the search view to the chat view.

    Returns four values, in order: hide the search column, show the chat
    column, load ``repos`` as the radio choices with nothing selected, and an
    empty list for the chat state.
    """
    logging.info("[UI] Switching to Chat UI. Repositories available: %s", repos)
    hide_search = gr.update(visible=False)
    reveal_chat = gr.update(visible=True)
    repo_options = gr.update(choices=repos, value=None)
    return hide_search, reveal_chat, repo_options, []
def back_to_search():
    """
    Switch from the chat view back to the search view.

    Returns five values, in order: show the search column, hide the chat
    column, empty the chatbot display, clear the selected repository, and an
    empty list for the chat state.
    """
    logging.info("[UI] Switching back to Search UI.")
    reveal_search = gr.update(visible=True)
    hide_chat = gr.update(visible=False)
    empty_chatbot = gr.update(value=[])
    clear_selection = gr.update(value=None)
    return reveal_search, hide_chat, empty_chatbot, clear_selection, []
def chat_with_agent(user_msg, repo, history):
    """
    Handle one chat turn about the selected repository.

    Returns (display_history, stored_history). For a blank message or a
    missing repository a hint is appended to the displayed history only; the
    stored history is returned untouched. Otherwise the agent is invoked with
    "[<repo>] <message>", and the question plus the agent's answer (or an
    "Error: ..." text if the call raised) is appended to both.
    """
    logging.info("[Chat] User sent message: '%s' for repo: '%s'", user_msg, repo)

    # Block blank messages
    message_is_blank = not (user_msg and user_msg.strip())
    if message_is_blank:
        return history + [["", "Please enter a message before sending."]], history
    if not repo:
        return history + [[user_msg, "Please select a repository first."]], history

    prompt = f"[{repo}] {user_msg}"
    try:
        response = agent.agent_executor.invoke({"input": prompt})
        answer = response["output"]
        logging.info("[Chat] Agent response received.")
    except Exception as exc:
        # Surface the failure in the chat instead of breaking the event.
        answer = f"Error: {exc}"
        logging.error("[Chat] Error in agent_executor: %s", exc)

    updated_history = history + [[user_msg, answer]]
    return updated_history, updated_history
# Disable send button if no repo is selected or message is blank, and show a helpful message
def can_send(user_msg, repo):
    """
    Compute the Send button state from the current message and repo choice.

    A blank message takes precedence over a missing repository; in both cases
    the button is made non-interactive and its label explains what is
    missing. Otherwise the button is interactive and labelled "Send".
    """
    blocker = None
    if not (user_msg and user_msg.strip()):
        blocker = "Enter a message to send"
    elif not repo:
        blocker = "Select a repository"

    if blocker is None:
        return gr.update(interactive=True, value="Send")
    return gr.update(interactive=False, value=blocker)
# Re-evaluate the Send button whenever the message text or the selected
# repository changes; both listeners call can_send with the same inputs.
user_input.change(
fn=can_send,
inputs=[user_input, repo_choice],
outputs=[send_btn],
show_progress=False
)
repo_choice.change(
fn=can_send,
inputs=[user_input, repo_choice],
outputs=[send_btn],
show_progress=False
)
# Start a search from the button: lite_runner's updates go to the status
# line, the details pane and repo_state; the chained step then calls
# update_chat_button to set the "Go to Chat" button's visibility.
run_button.click(
fn=lite_runner,
inputs=[research_input, num_results_slider],
outputs=[status_display, detail_display, repo_state],
api_name="deepgit_lite",
show_progress=True
).then(
fn=update_chat_button,
inputs=[status_display, detail_display, repo_state],
outputs=[go_to_chat_btn, repo_state]
)
# Same pipeline when the query textbox is submitted; only api_name differs.
research_input.submit(
fn=lite_runner,
inputs=[research_input, num_results_slider],
outputs=[status_display, detail_display, repo_state],
api_name="deepgit_lite_submit",
show_progress=True
).then(
fn=update_chat_button,
inputs=[status_display, detail_display, repo_state],
outputs=[go_to_chat_btn, repo_state]
)
# View switching between the search column and the chat column.
go_to_chat_btn.click(
fn=show_chat_ui,
inputs=[repo_state],
outputs=[search_ui, chat_ui, repo_choice, chat_state]
)
back_btn.click(
fn=back_to_search,
inputs=[],
outputs=[search_ui, chat_ui, chat_history, repo_choice, chat_state]
)
# Send a chat message from the button or by submitting the textbox. Both are
# registered with queue=False.
# NOTE(review): per Gradio's event-listener documentation queue=False means
# these calls bypass the request queue configured at launch; confirm that is
# intended for a potentially slow agent call.
send_btn.click(
fn=chat_with_agent,
inputs=[user_input, repo_choice, chat_state],
outputs=[chat_history, chat_state],
queue=False
)
user_input.submit(
fn=chat_with_agent,
inputs=[user_input, repo_choice, chat_state],
outputs=[chat_history, chat_state],
queue=False
)
# Static footer; the "footer" id is the one targeted by the custom CSS.
gr.HTML(
"""
<div id="footer">
Made with ❤️ by <b>Zamal</b>
</div>
"""
)
# Enable the request queue with max_size=10 and start the app.
demo.queue(max_size=10).launch()