Spaces:
Runtime error
Runtime error
import os | |
import re | |
import time | |
from enum import Enum | |
from typing import List, Optional | |
import requests | |
from langchain.docstore.document import Document | |
from langchain.document_loaders.base import BaseLoader | |
class BlockchainType(Enum): | |
"""Enumerator of the supported blockchains.""" | |
ETH_MAINNET = "eth-mainnet" | |
ETH_GOERLI = "eth-goerli" | |
POLYGON_MAINNET = "polygon-mainnet" | |
POLYGON_MUMBAI = "polygon-mumbai" | |
class BlockchainDocumentLoader(BaseLoader): | |
"""Load elements from a blockchain smart contract. | |
The supported blockchains are: Ethereum mainnet, Ethereum Goerli testnet, | |
Polygon mainnet, and Polygon Mumbai testnet. | |
If no BlockchainType is specified, the default is Ethereum mainnet. | |
The Loader uses the Alchemy API to interact with the blockchain. | |
ALCHEMY_API_KEY environment variable must be set to use this loader. | |
The API returns 100 NFTs per request and can be paginated using the | |
startToken parameter. | |
If get_all_tokens is set to True, the loader will get all tokens | |
on the contract. Note that for contracts with a large number of tokens, | |
this may take a long time (e.g. 10k tokens is 100 requests). | |
Default value is false for this reason. | |
The max_execution_time (sec) can be set to limit the execution time | |
of the loader. | |
Future versions of this loader can: | |
- Support additional Alchemy APIs (e.g. getTransactions, etc.) | |
- Support additional blockain APIs (e.g. Infura, Opensea, etc.) | |
""" | |
def __init__( | |
self, | |
contract_address: str, | |
blockchainType: BlockchainType = BlockchainType.ETH_MAINNET, | |
api_key: str = "docs-demo", | |
startToken: str = "", | |
get_all_tokens: bool = False, | |
max_execution_time: Optional[int] = None, | |
): | |
""" | |
Args: | |
contract_address: The address of the smart contract. | |
blockchainType: The blockchain type. | |
api_key: The Alchemy API key. | |
startToken: The start token for pagination. | |
get_all_tokens: Whether to get all tokens on the contract. | |
max_execution_time: The maximum execution time (sec). | |
""" | |
self.contract_address = contract_address | |
self.blockchainType = blockchainType.value | |
self.api_key = os.environ.get("ALCHEMY_API_KEY") or api_key | |
self.startToken = startToken | |
self.get_all_tokens = get_all_tokens | |
self.max_execution_time = max_execution_time | |
if not self.api_key: | |
raise ValueError("Alchemy API key not provided.") | |
if not re.match(r"^0x[a-fA-F0-9]{40}$", self.contract_address): | |
raise ValueError(f"Invalid contract address {self.contract_address}") | |
def load(self) -> List[Document]: | |
result = [] | |
current_start_token = self.startToken | |
start_time = time.time() | |
while True: | |
url = ( | |
f"https://{self.blockchainType}.g.alchemy.com/nft/v2/" | |
f"{self.api_key}/getNFTsForCollection?withMetadata=" | |
f"True&contractAddress={self.contract_address}" | |
f"&startToken={current_start_token}" | |
) | |
response = requests.get(url) | |
if response.status_code != 200: | |
raise ValueError( | |
f"Request failed with status code {response.status_code}" | |
) | |
items = response.json()["nfts"] | |
if not items: | |
break | |
for item in items: | |
content = str(item) | |
tokenId = item["id"]["tokenId"] | |
metadata = { | |
"source": self.contract_address, | |
"blockchain": self.blockchainType, | |
"tokenId": tokenId, | |
} | |
result.append(Document(page_content=content, metadata=metadata)) | |
# exit after the first API call if get_all_tokens is False | |
if not self.get_all_tokens: | |
break | |
# get the start token for the next API call from the last item in array | |
current_start_token = self._get_next_tokenId(result[-1].metadata["tokenId"]) | |
if ( | |
self.max_execution_time is not None | |
and (time.time() - start_time) > self.max_execution_time | |
): | |
raise RuntimeError("Execution time exceeded the allowed time limit.") | |
if not result: | |
raise ValueError( | |
f"No NFTs found for contract address {self.contract_address}" | |
) | |
return result | |
# add one to the tokenId, ensuring the correct tokenId format is used | |
def _get_next_tokenId(self, tokenId: str) -> str: | |
value_type = self._detect_value_type(tokenId) | |
if value_type == "hex_0x": | |
value_int = int(tokenId, 16) | |
elif value_type == "hex_0xbf": | |
value_int = int(tokenId[2:], 16) | |
else: | |
value_int = int(tokenId) | |
result = value_int + 1 | |
if value_type == "hex_0x": | |
return "0x" + format(result, "0" + str(len(tokenId) - 2) + "x") | |
elif value_type == "hex_0xbf": | |
return "0xbf" + format(result, "0" + str(len(tokenId) - 4) + "x") | |
else: | |
return str(result) | |
# A smart contract can use different formats for the tokenId | |
def _detect_value_type(tokenId: str) -> str: | |
if isinstance(tokenId, int): | |
return "int" | |
elif tokenId.startswith("0x"): | |
return "hex_0x" | |
elif tokenId.startswith("0xbf"): | |
return "hex_0xbf" | |
else: | |
return "hex_0xbf" | |