"""
Blog Data Update Script
This script updates the blog data vector store when new posts are added.
It can be scheduled to run periodically or manually executed.
Usage:
python pipeline.py [--force-recreate] [--data-dir DATA_DIR] [--output-dir OUTPUT_DIR] [--ci]
Options:
--force-recreate Force recreation of the vector store even if it exists
--data-dir DIR Directory containing the blog posts (default: data/)
--output-dir DIR Directory to save stats and artifacts (default: ./stats)
--ci Run in CI mode (no interactive prompts, exit codes for CI)
"""
import os
import sys
import argparse
from datetime import datetime
import json
import logging
from pathlib import Path
from lets_talk.config import (
    CHUNK_OVERLAP, CHUNK_SIZE, VECTOR_STORAGE_PATH, DATA_DIR,
    FORCE_RECREATE, OUTPUT_DIR, USE_CHUNKING, SHOULD_SAVE_STATS
)
# Import the blog utilities module
import lets_talk.utils.blog as blog
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("blog-pipeline")


def parse_args():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description="Update blog data vector store")
    parser.add_argument("--force-recreate", action="store_true",
                        help="Force recreation of the vector store")
    parser.add_argument("--data-dir", default=DATA_DIR,
                        help=f"Directory containing blog posts (default: {DATA_DIR})")
    parser.add_argument("--output-dir", default="./stats",
                        help="Directory to save stats and artifacts (default: ./stats)")
    parser.add_argument("--ci", action="store_true",
                        help="Run in CI mode (no interactive prompts, exit codes for CI)")
    parser.add_argument("--chunk-size", type=int,
                        help=f"Size of each chunk in characters (default: {CHUNK_SIZE})")
    parser.add_argument("--chunk-overlap", type=int,
                        help=f"Overlap between chunks in characters (default: {CHUNK_OVERLAP})")
    parser.add_argument("--no-chunking", action="store_true",
                        help="Don't split documents into chunks (use whole documents)")
    return parser.parse_args()


def save_stats(stats, output_dir="./stats", ci_mode=False):
    """Save stats to a JSON file for tracking changes over time.

    Args:
        stats: Dictionary containing statistics about the blog posts
        output_dir: Directory to save the stats file
        ci_mode: Whether to run in CI mode (use fixed filename)

    Returns:
        Tuple of (filename, stats_dict)
    """
    # Create directory if it doesn't exist
    Path(output_dir).mkdir(exist_ok=True, parents=True)

    # Create filename with timestamp or use fixed name for CI
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if ci_mode:
        filename = f"{output_dir}/blog_stats_latest.json"
        # Also create a timestamped version for historical tracking
        history_filename = f"{output_dir}/blog_stats_{timestamp}.json"
    else:
        filename = f"{output_dir}/blog_stats_{timestamp}.json"

    # Save only the basic stats, not the full document list
    basic_stats = {
        "timestamp": timestamp,
        "total_documents": stats["total_documents"],
        "total_characters": stats["total_characters"],
        "min_length": stats["min_length"],
        "max_length": stats["max_length"],
        "avg_length": stats["avg_length"],
    }

    with open(filename, "w") as f:
        json.dump(basic_stats, f, indent=2)

    # In CI mode, also save a timestamped version
    if ci_mode:
        with open(history_filename, "w") as f:
            json.dump(basic_stats, f, indent=2)
        logger.info(f"Saved stats to {filename} and {history_filename}")
    else:
        logger.info(f"Saved stats to {filename}")

    return filename, basic_stats
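# Illustrative shape of the stats file written by save_stats (values are made up):
# {
#   "timestamp": "20250101_120000",
#   "total_documents": 42,
#   "total_characters": 123456,
#   "min_length": 512,
#   "max_length": 9000,
#   "avg_length": 2939.4
# }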


def create_vector_database(data_dir=DATA_DIR, storage_path=VECTOR_STORAGE_PATH,
                           force_recreate=FORCE_RECREATE, output_dir=OUTPUT_DIR, ci_mode=False,
                           use_chunking=USE_CHUNKING, should_save_stats=SHOULD_SAVE_STATS,
                           chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP):
    """
    Create or update the vector database with blog documents.

    Args:
        data_dir: Directory containing the blog posts (default from config)
        storage_path: Path where the vector database will be stored (default from config)
        force_recreate: Whether to force recreation of the vector store (default from config)
        output_dir: Directory to save stats and artifacts (default from config)
        ci_mode: Whether to run in CI mode
        use_chunking: Whether to split documents into chunks (default from config)
        should_save_stats: Whether to save statistics about the documents (default from config)
        chunk_size: Size of each chunk in characters (default from config)
        chunk_overlap: Overlap between chunks in characters (default from config)

    Returns:
        Tuple of (success status, message, stats, stats_file, stats_file_content)
    """
    try:
        # Load and process documents
        logger.info(f"Loading blog posts from {data_dir}")
        documents = blog.load_blog_posts(data_dir)
        documents = blog.update_document_metadata(documents)

        # Get stats
        stats = blog.get_document_stats(documents)
        blog.display_document_stats(stats)

        # Save stats for tracking
        stats_file = None
        stats_content = None
        if should_save_stats:
            stats_file, stats_content = save_stats(stats, output_dir=output_dir, ci_mode=ci_mode)

        if use_chunking:
            logger.info("Chunking documents...")
            # Use the provided chunk_size and chunk_overlap, or fall back to config defaults
            chunking_params = {}
            if chunk_size is not None:
                chunking_params['chunk_size'] = chunk_size
            if chunk_overlap is not None:
                chunking_params['chunk_overlap'] = chunk_overlap
            logger.info(f"Using chunk size: {chunking_params.get('chunk_size', 'default')} "
                        f"and overlap: {chunking_params.get('chunk_overlap', 'default')}")
            documents = blog.split_documents(documents, **chunking_params)

        create_vector_store = (not Path(storage_path).exists()) or force_recreate
        if create_vector_store:
            logger.info("Creating vector store...")
            vector_store = blog.create_vector_store(
                documents,
                storage_path=storage_path,
                force_recreate=force_recreate
            )
            vector_store.client.close()
            logger.info(f"Vector store successfully created at {storage_path}")

            # In CI mode, create a metadata file with the build info
            if ci_mode:
                build_info = {
                    "build_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "document_count": stats["total_documents"],
                    "storage_path": str(storage_path),
                    "vector_store_size_bytes": get_directory_size(storage_path),
                }
                # Ensure the output directory exists (save_stats may not have run)
                Path(output_dir).mkdir(exist_ok=True, parents=True)
                build_info_path = Path(output_dir) / "vector_store_build_info.json"
                with open(build_info_path, "w") as f:
                    json.dump(build_info, f, indent=2)
                logger.info(f"Build info saved to {build_info_path}")

            return True, f"Vector store successfully created at {storage_path}", stats, stats_file, stats_content
        else:
            logger.info(f"Vector store already exists at {storage_path}")
            return True, f"Vector store already exists at {storage_path} (use --force-recreate to rebuild)", stats, stats_file, stats_content
    except Exception as e:
        logger.error(f"Error creating vector store: {e}", exc_info=True)
        return False, f"Error creating vector store: {e}", None, None, None
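# Minimal programmatic usage sketch (assumes the config defaults point at valid paths):
#   success, message, stats, stats_file, stats_content = create_vector_database(ci_mode=False)
#   if success:
#       print(message)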


def get_directory_size(path):
    """Get the total size of a directory in bytes."""
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            # Skip symlinks so linked files aren't counted (or double-counted)
            if not os.path.islink(filepath):
                total_size += os.path.getsize(filepath)
    return total_size
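# Example (path is illustrative): get_directory_size("db/vector_store") returns the
# combined size in bytes of every regular file under that tree.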


def main():
    """Main function to update blog data."""
    args = parse_args()

    logger.info("=== Blog Data Update ===")
    logger.info(f"Data directory: {args.data_dir}")
    logger.info(f"Force recreate: {args.force_recreate}")
    logger.info(f"Output directory: {args.output_dir}")
    logger.info(f"CI mode: {args.ci}")
    logger.info(f"Chunking: {not args.no_chunking}")
    if not args.no_chunking:
        logger.info(f"Chunk size: {args.chunk_size if args.chunk_size is not None else 'default from config'}")
        logger.info(f"Chunk overlap: {args.chunk_overlap if args.chunk_overlap is not None else 'default from config'}")
    logger.info("========================")

    try:
        # Create or update vector database
        success, message, stats, stats_file, stats_content = create_vector_database(
            data_dir=args.data_dir,
            storage_path=VECTOR_STORAGE_PATH,
            force_recreate=args.force_recreate,
            output_dir=args.output_dir,
            ci_mode=args.ci,
            use_chunking=not args.no_chunking,
            chunk_size=args.chunk_size,
            chunk_overlap=args.chunk_overlap
        )

        logger.info("\n=== Update Summary ===")
        if stats:
            logger.info(f"Processed {stats['total_documents']} documents")
            logger.info(f"Stats saved to: {stats_file}")
        logger.info(f"Vector DB status: {message}")
        logger.info("=====================")

        # In CI mode, create a summary file that GitHub Actions can use to set outputs
        if args.ci and stats:
            ci_summary_path = Path(args.output_dir) / "ci_summary.json"
            ci_summary = {
                "status": "success" if success else "failure",
                "message": message,
                "stats_file": stats_file,
                "document_count": stats["total_documents"],
                "vector_store_path": str(VECTOR_STORAGE_PATH)
            }
            with open(ci_summary_path, "w") as f:
                json.dump(ci_summary, f, indent=2)
            logger.info(f"CI summary saved to {ci_summary_path}")
        if not success:
            return 1
        return 0
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        return 1
if __name__ == "__main__":
sys.exit(main())