yangdx commited on
Commit
dc02042
·
1 Parent(s): 5812726

Add multiple workers support for API Server

Browse files
lightrag/api/lightrag_server.py CHANGED
@@ -8,11 +8,12 @@ from fastapi import (
8
  )
9
  from fastapi.responses import FileResponse
10
  import asyncio
11
- import threading
12
  import os
13
- from fastapi.staticfiles import StaticFiles
14
  import logging
15
- from typing import Dict
 
 
16
  from pathlib import Path
17
  import configparser
18
  from ascii_colors import ASCIIColors
@@ -49,18 +50,6 @@ except Exception as e:
49
  config = configparser.ConfigParser()
50
  config.read("config.ini")
51
 
52
- # Global progress tracker
53
- scan_progress: Dict = {
54
- "is_scanning": False,
55
- "current_file": "",
56
- "indexed_count": 0,
57
- "total_files": 0,
58
- "progress": 0,
59
- }
60
-
61
- # Lock for thread-safe operations
62
- progress_lock = threading.Lock()
63
-
64
 
65
  class AccessLogFilter(logging.Filter):
66
  def __init__(self):
@@ -95,7 +84,6 @@ class AccessLogFilter(logging.Filter):
95
 
96
 
97
  def create_app(args):
98
-
99
  # Initialize verbose debug setting
100
  from lightrag.utils import set_verbose_debug
101
 
@@ -155,25 +143,12 @@ def create_app(args):
155
 
156
  # Auto scan documents if enabled
157
  if args.auto_scan_at_startup:
158
- # Start scanning in background
159
- with progress_lock:
160
- if not scan_progress["is_scanning"]:
161
- scan_progress["is_scanning"] = True
162
- scan_progress["indexed_count"] = 0
163
- scan_progress["progress"] = 0
164
- # Create background task
165
- task = asyncio.create_task(
166
- run_scanning_process(rag, doc_manager)
167
- )
168
- app.state.background_tasks.add(task)
169
- task.add_done_callback(app.state.background_tasks.discard)
170
- ASCIIColors.info(
171
- f"Started background scanning of documents from {args.input_dir}"
172
- )
173
- else:
174
- ASCIIColors.info(
175
- "Skip document scanning(another scanning is active)"
176
- )
177
 
178
  ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")
179
 
@@ -429,48 +404,67 @@ def create_app(args):
429
  return app
430
 
431
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
432
  def main():
 
 
 
433
  args = parse_args()
434
- import uvicorn
435
- import logging.config
436
 
437
  # Configure uvicorn logging
438
- logging.config.dictConfig(
439
- {
440
- "version": 1,
441
- "disable_existing_loggers": False,
442
- "formatters": {
443
- "default": {
444
- "format": "%(levelname)s: %(message)s",
445
- },
446
  },
447
- "handlers": {
448
- "default": {
449
- "formatter": "default",
450
- "class": "logging.StreamHandler",
451
- "stream": "ext://sys.stderr",
452
- },
453
  },
454
- "loggers": {
455
- "uvicorn.access": {
456
- "handlers": ["default"],
457
- "level": "INFO",
458
- "propagate": False,
459
- },
460
  },
461
- }
462
- )
463
 
464
  # Add filter to uvicorn access logger
465
  uvicorn_access_logger = logging.getLogger("uvicorn.access")
466
  uvicorn_access_logger.addFilter(AccessLogFilter())
467
 
468
- app = create_app(args)
469
  display_splash_screen(args)
 
470
  uvicorn_config = {
471
- "app": app,
 
472
  "host": args.host,
473
  "port": args.port,
 
474
  "log_config": None, # Disable default config
475
  }
476
  if args.ssl:
 
8
  )
9
  from fastapi.responses import FileResponse
10
  import asyncio
 
11
  import os
12
+ import json
13
  import logging
14
+ import logging.config
15
+ import uvicorn
16
+ from fastapi.staticfiles import StaticFiles
17
  from pathlib import Path
18
  import configparser
19
  from ascii_colors import ASCIIColors
 
50
  config = configparser.ConfigParser()
51
  config.read("config.ini")
52
 
 
 
 
 
 
 
 
 
 
 
 
 
53
 
54
  class AccessLogFilter(logging.Filter):
55
  def __init__(self):
 
84
 
85
 
86
  def create_app(args):
 
87
  # Initialize verbose debug setting
88
  from lightrag.utils import set_verbose_debug
89
 
 
143
 
144
  # Auto scan documents if enabled
145
  if args.auto_scan_at_startup:
146
+ # Create background task
147
+ task = asyncio.create_task(
148
+ run_scanning_process(rag, doc_manager)
149
+ )
150
+ app.state.background_tasks.add(task)
151
+ task.add_done_callback(app.state.background_tasks.discard)
 
 
 
 
 
 
 
 
 
 
 
 
 
152
 
153
  ASCIIColors.green("\nServer is ready to accept connections! 🚀\n")
154
 
 
404
  return app
405
 
406
 
407
+ def get_application():
408
+ """Factory function for creating the FastAPI application"""
409
+ from .utils_api import initialize_manager
410
+ initialize_manager()
411
+
412
+ # Get args from environment variable
413
+ args_json = os.environ.get('LIGHTRAG_ARGS')
414
+ if not args_json:
415
+ args = parse_args() # Fallback to parsing args if env var not set
416
+ else:
417
+ import types
418
+ args = types.SimpleNamespace(**json.loads(args_json))
419
+
420
+ return create_app(args)
421
+
422
+
423
  def main():
424
+ from multiprocessing import freeze_support
425
+ freeze_support()
426
+
427
  args = parse_args()
428
+ # Save args to environment variable for child processes
429
+ os.environ['LIGHTRAG_ARGS'] = json.dumps(vars(args))
430
 
431
  # Configure uvicorn logging
432
+ logging.config.dictConfig({
433
+ "version": 1,
434
+ "disable_existing_loggers": False,
435
+ "formatters": {
436
+ "default": {
437
+ "format": "%(levelname)s: %(message)s",
 
 
438
  },
439
+ },
440
+ "handlers": {
441
+ "default": {
442
+ "formatter": "default",
443
+ "class": "logging.StreamHandler",
444
+ "stream": "ext://sys.stderr",
445
  },
446
+ },
447
+ "loggers": {
448
+ "uvicorn.access": {
449
+ "handlers": ["default"],
450
+ "level": "INFO",
451
+ "propagate": False,
452
  },
453
+ },
454
+ })
455
 
456
  # Add filter to uvicorn access logger
457
  uvicorn_access_logger = logging.getLogger("uvicorn.access")
458
  uvicorn_access_logger.addFilter(AccessLogFilter())
459
 
 
460
  display_splash_screen(args)
461
+
462
  uvicorn_config = {
463
+ "app": "lightrag.api.lightrag_server:get_application",
464
+ "factory": True,
465
  "host": args.host,
466
  "port": args.port,
467
+ "workers": args.workers,
468
  "log_config": None, # Disable default config
469
  }
470
  if args.ssl:
lightrag/api/routers/document_routes.py CHANGED
@@ -12,29 +12,23 @@ import pipmaster as pm
12
  from datetime import datetime
13
  from pathlib import Path
14
  from typing import Dict, List, Optional, Any
15
-
16
  from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
17
  from pydantic import BaseModel, Field, field_validator
18
 
19
  from lightrag import LightRAG
20
  from lightrag.base import DocProcessingStatus, DocStatus
21
- from ..utils_api import get_api_key_dependency
 
 
 
 
 
 
22
 
23
 
24
  router = APIRouter(prefix="/documents", tags=["documents"])
25
 
26
- # Global progress tracker
27
- scan_progress: Dict = {
28
- "is_scanning": False,
29
- "current_file": "",
30
- "indexed_count": 0,
31
- "total_files": 0,
32
- "progress": 0,
33
- }
34
-
35
- # Lock for thread-safe operations
36
- progress_lock = asyncio.Lock()
37
-
38
  # Temporary file prefix
39
  temp_prefix = "__tmp__"
40
 
@@ -167,13 +161,6 @@ class DocumentManager:
167
  new_files.append(file_path)
168
  return new_files
169
 
170
- # def scan_directory(self) -> List[Path]:
171
- # new_files = []
172
- # for ext in self.supported_extensions:
173
- # for file_path in self.input_dir.rglob(f"*{ext}"):
174
- # new_files.append(file_path)
175
- # return new_files
176
-
177
  def mark_as_indexed(self, file_path: Path):
178
  self.indexed_files.add(file_path)
179
 
@@ -390,24 +377,24 @@ async def save_temp_file(input_dir: Path, file: UploadFile = File(...)) -> Path:
390
 
391
 
392
  async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
393
- """Background task to scan and index documents"""
 
 
 
 
 
 
394
  try:
395
  new_files = doc_manager.scan_directory_for_new_files()
396
- scan_progress["total_files"] = len(new_files)
 
397
 
398
- logging.info(f"Found {len(new_files)} new files to index.")
399
- for file_path in new_files:
400
  try:
401
- async with progress_lock:
402
- scan_progress["current_file"] = os.path.basename(file_path)
403
-
404
  await pipeline_index_file(rag, file_path)
405
-
406
- async with progress_lock:
407
- scan_progress["indexed_count"] += 1
408
- scan_progress["progress"] = (
409
- scan_progress["indexed_count"] / scan_progress["total_files"]
410
- ) * 100
411
 
412
  except Exception as e:
413
  logging.error(f"Error indexing file {file_path}: {str(e)}")
@@ -415,8 +402,7 @@ async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
415
  except Exception as e:
416
  logging.error(f"Error during scanning process: {str(e)}")
417
  finally:
418
- async with progress_lock:
419
- scan_progress["is_scanning"] = False
420
 
421
 
422
  def create_document_routes(
@@ -436,14 +422,6 @@ def create_document_routes(
436
  Returns:
437
  dict: A dictionary containing the scanning status
438
  """
439
- async with progress_lock:
440
- if scan_progress["is_scanning"]:
441
- return {"status": "already_scanning"}
442
-
443
- scan_progress["is_scanning"] = True
444
- scan_progress["indexed_count"] = 0
445
- scan_progress["progress"] = 0
446
-
447
  # Start the scanning process in the background
448
  background_tasks.add_task(run_scanning_process, rag, doc_manager)
449
  return {"status": "scanning_started"}
@@ -461,8 +439,7 @@ def create_document_routes(
461
  - total_files: Total number of files to process
462
  - progress: Percentage of completion
463
  """
464
- async with progress_lock:
465
- return scan_progress
466
 
467
  @router.post("/upload", dependencies=[Depends(optional_api_key)])
468
  async def upload_to_input_dir(
 
12
  from datetime import datetime
13
  from pathlib import Path
14
  from typing import Dict, List, Optional, Any
15
+ from ascii_colors import ASCIIColors
16
  from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
17
  from pydantic import BaseModel, Field, field_validator
18
 
19
  from lightrag import LightRAG
20
  from lightrag.base import DocProcessingStatus, DocStatus
21
+ from ..utils_api import (
22
+ get_api_key_dependency,
23
+ scan_progress,
24
+ update_scan_progress_if_not_scanning,
25
+ update_scan_progress,
26
+ reset_scan_progress,
27
+ )
28
 
29
 
30
  router = APIRouter(prefix="/documents", tags=["documents"])
31
 
 
 
 
 
 
 
 
 
 
 
 
 
32
  # Temporary file prefix
33
  temp_prefix = "__tmp__"
34
 
 
161
  new_files.append(file_path)
162
  return new_files
163
 
 
 
 
 
 
 
 
164
  def mark_as_indexed(self, file_path: Path):
165
  self.indexed_files.add(file_path)
166
 
 
377
 
378
 
379
  async def run_scanning_process(rag: LightRAG, doc_manager: DocumentManager):
380
+ """Background task to scan and index documents"""
381
+ if not update_scan_progress_if_not_scanning():
382
+ ASCIIColors.info(
383
+ "Skip document scanning(another scanning is active)"
384
+ )
385
+ return
386
+
387
  try:
388
  new_files = doc_manager.scan_directory_for_new_files()
389
+ total_files = len(new_files)
390
+ update_scan_progress("", total_files, 0) # Initialize progress
391
 
392
+ logging.info(f"Found {total_files} new files to index.")
393
+ for idx, file_path in enumerate(new_files):
394
  try:
395
+ update_scan_progress(os.path.basename(file_path), total_files, idx)
 
 
396
  await pipeline_index_file(rag, file_path)
397
+ update_scan_progress(os.path.basename(file_path), total_files, idx + 1)
 
 
 
 
 
398
 
399
  except Exception as e:
400
  logging.error(f"Error indexing file {file_path}: {str(e)}")
 
402
  except Exception as e:
403
  logging.error(f"Error during scanning process: {str(e)}")
404
  finally:
405
+ reset_scan_progress()
 
406
 
407
 
408
  def create_document_routes(
 
422
  Returns:
423
  dict: A dictionary containing the scanning status
424
  """
 
 
 
 
 
 
 
 
425
  # Start the scanning process in the background
426
  background_tasks.add_task(run_scanning_process, rag, doc_manager)
427
  return {"status": "scanning_started"}
 
439
  - total_files: Total number of files to process
440
  - progress: Percentage of completion
441
  """
442
+ return dict(scan_progress)
 
443
 
444
  @router.post("/upload", dependencies=[Depends(optional_api_key)])
445
  async def upload_to_input_dir(
lightrag/api/utils_api.py CHANGED
@@ -6,6 +6,7 @@ import os
6
  import argparse
7
  from typing import Optional
8
  import sys
 
9
  from ascii_colors import ASCIIColors
10
  from lightrag.api import __api_version__
11
  from fastapi import HTTPException, Security
@@ -16,6 +17,66 @@ from starlette.status import HTTP_403_FORBIDDEN
16
  # Load environment variables
17
  load_dotenv(override=True)
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
  class OllamaServerInfos:
21
  # Constants for emulated Ollama model information
@@ -260,6 +321,14 @@ def parse_args() -> argparse.Namespace:
260
  help="Enable automatic scanning when the program starts",
261
  )
262
 
 
 
 
 
 
 
 
 
263
  # LLM and embedding bindings
264
  parser.add_argument(
265
  "--llm-binding",
 
6
  import argparse
7
  from typing import Optional
8
  import sys
9
+ from multiprocessing import Manager
10
  from ascii_colors import ASCIIColors
11
  from lightrag.api import __api_version__
12
  from fastapi import HTTPException, Security
 
17
  # Load environment variables
18
  load_dotenv(override=True)
19
 
20
+ # Global variables for manager and shared state
21
+ manager = None
22
+ scan_progress = None
23
+ scan_lock = None
24
+
25
+ def initialize_manager():
26
+ """Initialize manager and shared state for cross-process communication"""
27
+ global manager, scan_progress, scan_lock
28
+ if manager is None:
29
+ manager = Manager()
30
+ scan_progress = manager.dict({
31
+ "is_scanning": False,
32
+ "current_file": "",
33
+ "indexed_count": 0,
34
+ "total_files": 0,
35
+ "progress": 0,
36
+ })
37
+ scan_lock = manager.Lock()
38
+
39
+ def update_scan_progress_if_not_scanning():
40
+ """
41
+ Atomically check if scanning is not in progress and update scan_progress if it's not.
42
+ Returns True if the update was successful, False if scanning was already in progress.
43
+ """
44
+ with scan_lock:
45
+ if not scan_progress["is_scanning"]:
46
+ scan_progress.update({
47
+ "is_scanning": True,
48
+ "current_file": "",
49
+ "indexed_count": 0,
50
+ "total_files": 0,
51
+ "progress": 0,
52
+ })
53
+ return True
54
+ return False
55
+
56
+ def update_scan_progress(current_file: str, total_files: int, indexed_count: int):
57
+ """
58
+ Atomically update scan progress information.
59
+ """
60
+ progress = (indexed_count / total_files * 100) if total_files > 0 else 0
61
+ scan_progress.update({
62
+ "current_file": current_file,
63
+ "indexed_count": indexed_count,
64
+ "total_files": total_files,
65
+ "progress": progress,
66
+ })
67
+
68
+ def reset_scan_progress():
69
+ """
70
+ Atomically reset scan progress to initial state.
71
+ """
72
+ scan_progress.update({
73
+ "is_scanning": False,
74
+ "current_file": "",
75
+ "indexed_count": 0,
76
+ "total_files": 0,
77
+ "progress": 0,
78
+ })
79
+
80
 
81
  class OllamaServerInfos:
82
  # Constants for emulated Ollama model information
 
321
  help="Enable automatic scanning when the program starts",
322
  )
323
 
324
+ # Server workers configuration
325
+ parser.add_argument(
326
+ "--workers",
327
+ type=int,
328
+ default=get_env_value("WORKERS", 2, int),
329
+ help="Number of worker processes (default: from env or 2)",
330
+ )
331
+
332
  # LLM and embedding bindings
333
  parser.add_argument(
334
  "--llm-binding",