|
|
from ulid import ULID |
|
|
from typing import Dict |
|
|
import os, re, uuid, hashlib |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
# Runs at import time. Presumably loads key/value pairs from a local .env file
# into the process environment so the os.getenv() lookups in get_headers() and
# get_private_key() can see them — confirm against the python-dotenv docs.
load_dotenv()
|
|
|
|
|
def chunk_data(data : list | dict, chunk_size: int): |
|
|
""" |
|
|
This function takes an array and a chunk size as input, and returns a new array |
|
|
where the original array is divided into smaller chunks of the specified size. |
|
|
|
|
|
Parameters: |
|
|
data (list): The original data to be chunked. |
|
|
chunk_size (int): The size of each chunk. |
|
|
|
|
|
Returns: |
|
|
list: A new array containing the chunks of the original array. |
|
|
|
|
|
Example: |
|
|
>>> chunk_array([1, 2, 3, 4, 5, 6], 2) |
|
|
[[1, 2], [3, 4], [5, 6]] |
|
|
""" |
|
|
is_object = isinstance(data, dict) |
|
|
is_array = isinstance(data, list) |
|
|
|
|
|
if not is_object and not is_array: |
|
|
raise TypeError("Data must be a list or a dictionary.") |
|
|
elif is_array: |
|
|
return [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)] |
|
|
elif is_object: |
|
|
items = list(data.items()) |
|
|
for i in range(0, len(items), chunk_size): |
|
|
yield dict(items[i:i + chunk_size]) |
|
|
|
|
|
|
|
|
def generate_ulid(seed: any = None) -> str:
    """
    Construct a ``ULID`` object and return the result of its ``generate()`` call.

    Parameters:
    seed (any, optional): When not None, it is passed as the single positional
        argument to ``ULID``. How the library interprets that argument is not
        visible in this file — TODO confirm against the installed ``ulid``
        package. Defaults to None, in which case ``ULID`` is built with no
        arguments.

    Returns:
    str: Whatever ``ULID.generate()`` returns (annotated here as a string).

    Example:
    >>> identifier = generate_ulid()
    """
    # Only forward the seed when the caller actually supplied one.
    generator = ULID() if seed is None else ULID(seed)
    return generator.generate()
|
|
|
|
|
def create_uuid_from_string(val: str) -> str:
    """
    Derive a deterministic UUID-formatted string from the MD5 digest of ``val``.

    The same input always produces the same output, because the 16 digest
    bytes are used directly as the 128-bit UUID value.

    Parameters:
    val (str): The input string; it is encoded as UTF-8 before hashing.

    Returns:
    str: The digest rendered in canonical 8-4-4-4-12 hexadecimal form.

    Example:
    >>> create_uuid_from_string('hello')
    '5d41402a-bc4b-2a76-b971-9d911017c592'
    """
    digest = hashlib.md5(val.encode("UTF-8")).digest()
    identifier = uuid.UUID(bytes=digest)
    return str(identifier)
|
|
|
|
|
|
|
|
def to_snake_case(s):
    """
    Convert a string to snake_case by splitting it into words and joining the
    lower-cased words with underscores.

    Parameters:
    s (str): The input string to be converted.

    Returns:
    str: The snake_case form of ``s``. A falsy input yields an empty string,
    and a string whose cased characters are all uppercase is returned unchanged.

    Example:
    >>> to_snake_case('FirstName')
    'first_name'
    >>> to_snake_case('HTTPResponse')
    'http_response'
    >>> to_snake_case('ID')
    'ID'
    """
    # Falsy input (empty string, None) collapses to an empty string.
    if not s:
        return ''

    # Fully upper-case strings are handed back untouched.
    if s.isupper():
        return s

    # Alternatives, in order: an acronym run (stopping before a following
    # capitalised word), an optionally capitalised lowercase word with
    # trailing digits, a lone capital letter, a run of digits.
    word_pattern = r'[A-Z]{2,}(?=[A-Z][a-z]+[0-9]*|\b)|[A-Z]?[a-z]+[0-9]*|[A-Z]|[0-9]+'
    words = re.findall(word_pattern, s)
    return '_'.join(map(str.lower, words))
|
|
|
|
|
def convert_to_snakecase(original_data):
    """
    Recursively convert every dictionary key in ``original_data`` to
    snake_case using ``to_snake_case``.

    Nested dictionaries and lists are walked at any depth. Values that are
    neither dictionaries nor lists (strings, numbers, None, ...) are kept
    as-is, whether they appear as dictionary values or as list elements.

    Parameters:
    original_data (dict or list): The input dictionary or list to convert.

    Returns:
    dict or list: A new container of the same kind as the input, with all
    dictionary keys converted. The input is not modified.

    Raises:
    TypeError: If the top-level ``original_data`` is neither a dictionary nor
        a list.

    Example:
    >>> convert_to_snakecase({'FirstName': 'John', 'LastName': 'Doe'})
    {'first_name': 'John', 'last_name': 'Doe'}
    >>> convert_to_snakecase([{'FirstName': 'Jane'}, {'FirstName': 'Bob'}])
    [{'first_name': 'Jane'}, {'first_name': 'Bob'}]
    >>> convert_to_snakecase({'Tags': ['a', 'b']})
    {'tags': ['a', 'b']}
    """
    if isinstance(original_data, dict):
        return {
            to_snake_case(key): (
                convert_to_snakecase(value)
                if isinstance(value, (dict, list))
                else value
            )
            for key, value in original_data.items()
        }

    if isinstance(original_data, list):
        # Scalars inside a list have no keys to rename. Recursing on them
        # would hit the TypeError below, so they are passed through, which
        # mirrors how scalar dictionary values are treated above.
        return [
            convert_to_snakecase(item) if isinstance(item, (dict, list)) else item
            for item in original_data
        ]

    raise TypeError("Input must be a dictionary or a list of dictionaries.")
|
|
|
|
|
import json |
|
|
|
|
|
def store_json_data(data, output_file):
    """
    Write ``data`` to ``output_file`` as JSON indented by four spaces.

    Missing parent directories of ``output_file`` are created first. The
    function is best-effort: any exception is caught and reported on stdout
    instead of being raised.

    Parameters:
    data: Any object accepted by ``json.dump``.
    output_file (str): Destination path. May be a bare filename, in which
        case the file is opened relative to the current working directory.

    Returns:
    None
    """
    try:
        parent_dir = os.path.dirname(output_file)
        # A bare filename has an empty directory part, and os.makedirs('')
        # raises FileNotFoundError; skip directory creation in that case.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(output_file, 'w', encoding='utf-8') as file:
            json.dump(data, file, indent=4)

        print(f"Data successfully stored in {output_file}")
    except Exception as e:
        # Deliberately broad: callers rely on this helper never raising.
        print(f"An error occurred: {e}")
|
|
|
|
|
def get_headers() -> Dict[str, str]:
    """
    Build the request headers carrying the API key.

    The key is read from the ``OX_API_KEY`` environment variable each time
    the function is called.

    Returns:
    Dict[str, str]: A single-entry dictionary mapping ``'0x-api-key'`` to the
    key. If the variable is not set, the value is the literal text ``'None'``.
    """
    api_key = os.getenv('OX_API_KEY')
    # str() matches plain f-string formatting: an unset variable (None)
    # becomes the text 'None' rather than being omitted.
    headers = {'0x-api-key': str(api_key)}
    return headers
|
|
|
|
|
def get_private_key() -> str:
    """
    Read the private key from the ``WALLET_PRIVATE_KEY`` environment variable.

    Returns:
    str: The variable's value. If the variable is not set, ``None`` is
    returned despite the ``str`` annotation.
    """
    private_key = os.environ.get('WALLET_PRIVATE_KEY')
    return private_key
|
|
|