"""DeepGit Lite: Gradio front end for repository search with a follow-up chat agent."""

import logging
import threading
import time

import gradio as gr
from gradio.themes.utils import sizes

import agent
from main import run_repository_ranking


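# Log records emitted while the ranking workflow runs in a background thread are
# collected in this in-memory buffer and periodically flushed to the UI.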
LOG_BUFFER = []
LOG_BUFFER_LOCK = threading.Lock()


class BufferLogHandler(logging.Handler):
    """Logging handler that appends every formatted record to LOG_BUFFER."""

    def emit(self, record):
        log_entry = self.format(record)
        with LOG_BUFFER_LOCK:
            LOG_BUFFER.append(log_entry)


# Register the buffer handler on the root logger once, so re-importing the
# module does not attach duplicate handlers.
root_logger = logging.getLogger()
if not any(isinstance(h, BufferLogHandler) for h in root_logger.handlers):
    handler = BufferLogHandler()
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    handler.setFormatter(formatter)
    root_logger.addHandler(handler)


def filter_logs(logs):
    """Collapse runs of noisy "HTTP Request:" log lines into a single
    "Fetching repositories..." entry so the streamed status stays readable."""
    filtered = []
    last_was_fetching = False
    for log in logs:
        if "HTTP Request:" in log:
            if not last_was_fetching:
                filtered.append("Fetching repositories...")
                last_was_fetching = True
        else:
            filtered.append(log)
            last_was_fetching = False
    return filtered


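# parse_result_to_html expects raw_result to contain plain-text blocks roughly
# like the example below (the shape is inferred from the parsing logic; the
# exact wording is produced by main.run_repository_ranking):
#
#   Final Rank: 1
#   Title: example-repo
#   Link: https://github.com/user/example-repo
#   Combined Score: 0.87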
def parse_result_to_html(raw_result: str, num_results: int) -> tuple[str, list]:
    """Parse the raw string output of run_repository_ranking into an HTML table.

    Only the top `num_results` entries are rendered. Returns (html, repo_names),
    where repo_names holds the "owner/repo" slugs extracted from the GitHub links.
    """
    num_results = int(num_results)  # the slider value may arrive as a float
    entries = raw_result.strip().split("Final Rank:")
    entries = entries[1:num_results + 1]
    if not entries:
        return "<p>No repositories found for your query.</p>", []
    html = """
    <table border="1" style="width:80%; margin: auto; border-collapse: collapse;">
      <thead>
        <tr>
          <th>Rank</th>
          <th>Title</th>
          <th>Link</th>
          <th>Combined Score</th>
        </tr>
      </thead>
      <tbody>
    """
    repo_names = []
    for entry in entries:
        lines = entry.strip().split("\n")
        data = {}
        data["Final Rank"] = lines[0].strip() if lines else ""
        for line in lines[1:]:
            if ": " in line:
                key, val = line.split(": ", 1)
                data[key.strip()] = val.strip()

        # Extract the "owner/repo" slug so the chat view can offer it as a choice.
        link = data.get("Link", "")
        repo_name = ""
        if "github.com/" in link:
            repo_name = link.split("github.com/")[-1].strip("/ ")
        if repo_name:
            repo_names.append(repo_name)
        html += f"""
        <tr>
          <td>{data.get('Final Rank', '')}</td>
          <td>{data.get('Title', '')}</td>
          <td><a href="{data.get('Link', '#')}" target="_blank">GitHub</a></td>
          <td>{data.get('Combined Score', '')}</td>
        </tr>
        """
    html += "</tbody></table>"
    return html, repo_names


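# Thin wrapper around the ranking pipeline. It is kept as a separate function so
# a GPU decorator (for example spaces.GPU on Hugging Face Spaces) could be
# applied here; the name suggests that, but no decorator is assumed.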
def gpu_run_repo(topic: str, num_results: int):
    return run_repository_ranking(topic, num_results)


def run_lite_workflow(topic, num_results, result_container):
    """Run the ranking workflow and stash its raw output in result_container."""
    result = gpu_run_repo(topic, num_results)
    result_container["raw_result"] = result


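# Generator used by the UI: runs the workflow in a background thread and yields
# (status_message, details_html, repo_names) tuples so Gradio can stream the
# buffered log output while the search is still running.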
def stream_lite_workflow(topic, num_results):
    logging.info("[UI] User started a new search for topic: %s", topic)
    with LOG_BUFFER_LOCK:
        LOG_BUFFER.clear()
    result_container = {}
    workflow_thread = threading.Thread(
        target=run_lite_workflow, args=(topic, num_results, result_container)
    )
    workflow_thread.start()

    # Poll the log buffer while the workflow runs, streaming new entries to the UI.
    last_index = 0
    while workflow_thread.is_alive() or (last_index < len(LOG_BUFFER)):
        with LOG_BUFFER_LOCK:
            new_logs = LOG_BUFFER[last_index:]
            last_index = len(LOG_BUFFER)
        if new_logs:
            filtered_logs = filter_logs(new_logs)
            status_msg = filtered_logs[-1]
            detail_msg = "<br/>".join(filtered_logs)
            yield status_msg, detail_msg, []
        time.sleep(0.5)

    workflow_thread.join()
    raw_result = result_container.get("raw_result", "No results returned.")
    html_result, repo_names = parse_result_to_html(raw_result, num_results)
    yield "", html_result, repo_names


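# Entry point wired to the Run button / query submit: shows an immediate
# "Workflow started" message, then relays the streaming updates.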
def lite_runner(topic, num_results):
    logging.info("[UI] Running lite_runner for topic: %s", topic)
    yield "Workflow started", "<p>Processing your request. Please wait...</p>", []
    for status, details, repos in stream_lite_workflow(topic, num_results):
        yield status, details, repos


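# UI layout: a search view and a chat view share one Blocks app and are toggled
# by showing/hiding their columns.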
with gr.Blocks(
    theme=gr.themes.Soft(text_size=sizes.text_md),
    title="DeepGit Lite",
    css="""
    /* Center header and footer */
    #header { text-align: center; margin-bottom: 20px; }
    #main-container { max-width: 800px; margin: auto; }
    #footer { text-align: center; margin-top: 20px; }
    """
) as demo:
    gr.Markdown(
        """
        <div style="padding-top: 60px;">
          <div style="display: flex; align-items: center; justify-content: center;">
            <img src="https://img.icons8.com/?size=100&id=118557&format=png&color=000000"
                 style="width: 60px; height: 60px; margin-right: 12px;">
            <h1 style="margin: 0; font-size: 2.5em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
              DeepGit Lite
            </h1>
          </div>
          <div style="text-align: center; margin-top: 20px; font-size: 1.1em; font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;">
            <p>
              ✨ DeepGit Lite is the lightweight pro version of <strong>DeepGit</strong>.<br>
              It harnesses advanced deep semantic search to explore GitHub repositories and deliver curated results.<br>
              Under the hood, it leverages a hybrid ranking approach combining dense retrieval, BM25 scoring, and cross-encoder re-ranking for optimal discovery.<br>
            </p>
            <p>
              <strong>New!</strong> 🗨️ After searching, you can <b>chat with the agent about any repository you find</b>.<br>
              The conversation agent runs in the background and is ready to answer your questions about setup, usage, or details for each repo.<br>
              Just click "Go to Chat" after your search, select a repository, and start your conversation!
            </p>
            <p>
              🚀 Check out the full DeepGit version on
              <a href="https://github.com/zamalali/DeepGit" target="_blank">GitHub</a> and ⭐
              <strong>Star DeepGit</strong> on GitHub!
            </p>
          </div>
        </div>
        """,
        elem_id="header"
    )

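    # Search view: query box, result-count slider, and the streamed status/results.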
    with gr.Column(elem_id="main-container", visible=True) as search_ui:
        research_input = gr.Textbox(
            label="Research Query",
            placeholder="Enter your research topic here, e.g., Looking for a low code/no code tool to augment images and annotations?",
            lines=3
        )
        num_results_slider = gr.Slider(
            minimum=5, maximum=25, value=10, step=1,
            label="Number of Results to Display",
            info="Choose how many top repositories to show (sorted by score)"
        )
        run_button = gr.Button("Run DeepGit Lite", variant="primary")
        status_display = gr.Markdown(label="Status")
        detail_display = gr.HTML(label="Results")
        repo_state = gr.State([])
        go_to_chat_btn = gr.Button("Go to Chat", visible=False)

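    # Chat view (hidden until a search returns repositories): pick a repo and
    # ask the agent questions about it.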
    with gr.Column(visible=False) as chat_ui:
        repo_choice = gr.Radio(choices=[], label="Select a repository", interactive=True)
        chat_history = gr.Chatbot(label="Chat with GitHub Agent")
        user_input = gr.Textbox(
            label="Your question",
            placeholder="Ask about the selected repo, e.g., tell me a bit more and guide me through setting this up and running it."
        )
        send_btn = gr.Button("Send")
        chat_state = gr.State([])
        back_btn = gr.Button("Back to Search")

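    # Callbacks for switching between the search and chat views.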
    def update_chat_button(status, details, repos):
        logging.info("[UI] Search complete. Showing Go to Chat button: %s", bool(repos))
        return gr.update(visible=bool(repos)), repos

    def show_chat_ui(repos):
        logging.info("[UI] Switching to Chat UI. Repositories available: %s", repos)
        return gr.update(visible=False), gr.update(visible=True), gr.update(choices=repos, value=None), []

    def back_to_search():
        logging.info("[UI] Switching back to Search UI.")
        return gr.update(visible=True), gr.update(visible=False), gr.update(value=[]), gr.update(value=None), []

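    # Forward the question (prefixed with the selected repo) to the agent defined
    # in agent.py and append the exchange to the chat history.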
    def chat_with_agent(user_msg, repo, history):
        logging.info("[Chat] User sent message: '%s' for repo: '%s'", user_msg, repo)
        if not user_msg or not user_msg.strip():
            return history + [["", "Please enter a message before sending."]], history
        if not repo:
            return history + [[user_msg, "Please select a repository first."]], history
        full_query = f"[{repo}] {user_msg}"
        try:
            result = agent.agent_executor.invoke({"input": full_query})
            answer = result["output"]
            logging.info("[Chat] Agent response received.")
        except Exception as e:
            answer = f"Error: {e}"
            logging.error("[Chat] Error in agent_executor: %s", e)
        history = history + [[user_msg, answer]]
        return history, history

    def can_send(user_msg, repo):
        """Disable the Send button until both a message and a repository are provided."""
        if not user_msg or not user_msg.strip():
            return gr.update(interactive=False, value="Enter a message to send")
        if not repo:
            return gr.update(interactive=False, value="Select a repository")
        return gr.update(interactive=True, value="Send")

    # Re-validate the Send button whenever the message or repository selection changes.
    user_input.change(
        fn=can_send,
        inputs=[user_input, repo_choice],
        outputs=[send_btn],
        show_progress=False
    )
    repo_choice.change(
        fn=can_send,
        inputs=[user_input, repo_choice],
        outputs=[send_btn],
        show_progress=False
    )

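    # Run the search from the button or by submitting the query box, then reveal
    # the "Go to Chat" button once repositories are available.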
    run_button.click(
        fn=lite_runner,
        inputs=[research_input, num_results_slider],
        outputs=[status_display, detail_display, repo_state],
        api_name="deepgit_lite",
        show_progress=True
    ).then(
        fn=update_chat_button,
        inputs=[status_display, detail_display, repo_state],
        outputs=[go_to_chat_btn, repo_state]
    )

    research_input.submit(
        fn=lite_runner,
        inputs=[research_input, num_results_slider],
        outputs=[status_display, detail_display, repo_state],
        api_name="deepgit_lite_submit",
        show_progress=True
    ).then(
        fn=update_chat_button,
        inputs=[status_display, detail_display, repo_state],
        outputs=[go_to_chat_btn, repo_state]
    )

    go_to_chat_btn.click(
        fn=show_chat_ui,
        inputs=[repo_state],
        outputs=[search_ui, chat_ui, repo_choice, chat_state]
    )

    back_btn.click(
        fn=back_to_search,
        inputs=[],
        outputs=[search_ui, chat_ui, chat_history, repo_choice, chat_state]
    )

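    # Both the Send button and pressing Enter in the question box submit to the agent.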
    send_btn.click(
        fn=chat_with_agent,
        inputs=[user_input, repo_choice, chat_state],
        outputs=[chat_history, chat_state],
        queue=False
    )
    user_input.submit(
        fn=chat_with_agent,
        inputs=[user_input, repo_choice, chat_state],
        outputs=[chat_history, chat_state],
        queue=False
    )

    gr.HTML(
        """
        <div id="footer">
            Made with ❤️ by <b>Zamal</b>
        </div>
        """
    )


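# Queue requests (up to 10 waiting at a time) and launch the app.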
demo.queue(max_size=10).launch()