Spaces:
Runtime error
Runtime error
"""Lightweight wrapper around requests library, with async support.""" | |
from contextlib import asynccontextmanager | |
from typing import Any, AsyncGenerator, Dict, Optional | |
import aiohttp | |
import requests | |
from langchain_core.pydantic_v1 import BaseModel, Extra | |
class Requests(BaseModel):
    """Wrapper around requests to handle auth and async.

    The main purpose of this wrapper is to handle authentication (by saving
    headers) and enable easy async methods on the same base object.

    The synchronous methods call the matching ``requests`` function and
    return its ``requests.Response``. The asynchronous methods are async
    context managers and must be entered with ``async with``; each yields an
    ``aiohttp.ClientResponse``.
    """

    # Headers passed with every request made through this object.
    headers: Optional[Dict[str, str]] = None
    # Optional session to reuse for async requests; when unset, a new
    # ``aiohttp.ClientSession`` is opened for each async request.
    aiosession: Optional[aiohttp.ClientSession] = None
    # Passed through unchanged as the ``auth`` argument of every request.
    auth: Optional[Any] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    def get(self, url: str, **kwargs: Any) -> requests.Response:
        """GET the URL and return the response.

        Extra keyword arguments are forwarded to ``requests.get``.
        """
        return requests.get(url, headers=self.headers, auth=self.auth, **kwargs)

    def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """POST ``data`` as a JSON body to the URL and return the response."""
        return requests.post(
            url, json=data, headers=self.headers, auth=self.auth, **kwargs
        )

    def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """PATCH the URL with ``data`` as a JSON body and return the response."""
        return requests.patch(
            url, json=data, headers=self.headers, auth=self.auth, **kwargs
        )

    def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> requests.Response:
        """PUT ``data`` as a JSON body to the URL and return the response."""
        return requests.put(
            url, json=data, headers=self.headers, auth=self.auth, **kwargs
        )

    def delete(self, url: str, **kwargs: Any) -> requests.Response:
        """DELETE the URL and return the response."""
        return requests.delete(url, headers=self.headers, auth=self.auth, **kwargs)

    # The async methods below are async generators wrapped with
    # ``asynccontextmanager``: callers enter them with ``async with``, which a
    # bare async generator does not support.
    @asynccontextmanager
    async def _arequest(
        self, method: str, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """Make an async request and yield the response.

        Uses ``self.aiosession`` when one is set; otherwise opens a
        ``aiohttp.ClientSession`` that lives only for this request.
        """
        if not self.aiosession:
            async with aiohttp.ClientSession() as session:
                async with session.request(
                    method, url, headers=self.headers, auth=self.auth, **kwargs
                ) as response:
                    yield response
        else:
            async with self.aiosession.request(
                method, url, headers=self.headers, auth=self.auth, **kwargs
            ) as response:
                yield response

    @asynccontextmanager
    async def aget(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """GET the URL asynchronously and yield the response."""
        async with self._arequest("GET", url, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def apost(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """POST ``data`` as a JSON body asynchronously and yield the response."""
        async with self._arequest("POST", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def apatch(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """PATCH with ``data`` as a JSON body asynchronously and yield the response."""
        async with self._arequest("PATCH", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def aput(
        self, url: str, data: Dict[str, Any], **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """PUT ``data`` as a JSON body asynchronously and yield the response."""
        async with self._arequest("PUT", url, json=data, **kwargs) as response:
            yield response

    @asynccontextmanager
    async def adelete(
        self, url: str, **kwargs: Any
    ) -> AsyncGenerator[aiohttp.ClientResponse, None]:
        """DELETE the URL asynchronously and yield the response."""
        async with self._arequest("DELETE", url, **kwargs) as response:
            yield response
class TextRequestsWrapper(BaseModel):
    """Lightweight wrapper around requests library.

    The main purpose of this wrapper is to always return a text output.
    Every method delegates to a ``Requests`` object built from this
    object's fields and returns the response body as a string.
    """

    # Headers, optional aiohttp session and auth value handed to the
    # underlying ``Requests`` object.
    headers: Optional[Dict[str, str]] = None
    aiosession: Optional[aiohttp.ClientSession] = None
    auth: Optional[Any] = None

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    # Must be a property: every method below reads ``self.requests`` as an
    # attribute rather than calling it.
    @property
    def requests(self) -> Requests:
        """Return a new ``Requests`` built from the current field values."""
        return Requests(
            headers=self.headers, aiosession=self.aiosession, auth=self.auth
        )

    def get(self, url: str, **kwargs: Any) -> str:
        """GET the URL and return the text."""
        return self.requests.get(url, **kwargs).text

    def post(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """POST to the URL and return the text."""
        return self.requests.post(url, data, **kwargs).text

    def patch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PATCH the URL and return the text."""
        return self.requests.patch(url, data, **kwargs).text

    def put(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PUT the URL and return the text."""
        return self.requests.put(url, data, **kwargs).text

    def delete(self, url: str, **kwargs: Any) -> str:
        """DELETE the URL and return the text."""
        return self.requests.delete(url, **kwargs).text

    async def aget(self, url: str, **kwargs: Any) -> str:
        """GET the URL and return the text asynchronously."""
        async with self.requests.aget(url, **kwargs) as response:
            return await response.text()

    async def apost(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """POST to the URL and return the text asynchronously."""
        async with self.requests.apost(url, data, **kwargs) as response:
            return await response.text()

    async def apatch(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PATCH the URL and return the text asynchronously."""
        async with self.requests.apatch(url, data, **kwargs) as response:
            return await response.text()

    async def aput(self, url: str, data: Dict[str, Any], **kwargs: Any) -> str:
        """PUT the URL and return the text asynchronously."""
        async with self.requests.aput(url, data, **kwargs) as response:
            return await response.text()

    async def adelete(self, url: str, **kwargs: Any) -> str:
        """DELETE the URL and return the text asynchronously."""
        async with self.requests.adelete(url, **kwargs) as response:
            return await response.text()
# For backwards compatibility: ``RequestsWrapper`` is kept as an alias of
# ``TextRequestsWrapper``.
RequestsWrapper = TextRequestsWrapper