ParisNeo commited on
Commit
d8980e6
·
unverified ·
1 Parent(s): b198ec9

Create anthropic.py

Browse files
Files changed (1) hide show
  1. lightrag/llm/anthropic.py +271 -0
lightrag/llm/anthropic.py ADDED
@@ -0,0 +1,271 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from ..utils import verbose_debug, VERBOSE_DEBUG
import sys
import os
import logging
import numpy as np
from typing import Any, Union

# AsyncIterator lives in collections.abc since 3.3 and the typing alias is
# deprecated from 3.9.  This module already uses `X | None` annotation syntax
# (3.10+), so the old `sys.version_info < (3, 9)` fallback was dead code and
# the duplicate unconditional `typing.AsyncIterator` import contradicted it.
from collections.abc import AsyncIterator

import pipmaster as pm  # Pipmaster for dynamic library install

# Install Anthropic SDK if not present
if not pm.is_installed("anthropic"):
    pm.install("anthropic")

# Voyage AI provides the embeddings (Anthropic has no native embedding API)
if not pm.is_installed("voyageai"):
    pm.install("voyageai")
import voyageai

from anthropic import (
    AsyncAnthropic,
    APIConnectionError,
    RateLimitError,
    APITimeoutError,
)
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from lightrag.utils import (
    safe_unicode_decode,
    logger,
)
from lightrag.api import __api_version__
40
+
41
# Custom exception for retry mechanism
class InvalidResponseError(Exception):
    """Signals tenacity that the response was unusable and the call should be retried."""
45
+
46
# Core Anthropic completion function with retry
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, APITimeoutError, InvalidResponseError)
    ),
)
async def anthropic_complete_if_cache(
    model: str,
    prompt: str,
    system_prompt: str | None = None,
    history_messages: list[dict[str, Any]] | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
    **kwargs: Any,
) -> Union[str, AsyncIterator[str]]:
    """Send a streaming chat completion request to the Anthropic Messages API.

    Args:
        model: Anthropic model name (e.g. "claude-3-opus-20240229").
        prompt: Current user prompt.
        system_prompt: Optional system instructions. Sent via the dedicated
            ``system`` parameter — the Messages API rejects a "system" role
            inside ``messages``.
        history_messages: Prior turns as ``{"role", "content"}`` dicts.
        base_url: Optional custom API endpoint.
        api_key: Anthropic API key; falls back to the ANTHROPIC_API_KEY env var.
        **kwargs: Extra parameters forwarded to ``messages.create``
            (``max_tokens``, ``temperature``, ...). ``hashing_kv`` is stripped.

    Returns:
        An async iterator of response text chunks (the request always uses
        ``stream=True``).

    Raises:
        APIConnectionError, RateLimitError, APITimeoutError: propagated from
            the SDK; the decorator retries them up to 3 times.
    """
    if history_messages is None:
        history_messages = []
    if not api_key:
        api_key = os.environ.get("ANTHROPIC_API_KEY")

    default_headers = {
        "User-Agent": f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_8) LightRAG/{__api_version__}",
        "Content-Type": "application/json",
    }

    # Quiet the SDK's chatter when verbose debugging is off
    if not VERBOSE_DEBUG and logger.level == logging.DEBUG:
        logging.getLogger("anthropic").setLevel(logging.INFO)

    anthropic_async_client = (
        AsyncAnthropic(default_headers=default_headers, api_key=api_key)
        if base_url is None
        else AsyncAnthropic(
            base_url=base_url, default_headers=default_headers, api_key=api_key
        )
    )
    kwargs.pop("hashing_kv", None)
    # The Messages API requires max_tokens; supply a default when the caller
    # did not specify one so the request does not fail with a 400.
    kwargs.setdefault("max_tokens", 4096)

    # BUG FIX: the system prompt must NOT be appended to `messages` with a
    # "system" role (Anthropic rejects it); it goes in the top-level `system`
    # parameter instead.
    if system_prompt:
        kwargs["system"] = system_prompt

    messages: list[dict[str, Any]] = []
    messages.extend(history_messages)
    messages.append({"role": "user", "content": prompt})

    logger.debug("===== Sending Query to Anthropic LLM =====")
    logger.debug(f"Model: {model} Base URL: {base_url}")
    logger.debug(f"Additional kwargs: {kwargs}")
    verbose_debug(f"Query: {prompt}")
    verbose_debug(f"System prompt: {system_prompt}")

    try:
        response = await anthropic_async_client.messages.create(
            model=model,
            messages=messages,
            stream=True,
            **kwargs,
        )
    except APIConnectionError as e:
        logger.error(f"Anthropic API Connection Error: {e}")
        raise
    except RateLimitError as e:
        logger.error(f"Anthropic API Rate Limit Error: {e}")
        raise
    except APITimeoutError as e:
        logger.error(f"Anthropic API Timeout Error: {e}")
        raise
    except Exception as e:
        logger.error(
            f"Anthropic API Call Failed,\nModel: {model},\nParams: {kwargs}, Got: {e}"
        )
        raise

    async def stream_response():
        """Yield text chunks from the stream, decoding escaped unicode."""
        try:
            async for event in response:
                # Only content_block_delta events carry a `.text` payload;
                # other event types (message_start, message_delta, ...) have
                # no `delta` or a delta without `.text` — the original
                # `event.delta.text` raised AttributeError on those.
                content = getattr(getattr(event, "delta", None), "text", None)
                if content is None:
                    continue
                if r"\u" in content:
                    content = safe_unicode_decode(content.encode("utf-8"))
                yield content
        except Exception as e:
            logger.error(f"Error in stream response: {str(e)}")
            raise

    return stream_response()
133
+
134
# Generic Anthropic completion function
async def anthropic_complete(
    prompt: str,
    system_prompt: str | None = None,
    history_messages: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Union[str, AsyncIterator[str]]:
    """Complete *prompt* using the model name stored in the global config."""
    history = [] if history_messages is None else history_messages
    configured_model = kwargs["hashing_kv"].global_config["llm_model_name"]
    return await anthropic_complete_if_cache(
        configured_model,
        prompt,
        system_prompt=system_prompt,
        history_messages=history,
        **kwargs,
    )
151
+
152
# Claude 3 Opus specific completion
async def claude_3_opus_complete(
    prompt: str,
    system_prompt: str | None = None,
    history_messages: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Union[str, AsyncIterator[str]]:
    """Run a completion against the pinned Claude 3 Opus snapshot."""
    return await anthropic_complete_if_cache(
        "claude-3-opus-20240229",
        prompt,
        system_prompt=system_prompt,
        history_messages=[] if history_messages is None else history_messages,
        **kwargs,
    )
168
+
169
# Claude 3 Sonnet specific completion
async def claude_3_sonnet_complete(
    prompt: str,
    system_prompt: str | None = None,
    history_messages: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Union[str, AsyncIterator[str]]:
    """Run a completion against the pinned Claude 3 Sonnet snapshot."""
    return await anthropic_complete_if_cache(
        "claude-3-sonnet-20240229",
        prompt,
        system_prompt=system_prompt,
        history_messages=[] if history_messages is None else history_messages,
        **kwargs,
    )
185
+
186
# Claude 3 Haiku specific completion
async def claude_3_haiku_complete(
    prompt: str,
    system_prompt: str | None = None,
    history_messages: list[dict[str, Any]] | None = None,
    **kwargs: Any,
) -> Union[str, AsyncIterator[str]]:
    """Run a completion against the pinned Claude 3 Haiku snapshot."""
    return await anthropic_complete_if_cache(
        "claude-3-haiku-20240307",
        prompt,
        system_prompt=system_prompt,
        history_messages=[] if history_messages is None else history_messages,
        **kwargs,
    )
202
+
203
# Embedding function (delegates to Voyage AI, as Anthropic has no embeddings)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type(
        (RateLimitError, APIConnectionError, APITimeoutError)
    ),
)
async def anthropic_embed(
    texts: list[str],
    model: str = "voyage-3",  # Default to voyage-3 as a good general-purpose model
    base_url: str | None = None,
    api_key: str | None = None,
) -> np.ndarray:
    """
    Generate embeddings using Voyage AI since Anthropic doesn't provide native embedding support.

    Args:
        texts: List of text strings to embed
        model: Voyage AI model name (e.g., "voyage-3", "voyage-3-large", "voyage-code-3")
        base_url: Optional custom base URL (not used for Voyage AI)
        api_key: API key for Voyage AI (defaults to VOYAGE_API_KEY environment variable)

    Returns:
        numpy array of shape (len(texts), embedding_dimension) containing the embeddings

    Raises:
        ValueError: if no API key is supplied and VOYAGE_API_KEY is unset.
    """
    if not api_key:
        api_key = os.environ.get("VOYAGE_API_KEY")
    if not api_key:
        logger.error("VOYAGE_API_KEY environment variable not set")
        raise ValueError(
            "VOYAGE_API_KEY environment variable is required for embeddings"
        )

    try:
        # BUG FIX: the synchronous voyageai.Client blocked the event loop for
        # the whole HTTP round-trip inside this async function; use the async
        # client and await the call instead.
        voyage_client = voyageai.AsyncClient(api_key=api_key)

        result = await voyage_client.embed(
            texts,
            model=model,
            input_type="document",  # document context; could be made configurable
        )

        # Convert list of embeddings to numpy array
        embeddings = np.array(result.embeddings, dtype=np.float32)

        logger.debug(f"Generated embeddings for {len(texts)} texts using {model}")
        verbose_debug(f"Embedding shape: {embeddings.shape}")

        return embeddings

    except Exception as e:
        logger.error(f"Voyage AI embedding failed: {str(e)}")
        raise
257
+
258
# Optional: a helper function to get available embedding models
def get_available_embedding_models() -> dict[str, dict]:
    """Return the known Voyage AI embedding models keyed by model name.

    Each entry maps to a dict with ``context_length`` (tokens),
    ``dimension`` (embedding width), and a short ``description``.
    """
    specs = [
        ("voyage-3-large", 32000, 1024, "Best general-purpose and multilingual"),
        ("voyage-3", 32000, 1024, "General-purpose and multilingual"),
        ("voyage-3-lite", 32000, 512, "Optimized for latency and cost"),
        ("voyage-code-3", 32000, 1024, "Optimized for code"),
        ("voyage-finance-2", 32000, 1024, "Optimized for finance"),
        ("voyage-law-2", 16000, 1024, "Optimized for legal"),
        ("voyage-multimodal-3", 32000, 1024, "Multimodal text and images"),
    ]
    return {
        name: {
            "context_length": context_length,
            "dimension": dimension,
            "description": description,
        }
        for name, context_length, dimension, description in specs
    }