"""FastHTML front end for searching discussions indexed in Meilisearch.

The module builds a small HTMX-driven search page (query box, date range,
paginated result cards), registers a webhook endpoint that delegates to
``update.process_webhook`` and schedules ``update.update_webhooks`` on a cron
trigger for the lifetime of the app.
"""

# NOTE: the import order is kept exactly as in the original file. The star
# import from ``fasthtml.common`` sits in the middle, so reordering could
# change which binding wins for any name it happens to re-export.
import json
import os
from datetime import datetime, timezone, timedelta
from dateutil import parser as dateparser
import meilisearch
from fasthtml.common import *
from markdown import markdown
from dotenv import load_dotenv
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from contextlib import asynccontextmanager

from constants import MeilisearchIndexFields
from update import process_webhook, update_webhooks

loaded = load_dotenv("./.env", override=True)
print("Loaded .env file:", loaded)

MS_URL = os.getenv("MS_URL")
MS_SEARCH_KEY = os.getenv("MS_SEARCH_KEY")
ms_client = meilisearch.Client(MS_URL, MS_SEARCH_KEY)

# Read the stylesheet once at import time; the context manager guarantees the
# file handle is closed (the original left it to the garbage collector).
with open("styles.css", encoding="utf-8") as css_file:
    css_content = css_file.read()

# Number of seconds in one day, used to make the end date of a search inclusive.
_SECONDS_PER_DAY = 24 * 60 * 60


@asynccontextmanager
async def lifespan(app):
    """Run the background scheduler for as long as the app is alive.

    Schedules ``update_webhooks`` with the crontab ``0 */3 * * *`` (minute 0
    of every third hour) and shuts the scheduler down when the app stops.
    """
    # Setup
    scheduler = BackgroundScheduler()
    scheduler.add_job(update_webhooks, CronTrigger.from_crontab("0 */3 * * *"))
    scheduler.start()
    try:
        yield
    finally:
        # Cleanup: runs even when the app exits through an exception, so the
        # scheduler is always asked to shut down.
        scheduler.shutdown()


app, rt = fast_app(hdrs=(Style(css_content),), lifespan=lifespan)

md_exts = "codehilite", "smarty", "extra", "sane_lists"


def Markdown(s, exts=md_exts, **kw):
    """Render markdown text ``s`` to HTML and wrap it in a ``Div``.

    ``exts`` is passed to ``markdown`` as its extension list; extra keyword
    arguments are forwarded to ``Div``.
    """
    return Div(NotStr(markdown(s, extensions=exts)), **kw)


# Client-side behaviour for the "Scroll to Top" button: show it once the page
# has been scrolled more than 20px and jump back to the top on click.
scroll_script = Script(
    """
    document.addEventListener('DOMContentLoaded', function() {
        var scrollButton = document.getElementById('scroll-top-btn');

        window.onscroll = function() {
            if (document.body.scrollTop > 20 || document.documentElement.scrollTop > 20) {
                scrollButton.style.display = "block";
            } else {
                scrollButton.style.display = "none";
            }
        };

        scrollButton.onclick = function() {
            document.body.scrollTop = 0; // For Safari
            document.documentElement.scrollTop = 0; // For Chrome, Firefox, IE and Opera
        };
    });
    """
)


def date_range_inputs(start_date, end_date):
    """Return a ``Div`` holding the start/end ``<input type="date">`` pair.

    Both arguments must support ``strftime`` (e.g. ``datetime``); they provide
    the initial values of the inputs in ``YYYY-MM-DD`` form.
    """
    return Div(
        Input(
            type="date",
            name="start_date",
            value=start_date.strftime("%Y-%m-%d"),
            title="Start date",
        ),
        Input(
            type="date",
            name="end_date",
            value=end_date.strftime("%Y-%m-%d"),
            title="End date",
        ),
        cls="date-range",
    )


def search_form(start_date, end_date):
    """Build the search form that posts to ``/search`` via HTMX.

    The response replaces the element with id ``search-results``.
    """
    return Form(
        Input(type="text", name="query", placeholder="Enter search query"),
        date_range_inputs(start_date, end_date),
        Button("Search", type="submit"),
        hx_post="/search",
        hx_target="#search-results",
        hx_trigger="submit",
        id="search-form",
    )


def iso_to_unix_timestamp(iso_string: str) -> int:
    """Convert an ISO-8601 date/datetime string to a Unix timestamp (seconds).

    Strings without an explicit UTC offset (such as the ``YYYY-MM-DD`` values
    sent by the date inputs) are interpreted as UTC. The original relied on
    the server's local timezone for naive values, which made the date filter
    machine-dependent and inconsistent with the UTC dates shown on the cards.
    """
    dt = dateparser.isoparse(iso_string)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def unix_timestamp_to_nice_format(timestamp) -> str:
    """Format a Unix timestamp as e.g. ``Jan 05, 2024 at 13:45 UTC``."""
    dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    return dt.strftime("%b %d, %Y at %H:%M UTC")


def make_query(query, start_date, end_date, page=1, limit=10):
    """Run a paginated, date-filtered search against the Meilisearch index.

    Args:
        query: Free-text search query.
        start_date: ISO date string; lower bound (inclusive) on the update time.
        end_date: ISO date string; the whole of this day is included.
        page: 1-based page number; values below 1 are treated as 1.
        limit: Number of hits per page.

    Returns:
        The raw result mapping returned by the Meilisearch client.
    """
    after_timestamp = iso_to_unix_timestamp(start_date)
    # Exclusive upper bound at the start of the following day, so every
    # second of ``end_date`` is covered (the original stopped one second short
    # because it added 23:59:59 and then compared with ``<``).
    before_timestamp = iso_to_unix_timestamp(end_date) + _SECONDS_PER_DAY
    updated_at = MeilisearchIndexFields.UPDATED_AT.value

    options = {
        "limit": limit,
        # Clamp so a page below 1 cannot produce a negative offset.
        "offset": (max(page, 1) - 1) * limit,
        "filter": f"{updated_at} >= {after_timestamp} AND {updated_at} < {before_timestamp}",
        "attributesToCrop": [MeilisearchIndexFields.CONTENT.value],
        "cropLength": 30,
        "attributesToHighlight": [
            MeilisearchIndexFields.CONTENT.value,
            MeilisearchIndexFields.TITLE.value,
        ],
        # NOTE(review): both highlight tags are empty strings, so matched terms
        # are not wrapped in any markup. Presumably real tags were intended
        # here - confirm against the stylesheet before changing.
        "highlightPreTag": '',
        "highlightPostTag": "",
        "distinct": MeilisearchIndexFields.URL.value,
    }
    return ms_client.index(MeilisearchIndexFields.INDEX_NAME.value).search(
        query=query, opt_params=options
    )


def search_results(query, start_date, end_date, page=1):
    """Execute the search and render the results bar, cards and pagination."""
    # Normalise once so the query offset and the pagination label agree.
    page = max(page, 1)
    raw_results = make_query(query, start_date, end_date, page)
    return Div(
        make_results_bar(raw_results),
        Div(*[make_card(r) for r in raw_results["hits"]]),
        make_pagination(page, raw_results["estimatedTotalHits"]),
        id="search-results",
    )


def make_results_bar(results):
    """Render the processing time and estimated hit count of a search result."""
    processing_time = results["processingTimeMs"]
    estimated_hits = results["estimatedTotalHits"]
    return Div(
        Div(f"Processing time: {processing_time}ms"),
        Div(f"Estimated total hits: {estimated_hits}"),
        cls="results-bar",
    )


def make_card(result):
    """Render one search hit (its ``_formatted`` variant) as a card.

    The card shows the title, the cropped content, the formatted update time
    and a link to the hit's URL that opens in a new tab.
    """
    result = result["_formatted"]
    url = result[MeilisearchIndexFields.URL.value]
    date = unix_timestamp_to_nice_format(
        int(result[MeilisearchIndexFields.UPDATED_AT.value])
    )
    # NOTE(review): title and content are inserted as raw HTML via NotStr. If
    # the indexed text can contain user-authored markup this is an XSS vector;
    # confirm that it is sanitised at indexing time or escape it here.
    return Div(
        Div(
            Strong(NotStr(result[MeilisearchIndexFields.TITLE.value])),
            P(NotStr(result[MeilisearchIndexFields.CONTENT.value]), cls="comment-text"),
            Div(Span(date)),
            A(url, href=url, target="_blank"),
        ),
        cls="card-item",
    )


def make_pagination(current_page, total_hits, limit=10):
    """Render "Previous"/"Next" buttons and a "Page X of Y" label.

    The buttons post to ``/search`` with the target page in the query string
    and include the current form fields so the same search is repeated.
    """
    # Ceiling division; report at least one page so an empty result set reads
    # "Page 1 of 1" instead of "Page 1 of 0".
    total_pages = max(1, -(-total_hits // limit))
    children = []

    if current_page > 1:
        children.append(
            Button(
                "Previous",
                hx_post=f"/search?page={current_page-1}",
                hx_target="#search-results",
                hx_include="[name='query'], [name='start_date'], [name='end_date']",
            )
        )

    children.append(Span(f"Page {current_page} of {total_pages}"))

    if current_page < total_pages:
        children.append(
            Button(
                "Next",
                hx_post=f"/search?page={current_page+1}",
                hx_target="#search-results",
                hx_include="[name='query'], [name='start_date'], [name='end_date']",
            )
        )

    return Div(*children, cls="pagination")


# Floating button; hidden by default and toggled by ``scroll_script``.
scroll_button = Button(
    "Scroll to Top",
    id="scroll-top-btn",
    style="""
        position: fixed;
        bottom: 20px;
        right: 20px;
        display: none;
        background-color: #007bff;
        color: white;
        border: none;
        border-radius: 5px;
        padding: 10px 15px;
        cursor: pointer;
    """,
)


@rt("/")
def get():
    """Serve the landing page with the search form preset to the last 7 days."""
    # Use UTC so the default range matches how the date filter is interpreted.
    end_date = datetime.now(timezone.utc)
    start_date = end_date - timedelta(days=7)
    return Titled(
        "HF Discussion Search",
        Div(
            search_form(start_date, end_date),
            Div(id="search-results"),
            scroll_button,
            scroll_script,
            cls="container",
        ),
    )


@rt("/search")
def post(query: str, start_date: str, end_date: str, page: int = 1):
    """Handle a search submission and return the rendered results fragment."""
    return search_results(query, start_date, end_date, page)


@app.post("/webhook")
async def hf_webhook(request):
    """Forward an incoming webhook request to ``process_webhook``."""
    return await process_webhook(request)


serve()