import gradio as gr import time import threading import logging from gradio.themes.utils import sizes from main import run_repository_ranking import agent # --------------------------- # Global Logging Buffer Setup # --------------------------- LOG_BUFFER = [] LOG_BUFFER_LOCK = threading.Lock() class BufferLogHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) with LOG_BUFFER_LOCK: LOG_BUFFER.append(log_entry) root_logger = logging.getLogger() if not any(isinstance(h, BufferLogHandler) for h in root_logger.handlers): handler = BufferLogHandler() formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) root_logger.addHandler(handler) def filter_logs(logs): filtered = [] last_was_fetching = False for log in logs: if "HTTP Request:" in log: if not last_was_fetching: filtered.append("Fetching repositories...") last_was_fetching = True else: filtered.append(log) last_was_fetching = False return filtered def parse_result_to_html(raw_result: str, num_results: int) -> (str, list): """ Parses the raw string output from run_repository_ranking to an HTML table. Only the top N results are displayed. Returns (html, repo_names) """ entries = raw_result.strip().split("Final Rank:") entries = entries[1:num_results+1] # Use only the first N entries if not entries: return ("

No repositories found for your query.

", []) html = """ """ repo_names = [] for entry in entries: lines = entry.strip().split("\n") data = {} data["Final Rank"] = lines[0].strip() if lines else "" for line in lines[1:]: if ": " in line: key, val = line.split(": ", 1) data[key.strip()] = val.strip() # Try to extract repo name from the Link (github.com/user/repo) link = data.get('Link', '') repo_name = '' if 'github.com/' in link: repo_name = link.split('github.com/')[-1].strip('/ ') if repo_name: repo_names.append(repo_name) html += f""" """ html += "
Rank Title Link Combined Score
{data.get('Final Rank', '')} {data.get('Title', '')} GitHub {data.get('Combined Score', '')}
" return html, repo_names # --------------------------- # GPU-enabled Wrapper for Repository Ranking # --------------------------- def gpu_run_repo(topic: str, num_results: int): return run_repository_ranking(topic, num_results) def run_lite_workflow(topic, num_results, result_container): result = gpu_run_repo(topic, num_results) result_container["raw_result"] = result def stream_lite_workflow(topic, num_results): logging.info("[UI] User started a new search for topic: %s", topic) with LOG_BUFFER_LOCK: LOG_BUFFER.clear() result_container = {} workflow_thread = threading.Thread(target=run_lite_workflow, args=(topic, num_results, result_container)) workflow_thread.start() last_index = 0 while workflow_thread.is_alive() or (last_index < len(LOG_BUFFER)): with LOG_BUFFER_LOCK: new_logs = LOG_BUFFER[last_index:] last_index = len(LOG_BUFFER) if new_logs: filtered_logs = filter_logs(new_logs) status_msg = filtered_logs[-1] detail_msg = "
".join(filtered_logs) yield status_msg, detail_msg, [] time.sleep(0.5) workflow_thread.join() with LOG_BUFFER_LOCK: final_logs = LOG_BUFFER[:] raw_result = result_container.get("raw_result", "No results returned.") html_result, repo_names = parse_result_to_html(raw_result, num_results) yield "", html_result, repo_names def lite_runner(topic, num_results): logging.info("[UI] Running lite_runner for topic: %s", topic) yield "Workflow started", "

Processing your request. Please wait...

", [] for status, details, repos in stream_lite_workflow(topic, num_results): yield status, details, repos # --------------------------- # App UI Setup Using Gradio Soft Theme with Centered Layout # --------------------------- with gr.Blocks( theme=gr.themes.Soft(text_size=sizes.text_md), title="DeepGit Lite", css=""" /* Center header and footer */ #header { text-align: center; margin-bottom: 20px; } #main-container { max-width: 800px; margin: auto; } #footer { text-align: center; margin-top: 20px; } """ ) as demo: gr.Markdown( """

DeepGit Lite

✨ DeepGit Lite is the lightweight pro version of DeepGit.
It harnesses advanced deep semantic search to explore GitHub repositories and deliver curated results.
Under the hood, it leverages a hybrid ranking approach combining dense retrieval, BM25 scoring, and cross-encoder re-ranking for optimal discovery.

New! 🗨️ After searching, you can chat with the agent about any repository you find.
The conversation agent runs in the background and is ready to answer your questions about setup, usage, or details for each repo.
Just click "Go to Chat" after your search, select a repository, and start your conversation!

🚀 Check out the full DeepGit version on GitHub and ⭐ Star DeepGit on GitHub!

""", elem_id="header" ) # --- Search UI --- with gr.Column(elem_id="main-container", visible=True) as search_ui: research_input = gr.Textbox( label="Research Query", placeholder="Enter your research topic here, e.g., Looking for a low code/no code tool to augment images and annotations?", lines=3 ) num_results_slider = gr.Slider( minimum=5, maximum=25, value=10, step=1, label="Number of Results to Display", info="Choose how many top repositories to show (sorted by score)" ) run_button = gr.Button("Run DeepGit Lite", variant="primary") status_display = gr.Markdown(label="Status") detail_display = gr.HTML(label="Results") repo_state = gr.State([]) go_to_chat_btn = gr.Button("Go to Chat", visible=False) # --- Chat UI --- with gr.Column(visible=False) as chat_ui: repo_choice = gr.Radio(choices=[], label="Select a repository", interactive=True) chat_history = gr.Chatbot(label="Chat with GitHub Agent") user_input = gr.Textbox(label="Your question", placeholder="Ask about the selected repo...e.g., tell me a bit more and guide me to set this up and running?") send_btn = gr.Button("Send") chat_state = gr.State([]) back_btn = gr.Button("Back to Search") def update_chat_button(status, details, repos): logging.info("[UI] Search complete. Showing Go to Chat button: %s", bool(repos)) return gr.update(visible=bool(repos)), repos def show_chat_ui(repos): logging.info("[UI] Switching to Chat UI. Repositories available: %s", repos) return gr.update(visible=False), gr.update(visible=True), gr.update(choices=repos, value=None), [] def back_to_search(): logging.info("[UI] Switching back to Search UI.") return gr.update(visible=True), gr.update(visible=False), gr.update(value=[]), gr.update(value=None), [] def chat_with_agent(user_msg, repo, history): logging.info("[Chat] User sent message: '%s' for repo: '%s'", user_msg, repo) if not user_msg or not user_msg.strip(): # Block blank messages return history + [["", "Please enter a message before sending."]], history if not repo: return history + [[user_msg, "Please select a repository first."]], history full_query = f"[{repo}] {user_msg}" try: result = agent.agent_executor.invoke({"input": full_query}) answer = result["output"] logging.info("[Chat] Agent response received.") except Exception as e: answer = f"Error: {e}" logging.error("[Chat] Error in agent_executor: %s", e) history = history + [[user_msg, answer]] return history, history # Disable send button if no repo is selected or message is blank, and show a helpful message def can_send(user_msg, repo): if not user_msg or not user_msg.strip(): return gr.update(interactive=False, value="Enter a message to send") if not repo: return gr.update(interactive=False, value="Select a repository") return gr.update(interactive=True, value="Send") user_input.change( fn=can_send, inputs=[user_input, repo_choice], outputs=[send_btn], show_progress=False ) repo_choice.change( fn=can_send, inputs=[user_input, repo_choice], outputs=[send_btn], show_progress=False ) run_button.click( fn=lite_runner, inputs=[research_input, num_results_slider], outputs=[status_display, detail_display, repo_state], api_name="deepgit_lite", show_progress=True ).then( fn=update_chat_button, inputs=[status_display, detail_display, repo_state], outputs=[go_to_chat_btn, repo_state] ) research_input.submit( fn=lite_runner, inputs=[research_input, num_results_slider], outputs=[status_display, detail_display, repo_state], api_name="deepgit_lite_submit", show_progress=True ).then( fn=update_chat_button, inputs=[status_display, detail_display, repo_state], outputs=[go_to_chat_btn, repo_state] ) go_to_chat_btn.click( fn=show_chat_ui, inputs=[repo_state], outputs=[search_ui, chat_ui, repo_choice, chat_state] ) back_btn.click( fn=back_to_search, inputs=[], outputs=[search_ui, chat_ui, chat_history, repo_choice, chat_state] ) send_btn.click( fn=chat_with_agent, inputs=[user_input, repo_choice, chat_state], outputs=[chat_history, chat_state], queue=False ) user_input.submit( fn=chat_with_agent, inputs=[user_input, repo_choice, chat_state], outputs=[chat_history, chat_state], queue=False ) gr.HTML( """ """ ) demo.queue(max_size=10).launch()