"""
Blog Data Update Script

This script updates the blog data vector store when new posts are added.
It can be scheduled to run periodically or executed manually.

Usage:
    python pipeline.py [--force-recreate] [--data-dir DATA_DIR] [--output-dir OUTPUT_DIR] [--ci]
                       [--chunk-size N] [--chunk-overlap N] [--no-chunking]

Options:
    --force-recreate     Force recreation of the vector store even if it exists
    --data-dir DIR       Directory containing the blog posts (default: data/)
    --output-dir DIR     Directory to save stats and artifacts (default: ./stats)
    --ci                 Run in CI mode (no interactive prompts, exit codes for CI)
    --chunk-size N       Size of each chunk in characters (default from config)
    --chunk-overlap N    Overlap between chunks in characters (default from config)
    --no-chunking        Don't split documents into chunks (use whole documents)
"""

import os
import sys
import argparse
import json
import logging
from datetime import datetime
from pathlib import Path

from lets_talk.config import (
    CHUNK_OVERLAP, CHUNK_SIZE, VECTOR_STORAGE_PATH, DATA_DIR,
    FORCE_RECREATE, OUTPUT_DIR, USE_CHUNKING, SHOULD_SAVE_STATS
)
import lets_talk.utils.blog as blog

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("blog-pipeline")
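# Note: logging.StreamHandler() writes to sys.stderr by default, so schedulers
# and CI runners will see these pipeline logs on the error stream.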


def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description="Update blog data vector store")
    parser.add_argument("--force-recreate", action="store_true",
                        help="Force recreation of the vector store")
    parser.add_argument("--data-dir", default=DATA_DIR,
                        help=f"Directory containing blog posts (default: {DATA_DIR})")
    parser.add_argument("--output-dir", default="./stats",
                        help="Directory to save stats and artifacts (default: ./stats)")
    parser.add_argument("--ci", action="store_true",
                        help="Run in CI mode (no interactive prompts, exit codes for CI)")
    parser.add_argument("--chunk-size", type=int,
                        help="Size of each chunk in characters (default from config)")
    parser.add_argument("--chunk-overlap", type=int,
                        help="Overlap between chunks in characters (default from config)")
    parser.add_argument("--no-chunking", action="store_true",
                        help="Don't split documents into chunks (use whole documents)")
    return parser.parse_args()
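
# For example, "python pipeline.py --ci --chunk-size 500" yields args.ci == True,
# args.chunk_size == 500, and args.chunk_overlap is None; create_vector_database
# passes chunking values through only when set, leaving blog.split_documents to
# use its own defaults for anything left as None.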


def save_stats(stats, output_dir="./stats", ci_mode=False):
    """Save stats to a JSON file for tracking changes over time.

    Args:
        stats: Dictionary containing statistics about the blog posts
        output_dir: Directory to save the stats file
        ci_mode: Whether to run in CI mode (use a fixed filename)

    Returns:
        Tuple of (filename, stats_dict)
    """
    Path(output_dir).mkdir(exist_ok=True, parents=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if ci_mode:
        # Fixed filename so CI jobs can diff against the previous run,
        # plus a timestamped copy to preserve history.
        filename = f"{output_dir}/blog_stats_latest.json"
        history_filename = f"{output_dir}/blog_stats_{timestamp}.json"
    else:
        filename = f"{output_dir}/blog_stats_{timestamp}.json"

    # Keep only the summary fields; per-document details stay out of the artifact.
    basic_stats = {
        "timestamp": timestamp,
        "total_documents": stats["total_documents"],
        "total_characters": stats["total_characters"],
        "min_length": stats["min_length"],
        "max_length": stats["max_length"],
        "avg_length": stats["avg_length"],
    }

    with open(filename, "w") as f:
        json.dump(basic_stats, f, indent=2)

    if ci_mode:
        with open(history_filename, "w") as f:
            json.dump(basic_stats, f, indent=2)
        logger.info(f"Saved stats to {filename} and {history_filename}")
    else:
        logger.info(f"Saved stats to {filename}")

    return filename, basic_stats
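
# Shape of the saved artifact (field names come from basic_stats above;
# the values here are illustrative):
#   {
#     "timestamp": "20250101_120000",
#     "total_documents": 42,
#     "total_characters": 210000,
#     "min_length": 800,
#     "max_length": 12000,
#     "avg_length": 5000.0
#   }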


def create_vector_database(data_dir=DATA_DIR, storage_path=VECTOR_STORAGE_PATH,
                           force_recreate=FORCE_RECREATE, output_dir=OUTPUT_DIR, ci_mode=False,
                           use_chunking=USE_CHUNKING, should_save_stats=SHOULD_SAVE_STATS,
                           chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """
    Create or update the vector database with blog documents.

    Args:
        data_dir: Directory containing the blog posts (default from config)
        storage_path: Path where the vector database will be stored (default from config)
        force_recreate: Whether to force recreation of the vector store (default from config)
        output_dir: Directory to save stats and artifacts (default from config)
        ci_mode: Whether to run in CI mode
        use_chunking: Whether to split documents into chunks (default from config)
        should_save_stats: Whether to save statistics about the documents (default from config)
        chunk_size: Size of each chunk in characters (default from config)
        chunk_overlap: Overlap between chunks in characters (default from config)

    Returns:
        Tuple of (success status, message, stats, stats_file, stats_file_content)
    """
    try:
        logger.info(f"Loading blog posts from {data_dir}")
        documents = blog.load_blog_posts(data_dir)
        documents = blog.update_document_metadata(documents)

        stats = blog.get_document_stats(documents)
        blog.display_document_stats(stats)

        stats_file = None
        stats_content = None
        if should_save_stats:
            stats_file, stats_content = save_stats(stats, output_dir=output_dir, ci_mode=ci_mode)

        if use_chunking:
            logger.info("Chunking documents...")
            # Pass only explicitly provided values so blog.split_documents
            # falls back to its own defaults for anything left as None.
            chunking_params = {}
            if chunk_size is not None:
                chunking_params['chunk_size'] = chunk_size
            if chunk_overlap is not None:
                chunking_params['chunk_overlap'] = chunk_overlap

            logger.info(f"Using chunk size: {chunking_params.get('chunk_size', 'default')} "
                        f"and overlap: {chunking_params.get('chunk_overlap', 'default')}")
            documents = blog.split_documents(documents, **chunking_params)

        # Build only when the store is missing or a rebuild was requested.
        create_vector_store = (not Path(storage_path).exists()) or force_recreate

        if create_vector_store:
            logger.info("Creating vector store...")
            vector_store = blog.create_vector_store(
                documents,
                storage_path=storage_path,
                force_recreate=force_recreate
            )
            vector_store.client.close()
            logger.info(f"Vector store successfully created at {storage_path}")

            if ci_mode:
                build_info = {
                    "build_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "document_count": stats["total_documents"],
                    "storage_path": str(storage_path),
                    "vector_store_size_bytes": get_directory_size(storage_path),
                }
                build_info_path = Path(output_dir) / "vector_store_build_info.json"
                with open(build_info_path, "w") as f:
                    json.dump(build_info, f, indent=2)
                logger.info(f"Build info saved to {build_info_path}")

            return True, f"Vector store successfully created at {storage_path}", stats, stats_file, stats_content
        else:
            logger.info(f"Vector store already exists at {storage_path}")
            return True, f"Vector store already exists at {storage_path} (use --force-recreate to rebuild)", stats, stats_file, stats_content
    except Exception as e:
        logger.error(f"Error creating vector store: {e}", exc_info=True)
        return False, f"Error creating vector store: {e}", None, None, None
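
# Programmatic use mirrors what main() does below, e.g. a one-off forced rebuild:
#   ok, msg, stats, stats_file, stats_content = create_vector_database(force_recreate=True)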


def get_directory_size(path):
    """Get the size of a directory in bytes, skipping symlinks."""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            if not os.path.islink(filepath):
                total_size += os.path.getsize(filepath)
    return total_size
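
# Example: get_directory_size(VECTOR_STORAGE_PATH) might return 1048576 bytes,
# i.e. 1048576 / 1024**2 = 1.0 MiB (the return value is illustrative).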


def main():
    """Main function to update blog data."""
    args = parse_args()

    logger.info("=== Blog Data Update ===")
    logger.info(f"Data directory: {args.data_dir}")
    logger.info(f"Force recreate: {args.force_recreate}")
    logger.info(f"Output directory: {args.output_dir}")
    logger.info(f"CI mode: {args.ci}")
    logger.info(f"Chunking: {not args.no_chunking}")
    if not args.no_chunking:
        logger.info(f"Chunk size: {args.chunk_size if args.chunk_size is not None else 'default from config'}")
        logger.info(f"Chunk overlap: {args.chunk_overlap if args.chunk_overlap is not None else 'default from config'}")
    logger.info("========================")

    try:
        success, message, stats, stats_file, stats_content = create_vector_database(
            data_dir=args.data_dir,
            storage_path=VECTOR_STORAGE_PATH,
            force_recreate=args.force_recreate,
            output_dir=args.output_dir,
            ci_mode=args.ci,
            use_chunking=not args.no_chunking,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap
        )

        logger.info("\n=== Update Summary ===")
        if stats:
            logger.info(f"Processed {stats['total_documents']} documents")
            logger.info(f"Stats saved to: {stats_file}")
        logger.info(f"Vector DB status: {message}")
        logger.info("=====================")

        # In CI mode, write a machine-readable summary so downstream jobs can
        # check the result without parsing logs.
        if args.ci and stats:
            ci_summary_path = Path(args.output_dir) / "ci_summary.json"
            ci_summary = {
                "status": "success" if success else "failure",
                "message": message,
                "stats_file": stats_file,
                "document_count": stats["total_documents"],
                "vector_store_path": str(VECTOR_STORAGE_PATH)
            }
            with open(ci_summary_path, "w") as f:
                json.dump(ci_summary, f, indent=2)
            logger.info(f"CI summary saved to {ci_summary_path}")

        if not success:
            return 1
        return 0
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return 1


if __name__ == "__main__":
    sys.exit(main())