# NOTE: page-scrape residue ("Spaces: Sleeping") -- not part of the program.
# app.py
import streamlit as st

# Set page config.
# This call is placed ahead of every other Streamlit command in the script
# (see the barcode-scanning warning further down, which relies on that).
st.set_page_config(
    page_title="Quantum Bank: Invented by James Burvel O'Callaghan III",
    page_icon="💳",
    layout="wide",
)
# Import necessary libraries
# Standard library
import base64
import hashlib
import hmac
import json
import os
import uuid
from datetime import datetime, date, timedelta
from urllib.parse import quote

# Third-party
import jwt  # For JWT authentication
import numpy as np
import openai  # For OpenAI ChatGPT integration
import pandas as pd
import plotly.express as px
import requests
import stripe
from cryptography.fernet import Fernet  # For encryption
from github import Github  # For GitHub API
from huggingface_hub import HfApi, HfFolder, Repository  # For Hugging Face API
from oauthlib.oauth1 import Client  # For MasterCard OAuth1
from qiskit import Aer, QuantumCircuit, execute  # For quantum computing simulation
from requests.auth import HTTPBasicAuth
from sklearn.ensemble import IsolationForest  # For anomaly detection
from sklearn.linear_model import LinearRegression  # For predictive analytics
from web3 import Web3  # For blockchain interaction
from web3.middleware import geth_poa_middleware  # For Infura with PoA networks
# Optional imports for barcode scanning.
# All three packages are needed by the In-Store Scanner page; if any of them
# fails to import, placeholders are bound so later references do not raise
# NameError and the feature is switched off via the flag.
barcode_scanning_available = True
try:
    import cv2
    from pyzbar import pyzbar
    from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
except ImportError:
    cv2 = None
    pyzbar = None
    webrtc_streamer = None
    # Keeps `class BarcodeScanner(VideoTransformerBase)` definable.
    VideoTransformerBase = object
    barcode_scanning_available = False

# Now, after st.set_page_config(), you can use Streamlit commands
if not barcode_scanning_available:
    st.warning(
        "OpenCV, pyzbar, or streamlit-webrtc is not available. "
        "Barcode scanning will be disabled."
    )
# Load environment variables securely
def load_env_vars():
    """Return the process environment mapping (``os.environ``)."""
    return os.environ


env_vars = load_env_vars()


# Securely retrieve API keys
def get_env_var(key, default=None):
    """Return the value of environment variable *key*.

    Args:
        key: Name of the environment variable.
        default: Value returned when *key* is not set (``None`` by default,
            which matches the previous single-argument behaviour).
    """
    return env_vars.get(key, default)
# Initialize API keys and configurations.
# Every value is read from the environment; a missing variable yields None and
# the corresponding service page reports the missing credential.
stripe.api_key = get_env_var('STRIPE_API_KEY')
plaid_client_id = get_env_var('PLAID_CLIENT_ID')
plaid_secret = get_env_var('PLAID_SECRET')
plaid_env = get_env_var('PLAID_ENV') or 'sandbox'  # unset or empty -> sandbox
citibank_client_id = get_env_var('CITIBANK_CLIENT_ID')
citibank_client_secret = get_env_var('CITIBANK_CLIENT_SECRET')
citibank_api_key = get_env_var('CITIBANK_API_KEY')
mt_api_key = get_env_var('MT_API_KEY')
mt_organization_id = get_env_var('MT_ORG_ID')
github_token = get_env_var('GITHUB_TOKEN')
huggingface_token = get_env_var('HUGGINGFACE_TOKEN')
openai_api_key = get_env_var('OPENAI_API_KEY')
infura_project_id = get_env_var('INFURA_PROJECT_ID')
metamask_account_address = get_env_var('METAMASK_ACCOUNT_ADDRESS')
metamask_private_key = get_env_var('METAMASK_PRIVATE_KEY')
visa_api_key = get_env_var('VISA_API_KEY')
visa_api_secret = get_env_var('VISA_API_SECRET')
visa_cert_path = get_env_var('VISA_CERT_PATH')
mastercard_consumer_key = get_env_var('MASTERCARD_CONSUMER_KEY')
mastercard_private_key = get_env_var('MASTERCARD_PRIVATE_KEY')
pipedream_url = get_env_var('PIPEDREAM_URL')

# Set OpenAI API key
openai.api_key = openai_api_key

# Initialize Web3 (None when no Infura project id is configured)
if infura_project_id:
    infura_url = f"https://mainnet.infura.io/v3/{infura_project_id}"
    web3 = Web3(Web3.HTTPProvider(infura_url))
    # For PoA networks like Ropsten or Rinkeby, use the following line
    # web3.middleware_onion.inject(geth_poa_middleware, layer=0)
else:
    web3 = None

# Initialize GitHub and Hugging Face clients
github_client = Github(github_token) if github_token else None
hf_api = HfApi()
if huggingface_token:
    # Persist the token through HfFolder so huggingface_hub can find it.
    HfFolder.save_token(huggingface_token)
else:
    # Fall back to a token previously stored by huggingface_hub, if any.
    huggingface_token = HfFolder.get_token()
# Initialize Plaid client if credentials are provided
if plaid_client_id and plaid_secret:
    # `plaid` itself must be bound: the Environment constants live on it.
    import plaid
    from plaid.api import plaid_api
    from plaid.api_client import ApiClient
    from plaid.configuration import Configuration
    # Request models used by the "Plaid Services" page. They are imported
    # explicitly because a star-import of the `plaid.model` package does not
    # pull in the classes defined in its submodules.
    from plaid.model.accounts_get_request import AccountsGetRequest
    from plaid.model.country_code import CountryCode
    from plaid.model.link_token_create_request import LinkTokenCreateRequest
    from plaid.model.link_token_create_request_user import LinkTokenCreateRequestUser
    from plaid.model.products import Products
    from plaid.model.transactions_get_request import TransactionsGetRequest

    plaid_environment = {
        'sandbox': plaid.Environment.Sandbox,
        # Not every plaid-python release defines Development; fall back to
        # Sandbox instead of failing at import time.
        'development': getattr(plaid.Environment, 'Development', plaid.Environment.Sandbox),
        'production': plaid.Environment.Production,
    }
    plaid_config = Configuration(
        host=plaid_environment.get(plaid_env, plaid.Environment.Sandbox),
        api_key={
            'clientId': plaid_client_id,
            'secret': plaid_secret,
        }
    )
    plaid_api_client = ApiClient(plaid_config)
    plaid_client = plaid_api.PlaidApi(plaid_api_client)
else:
    plaid_client = None
# Initialize user database (In-memory for demonstration purposes)
if 'user_database' not in st.session_state:
    st.session_state['user_database'] = {}

# Number of PBKDF2 rounds used when deriving password hashes.
_PBKDF2_ITERATIONS = 200_000


def _hash_password(password, salt):
    """Return the hex PBKDF2-HMAC-SHA256 digest of *password* under *salt* (bytes)."""
    return hashlib.pbkdf2_hmac(
        'sha256', password.encode(), salt, _PBKDF2_ITERATIONS
    ).hex()


def _password_matches(record, password):
    """Check *password* against a stored user *record* in constant time."""
    if 'salt' in record:
        candidate = _hash_password(password, bytes.fromhex(record['salt']))
    else:
        # Record written by the earlier unsalted SHA-256 scheme.
        candidate = hashlib.sha256(password.encode()).hexdigest()
    return hmac.compare_digest(candidate, record['password'])


# JWT Authentication
def authenticate_user():
    """Render the sidebar login / sign-up form.

    On a successful login a JWT carrying the username is stored in
    ``st.session_state['auth_token']``. New accounts are written to the
    in-memory ``st.session_state['user_database']`` as a salted PBKDF2 hash.
    """
    # NOTE(review): the JWT signing key is the hard-coded literal 'secret'.
    # It is kept because the decode site elsewhere in this file uses the same
    # literal; both should move to a secret loaded from the environment.
    st.sidebar.title("User Authentication")
    auth_option = st.sidebar.radio("Select an option", ["Login", "Create Account"])
    user_db = st.session_state['user_database']

    if auth_option == "Login":
        username = st.sidebar.text_input("Username")
        password = st.sidebar.text_input("Password", type="password")
        if st.sidebar.button("Login"):
            record = user_db.get(username)
            if record is not None and _password_matches(record, password):
                token = jwt.encode({'user': username}, 'secret', algorithm='HS256')
                st.session_state['auth_token'] = token
                st.success("Logged in successfully!")
            else:
                st.error("Invalid username or password")

    elif auth_option == "Create Account":
        username = st.sidebar.text_input("Choose a Username")
        password = st.sidebar.text_input("Choose a Password", type="password")
        confirm_password = st.sidebar.text_input("Confirm Password", type="password")
        if st.sidebar.button("Create Account"):
            if not username or not password:
                st.error("Username and password must not be empty")
            elif password != confirm_password:
                st.error("Passwords do not match")
            elif username in user_db:
                st.error("Username already exists")
            else:
                # A fresh random salt per user so equal passwords hash differently.
                salt = os.urandom(16)
                user_db[username] = {
                    'salt': salt.hex(),
                    'password': _hash_password(password, salt),
                }
                st.success("Account created successfully! Please login.")
if 'auth_token' not in st.session_state:
    authenticate_user()
else:
    # Proceed with the application.
    # Newer Streamlit exposes st.rerun(); older releases only have
    # st.experimental_rerun(). Use whichever is available.
    _rerun = getattr(st, 'rerun', None) or st.experimental_rerun

    # Decode JWT to get username
    token = st.session_state['auth_token']
    try:
        decoded_token = jwt.decode(token, 'secret', algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # A corrupt or tampered token sends the user back to the login form
        # instead of crashing the page.
        del st.session_state['auth_token']
        _rerun()
        st.stop()
    username = decoded_token['user']

    # Title of the app
    st.title(f"Quantum Bank: Welcome, {username}")
    st.sidebar.header("Navigation")

    # Logout button
    if st.sidebar.button("Logout"):
        del st.session_state['auth_token']
        st.success("Logged out successfully")
        _rerun()

    # Main application navigation
    service_choice = st.sidebar.selectbox("Choose a Service", [
        "Dashboard",
        "In-Store Scanner & Payment",
        "Citibank Services",
        "Plaid Services",
        "Stripe Services",
        "Modern Treasury Services",
        "GitHub Integration",
        "Hugging Face Integration",
        "Blockchain Integration",
        "Visa Services",
        "MasterCard Services",
        "Pipedream Integration",
        "Financial Analysis",
        "Budgeting Tools",
        "AI Chatbot",
        "Quantum Portfolio Optimization",
    ])
# In-Store Scanner & Payment
if service_choice == "In-Store Scanner & Payment":
    st.header("In-Store Item Scanner and Payment")
    if barcode_scanning_available:
        # Mock database of items
        ITEM_DATABASE = {
            "123456789012": {"name": "Product A", "price": 9.99},
            "987654321098": {"name": "Product B", "price": 19.99},
            # Add more items as needed
        }

        # Function to get item details
        def get_item_details(code):
            """Return the catalogue entry for barcode *code*, or None if unknown."""
            return ITEM_DATABASE.get(code)

        # Defined before the purchase flow below so the name exists when the
        # "Purchase" button handler calls it.
        def create_mt_ledger_transaction(amount, description):
            """Post a balanced debit/credit ledger transaction to Modern Treasury.

            Args:
                amount: Amount in major currency units (e.g. dollars).
                description: Free-text description stored on the transaction.

            Returns:
                The decoded JSON response on HTTP 200/201, otherwise None
                (the error text is shown with ``st.error``).
            """
            url = "https://app.moderntreasury.com/api/ledger_transactions"
            credentials = f"{mt_organization_id}:{mt_api_key}"
            base64_credentials = base64.b64encode(credentials.encode()).decode('utf-8')
            headers = {
                "Authorization": f"Basic {base64_credentials}",
                "Content-Type": "application/json",
                "Accept": "application/json"
            }
            # round() rather than int(): 19.99 * 100 is 1998.999... in binary
            # floating point, and truncation would lose a cent.
            amount_cents = round(amount * 100)
            data = {
                "description": description,
                "status": "posted",
                "ledger_entries": [
                    {
                        "amount": amount_cents,
                        "direction": "debit",
                        "ledger_account_id": "ledger_account_id_debit",  # Replace with your ledger account ID
                    },
                    {
                        "amount": amount_cents,
                        "direction": "credit",
                        "ledger_account_id": "ledger_account_id_credit",  # Replace with your ledger account ID
                    },
                ]
            }
            response = requests.post(url, headers=headers, json=data, timeout=30)
            if response.status_code in [200, 201]:
                return response.json()
            st.error(f"Modern Treasury Error: {response.text}")
            return None

        # Video transformer for barcode scanning
        class BarcodeScanner(VideoTransformerBase):
            """Annotates each frame with decoded barcodes and remembers the last one."""

            def __init__(self):
                # Payload of the most recently decoded barcode, or None.
                self.result = None

            def transform(self, frame):
                image = frame.to_ndarray(format="bgr24")
                barcodes = pyzbar.decode(image)
                for barcode in barcodes:
                    (x, y, w, h) = barcode.rect
                    cv2.rectangle(image, (x, y), (x + w, y + h), (0, 255, 0), 2)
                    barcode_data = barcode.data.decode("utf-8")
                    barcode_type = barcode.type
                    text = f"{barcode_data} ({barcode_type})"
                    cv2.putText(image, text, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                    self.result = barcode_data
                return image

        st.write("Scan an item to view details and purchase.")

        # Start barcode scanner
        webrtc_ctx = webrtc_streamer(
            key="barcode-scanner",
            video_transformer_factory=BarcodeScanner,
            async_transform=True,
        )

        # Read the result from the transformer the stream is actually using.
        # An object created in this script run would be a different, empty
        # instance on every rerun.
        active_scanner = webrtc_ctx.video_transformer
        if active_scanner:
            barcode_data = active_scanner.result
            if barcode_data:
                item_details = get_item_details(barcode_data)
                if item_details:
                    st.success(f"Item Scanned: {item_details['name']}")
                    st.write(f"Price: ${item_details['price']:.2f}")
                    if st.button("Purchase"):
                        # Process payment
                        try:
                            # In a real application, collect payment details securely
                            # For demonstration, we'll simulate a successful payment
                            st.write("Payment processed successfully.")
                            # Update Modern Treasury ledger
                            mt_response = create_mt_ledger_transaction(
                                amount=item_details['price'],
                                description=f"Purchase of {item_details['name']}"
                            )
                            if mt_response:
                                st.write("Ledger updated successfully.")
                            else:
                                st.write("Failed to update ledger.")
                        except Exception as e:
                            st.error(f"Error processing payment: {e}")
                else:
                    st.error("Item not found in database.")
    else:
        st.error("Barcode scanning features are not available in this environment.")
# Citibank Services | |
elif service_choice == "Citibank Services": | |
st.header("Citibank Services") | |
# Citibank API class | |
class CitibankAPI:
    """Small client for the Citi sandbox API using the client-credentials grant.

    The access token is requested once, when the object is constructed. Each
    API method returns the decoded JSON body of the response regardless of the
    HTTP status, so error payloads can be displayed by the caller.
    """

    TOKEN_URL = "https://sandbox.apihub.citi.com/gcb/api/clientCredentials/oauth2/token/us/gcb"
    REQUEST_TIMEOUT = 30  # seconds, applied to every HTTP call

    def __init__(self, client_id, client_secret, api_key):
        self.client_id = client_id
        self.client_secret = client_secret
        self.api_key = api_key
        self.base_url = "https://sandbox.apihub.citi.com/gcb/api/v1"
        self.access_token = self.get_access_token()

    def get_access_token(self):
        """Return an OAuth2 access token, or None when it cannot be obtained."""
        if not self.client_id or not self.client_secret:
            # Previously a None credential raised TypeError on concatenation.
            st.error("Failed to obtain Citibank access token.")
            return None
        basic = base64.b64encode(
            f"{self.client_id}:{self.client_secret}".encode()
        ).decode()
        headers = {
            'Authorization': 'Basic ' + basic,
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        data = 'grant_type=client_credentials&scope=/api'
        try:
            response = requests.post(
                self.TOKEN_URL, headers=headers, data=data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            st.error("Failed to obtain Citibank access token.")
            return None
        if response.status_code == 200:
            return response.json().get('access_token')
        st.error("Failed to obtain Citibank access token.")
        return None

    def _auth_headers(self, json_body=False):
        """Build bearer-token headers; add Content-Type when sending JSON."""
        headers = {
            'Authorization': f"Bearer {self.access_token}",
            'Accept': 'application/json'
        }
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    @staticmethod
    def _json_or_error(response):
        """Decode *response* as JSON, or wrap a non-JSON body in an error dict."""
        try:
            return response.json()
        except ValueError:
            return {"error": response.text, "status_code": response.status_code}

    def _get(self, path):
        response = requests.get(
            f"{self.base_url}{path}",
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        return self._json_or_error(response)

    def _post(self, path, payload):
        response = requests.post(
            f"{self.base_url}{path}",
            headers=self._auth_headers(json_body=True),
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        return self._json_or_error(response)

    def get_accounts(self):
        """GET /accounts."""
        return self._get("/accounts")

    def get_account_details(self, account_id):
        """GET /accounts/{account_id}."""
        return self._get(f"/accounts/{account_id}")

    def create_account(self, account_data):
        """POST *account_data* (dict) to /accounts."""
        return self._post("/accounts", account_data)

    def initiate_payment(self, payment_data):
        """POST *payment_data* (dict) to /payments."""
        return self._post("/payments", payment_data)
# Only build the client (which immediately requests a token) when the
# credentials are configured; otherwise explain what is missing.
if not citibank_client_id or not citibank_client_secret:
    st.error("Please provide Citibank API credentials.")
    citibank_api_instance = None
else:
    citibank_api_instance = CitibankAPI(citibank_client_id, citibank_client_secret, citibank_api_key)

# Without a token every call would be sent as "Bearer None"; skip the actions.
if citibank_api_instance is not None and citibank_api_instance.access_token:
    citibank_action = st.selectbox("Choose an Action", [
        "View Accounts",
        "View Account Details",
        "Create Account",
        "Initiate Payment",
    ])
    if citibank_action == "View Accounts":
        accounts = citibank_api_instance.get_accounts()
        st.json(accounts)
    elif citibank_action == "View Account Details":
        account_id = st.text_input("Enter Account ID")
        if st.button("Get Account Details"):
            details = citibank_api_instance.get_account_details(account_id)
            st.json(details)
    elif citibank_action == "Create Account":
        st.subheader("Create a New Account")
        account_name = st.text_input("Account Name")
        account_type = st.selectbox("Account Type", ["Savings", "Checking"])
        currency = st.text_input("Currency", value="USD")
        if st.button("Create Account"):
            account_data = {
                "accountName": account_name,
                "accountType": account_type,
                "currency": currency
            }
            result = citibank_api_instance.create_account(account_data)
            st.json(result)
    elif citibank_action == "Initiate Payment":
        st.subheader("Initiate a Payment")
        from_account = st.text_input("From Account ID")
        to_account = st.text_input("To Account Number")
        amount = st.number_input("Amount", min_value=0.0)
        currency = st.text_input("Currency", value="USD")
        description = st.text_input("Description")
        if st.button("Initiate Payment"):
            payment_data = {
                "sourceAccountId": from_account,
                "transactionAmount": {
                    "amount": amount,
                    "currency": currency
                },
                "destinationAccountNumber": to_account,
                "paymentMethod": "InternalTransfer",
                "paymentCurrency": currency,
                "remarks": description
            }
            result = citibank_api_instance.initiate_payment(payment_data)
            st.json(result)
# Plaid Services | |
elif service_choice == "Plaid Services": | |
st.header("Plaid Services") | |
if not plaid_client_id or not plaid_secret: | |
st.error("Please provide Plaid API credentials.") | |
else: | |
plaid_action = st.selectbox("Choose an Action", [ | |
"Link Account", | |
"Get Accounts", | |
"Get Transactions", | |
]) | |
if 'plaid_access_token' not in st.session_state: | |
st.session_state['plaid_access_token'] = None | |
if plaid_action == "Link Account": | |
st.subheader("Link a Bank Account") | |
# Create link token | |
user_id = str(uuid.uuid4()) | |
request = LinkTokenCreateRequest( | |
products=[Products('transactions')], | |
client_name='TransactPro', | |
country_codes=[CountryCode('US')], | |
language='en', | |
user=LinkTokenCreateRequestUser(client_user_id=user_id) | |
) | |
response = plaid_client.link_token_create(request) | |
link_token = response['link_token'] | |
st.write("Use this link token in your frontend to initialize Plaid Link.") | |
st.code(link_token) | |
st.session_state['plaid_link_token'] = link_token | |
elif plaid_action == "Get Accounts": | |
if st.session_state['plaid_access_token']: | |
accounts_request = AccountsGetRequest(access_token=st.session_state['plaid_access_token']) | |
accounts_response = plaid_client.accounts_get(accounts_request) | |
st.json(accounts_response.to_dict()) | |
else: | |
st.error("No access token available. Please link an account first.") | |
elif plaid_action == "Get Transactions": | |
if st.session_state['plaid_access_token']: | |
start_date = (date.today() - timedelta(days=30)).strftime('%Y-%m-%d') | |
end_date = date.today().strftime('%Y-%m-%d') | |
transactions_request = TransactionsGetRequest( | |
access_token=st.session_state['plaid_access_token'], | |
start_date=start_date, | |
end_date=end_date | |
) | |
transactions_response = plaid_client.transactions_get(transactions_request) | |
st.json(transactions_response.to_dict()) | |
else: | |
st.error("No access token available. Please link an account first.") | |
# Stripe Services | |
elif service_choice == "Stripe Services": | |
st.header("Stripe Services") | |
stripe_action = st.selectbox("Choose an Action", [ | |
"Create Customer", | |
"Charge Customer", | |
"View Charges", | |
]) | |
if stripe_action == "Create Customer": | |
st.subheader("Create a New Customer") | |
email = st.text_input("Customer Email") | |
description = st.text_input("Description") | |
if st.button("Create Customer"): | |
customer = stripe.Customer.create( | |
email=email, | |
description=description | |
) | |
st.success("Customer created successfully.") | |
st.json(customer) | |
elif stripe_action == "Charge Customer": | |
st.subheader("Charge a Customer") | |
customer_id = st.text_input("Customer ID") | |
amount = st.number_input("Amount (in cents)", min_value=50) | |
currency = st.text_input("Currency", value="usd") | |
description = st.text_input("Description") | |
if st.button("Charge Customer"): | |
charge = stripe.Charge.create( | |
customer=customer_id, | |
amount=int(amount), | |
currency=currency, | |
description=description | |
) | |
st.success("Charge created successfully.") | |
st.json(charge) | |
elif stripe_action == "View Charges": | |
st.subheader("List Charges") | |
charges = stripe.Charge.list() | |
st.json(charges) | |
# Modern Treasury Services | |
elif service_choice == "Modern Treasury Services": | |
st.header("Modern Treasury Services") | |
mt_action = st.selectbox("Choose an Action", [ | |
"View Payment Orders", | |
"Create Payment Order", | |
]) | |
def get_mt_headers(): | |
credentials = f"{mt_organization_id}:{mt_api_key}" | |
base64_credentials = base64.b64encode(credentials.encode()).decode('utf-8') | |
return { | |
"Authorization": f"Basic {base64_credentials}", | |
"Content-Type": "application/json", | |
"Accept": "application/json" | |
} | |
if mt_action == "View Payment Orders": | |
url = "https://app.moderntreasury.com/api/payment_orders" | |
headers = get_mt_headers() | |
response = requests.get(url, headers=headers) | |
if response.status_code == 200: | |
st.json(response.json()) | |
else: | |
st.error(f"Error: {response.text}") | |
elif mt_action == "Create Payment Order": | |
st.subheader("Create a New Payment Order") | |
amount = st.number_input("Amount", min_value=0.0) | |
direction = st.selectbox("Direction", ["credit", "debit"]) | |
currency = st.text_input("Currency", value="USD") | |
description = st.text_input("Description") | |
if st.button("Create Payment Order"): | |
url = "https://app.moderntreasury.com/api/payment_orders" | |
headers = get_mt_headers() | |
data = { | |
"amount": int(amount * 100), # Convert to cents | |
"direction": direction, | |
"currency": currency, | |
"description": description | |
} | |
response = requests.post(url, headers=headers, json=data) | |
if response.status_code == 201: | |
st.success("Payment Order created successfully.") | |
st.json(response.json()) | |
else: | |
st.error(f"Error: {response.text}") | |
# GitHub Integration | |
elif service_choice == "GitHub Integration": | |
st.header("GitHub Repository Management") | |
if not github_client: | |
st.error("Please provide your GitHub Personal Access Token in the environment variables.") | |
else: | |
github_action = st.selectbox("Choose an Action", [ | |
"List Repositories", | |
"Create Repository", | |
"Add File to Repository", | |
"List Repository Contents", | |
]) | |
if github_action == "List Repositories": | |
st.subheader("Your GitHub Repositories") | |
try: | |
repos = github_client.get_user().get_repos() | |
repo_list = [repo.full_name for repo in repos] | |
st.write(repo_list) | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif github_action == "Create Repository": | |
st.subheader("Create a New Repository") | |
repo_name = st.text_input("Repository Name") | |
repo_description = st.text_area("Repository Description") | |
private = st.checkbox("Private Repository") | |
if st.button("Create Repository"): | |
try: | |
user = github_client.get_user() | |
repo = user.create_repo( | |
name=repo_name, | |
description=repo_description, | |
private=private | |
) | |
st.success(f"Repository '{repo.full_name}' created successfully!") | |
st.write(f"URL: {repo.html_url}") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif github_action == "Add File to Repository": | |
st.subheader("Add a File to a Repository") | |
repo_name = st.text_input("Repository Name (e.g., username/repo)") | |
file_path = st.text_input("File Path in Repository (e.g., folder/filename.py)") | |
file_content = st.text_area("File Content") | |
commit_message = st.text_input("Commit Message", value="Add new file via TransactPro app") | |
if st.button("Add File"): | |
try: | |
repo = github_client.get_repo(repo_name) | |
# Check if the file already exists | |
try: | |
contents = repo.get_contents(file_path) | |
# If the file exists, update it | |
repo.update_file( | |
path=file_path, | |
message=commit_message, | |
content=file_content, | |
sha=contents.sha, | |
branch="main" | |
) | |
st.success(f"File '{file_path}' updated in repository '{repo_name}' successfully!") | |
except Exception: | |
# If the file does not exist, create it | |
repo.create_file( | |
path=file_path, | |
message=commit_message, | |
content=file_content, | |
branch="main" | |
) | |
st.success(f"File '{file_path}' added to repository '{repo_name}' successfully!") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif github_action == "List Repository Contents": | |
st.subheader("List Contents of a Repository") | |
repo_name = st.text_input("Repository Name (e.g., username/repo)") | |
if st.button("List Contents"): | |
try: | |
repo = github_client.get_repo(repo_name) | |
contents = repo.get_contents("") | |
while contents: | |
file_content = contents.pop(0) | |
if file_content.type == "dir": | |
contents.extend(repo.get_contents(file_content.path)) | |
else: | |
st.write(file_content.path) | |
except Exception as e: | |
st.error(f"Error: {e}") | |
# Hugging Face Integration | |
elif service_choice == "Hugging Face Integration": | |
st.header("Hugging Face Model Management") | |
if not huggingface_token: | |
st.error("Please provide your Hugging Face Token in the environment variables.") | |
else: | |
huggingface_action = st.selectbox("Choose an Action", [ | |
"List Models", | |
"Upload Model", | |
"Create Space", | |
"Download Model", | |
]) | |
if huggingface_action == "List Models": | |
st.subheader("Your Hugging Face Models") | |
try: | |
user_info = hf_api.whoami(token=huggingface_token) | |
models = hf_api.list_models(author=user_info['name'], token=huggingface_token) | |
model_list = [model.modelId for model in models] | |
st.write(model_list) | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif huggingface_action == "Upload Model": | |
st.subheader("Upload a Model to Hugging Face") | |
model_name = st.text_input("Model Name (e.g., username/model_name)") | |
model_files = st.file_uploader("Select Model Files", accept_multiple_files=True) | |
if st.button("Upload Model"): | |
try: | |
repo_url = hf_api.create_repo(name=model_name, token=huggingface_token) | |
repo = Repository(local_dir=model_name, clone_from=repo_url, use_auth_token=huggingface_token) | |
repo.git_pull() | |
for model_file in model_files: | |
with open(os.path.join(model_name, model_file.name), "wb") as f: | |
f.write(model_file.getbuffer()) | |
repo.git_add() | |
repo.git_commit("Upload model files") | |
repo.git_push() | |
st.success(f"Model '{model_name}' uploaded successfully!") | |
st.write(f"Model URL: https://huggingface.co/{model_name}") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif huggingface_action == "Create Space": | |
st.subheader("Create a New Hugging Face Space") | |
space_name = st.text_input("Space Name (e.g., username/space_name)") | |
sdk = st.selectbox("Select SDK", ["streamlit", "gradio", "dash", "static"]) | |
if st.button("Create Space"): | |
try: | |
repo_url = hf_api.create_repo(name=space_name, repo_type="space", space_sdk=sdk, token=huggingface_token) | |
st.success(f"Space '{space_name}' created successfully!") | |
st.write(f"Space URL: https://huggingface.co/spaces/{space_name}") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif huggingface_action == "Download Model": | |
st.subheader("Download a Model from Hugging Face") | |
model_id = st.text_input("Model ID (e.g., username/model_name)") | |
if st.button("Download Model"): | |
try: | |
model_dir = hf_api.download_repo(repo_id=model_id, repo_type='model', local_dir=model_id, token=huggingface_token) | |
st.success(f"Model '{model_id}' downloaded successfully!") | |
st.write(f"Model Directory: {model_dir}") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
# Blockchain Integration | |
elif service_choice == "Blockchain Integration": | |
st.header("Blockchain Integration with Metamask and Infura") | |
if not web3: | |
st.error("Please provide your Infura Project ID in the environment variables.") | |
else: | |
blockchain_action = st.selectbox("Choose an Action", [ | |
"View Account Balance", | |
"Send Transaction", | |
"View Transaction Details", | |
]) | |
if blockchain_action == "View Account Balance": | |
st.subheader("View Ethereum Account Balance") | |
account_address = st.text_input("Enter Ethereum Account Address", value=metamask_account_address) | |
if st.button("Get Balance"): | |
try: | |
balance_wei = web3.eth.get_balance(account_address) | |
balance_eth = web3.fromWei(balance_wei, 'ether') | |
st.write(f"Balance: {balance_eth} ETH") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif blockchain_action == "Send Transaction": | |
st.subheader("Send Ethereum Transaction") | |
to_address = st.text_input("Recipient Ethereum Address") | |
amount = st.number_input("Amount in ETH", min_value=0.0) | |
gas_price = st.number_input("Gas Price (Gwei)", min_value=1.0, value=50.0) | |
if st.button("Send Transaction"): | |
if not metamask_private_key: | |
st.error("Please provide your Metamask Private Key in the environment variables.") | |
else: | |
try: | |
# Build transaction | |
nonce = web3.eth.getTransactionCount(metamask_account_address) | |
tx = { | |
'nonce': nonce, | |
'to': to_address, | |
'value': web3.toWei(amount, 'ether'), | |
'gas': 21000, | |
'gasPrice': web3.toWei(gas_price, 'gwei'), | |
} | |
# Sign transaction | |
signed_tx = web3.eth.account.sign_transaction(tx, metamask_private_key) | |
# Send transaction | |
tx_hash = web3.eth.sendRawTransaction(signed_tx.rawTransaction) | |
st.success(f"Transaction sent! TX Hash: {web3.toHex(tx_hash)}") | |
except Exception as e: | |
st.error(f"Error: {e}") | |
elif blockchain_action == "View Transaction Details": | |
st.subheader("View Ethereum Transaction Details") | |
tx_hash = st.text_input("Enter Transaction Hash") | |
if st.button("Get Transaction Details"): | |
try: | |
tx = web3.eth.getTransaction(tx_hash) | |
st.json(dict(tx)) | |
except Exception as e: | |
st.error(f"Error: {e}") | |
# Visa Services | |
elif service_choice == "Visa Services": | |
st.header("Visa API Integration") | |
visa_action = st.selectbox("Choose an Action", [ | |
"Get Transaction Details", | |
"Initiate Payment", | |
]) | |
if not visa_api_key or not visa_api_secret or not visa_cert_path: | |
st.error("Please provide Visa API credentials and certificate path.") | |
else: | |
if visa_action == "Get Transaction Details": | |
transaction_id = st.text_input("Enter Transaction ID") | |
if st.button("Get Details"): | |
url = f"https://sandbox.api.visa.com/visadirect/v1/transaction/{transaction_id}" | |
headers = { | |
"Accept": "application/json", | |
"Content-Type": "application/json" | |
} | |
response = requests.get( | |
url, | |
headers=headers, | |
auth=( | |
visa_api_key, | |
visa_api_secret | |
), | |
cert=(visa_cert_path, visa_cert_path) | |
) | |
if response.status_code == 200: | |
st.json(response.json()) | |
else: | |
st.error(f"Error: {response.text}") | |
elif visa_action == "Initiate Payment": | |
st.subheader("Initiate a Payment") | |
# Implement payment initiation with Visa API | |
st.info("Payment initiation with Visa API is not implemented in this example.") | |
# MasterCard Services | |
elif service_choice == "MasterCard Services": | |
st.header("MasterCard API Integration") | |
mastercard_action = st.selectbox("Choose an Action", [ | |
"Get Account Details", | |
"Process Payment", | |
]) | |
if not mastercard_consumer_key or not mastercard_private_key: | |
st.error("Please provide MasterCard API credentials.") | |
else: | |
if mastercard_action == "Get Account Details": | |
account_id = st.text_input("Enter Account ID") | |
if st.button("Get Account Details"): | |
url = f"https://sandbox.api.mastercard.com/account/{account_id}" | |
headers = { | |
"Accept": "application/json", | |
"Content-Type": "application/json" | |
} | |
# Implement OAuth1 authentication for MasterCard API | |
client = Client( | |
client_key=mastercard_consumer_key, | |
rsa_key=mastercard_private_key, | |
signature_method='RSA-SHA1' | |
) | |
uri, headers, body = client.sign( | |
url, | |
http_method='GET', | |
body='', | |
headers=headers | |
) | |
response = requests.get(uri, headers=headers) | |
if response.status_code == 200: | |
st.json(response.json()) | |
else: | |
st.error(f"Error: {response.text}") | |
elif mastercard_action == "Process Payment": | |
st.subheader("Process a Payment") | |
# Implement payment processing with MasterCard API | |
st.info("Payment processing with MasterCard API is not implemented in this example.") | |
# Pipedream Integration | |
elif service_choice == "Pipedream Integration": | |
st.header("Pipedream Workflow Trigger") | |
if not pipedream_url: | |
st.error("Please provide your Pipedream Workflow URL in the environment variables.") | |
else: | |
event_name = st.text_input("Event Name") | |
data = st.text_area("Event Data (JSON format)") | |
if st.button("Trigger Workflow"): | |
try: | |
event_data = json.loads(data) | |
event_data['event_name'] = event_name | |
response = requests.post(pipedream_url, json=event_data) | |
if response.status_code == 200: | |
st.success("Pipedream workflow triggered successfully.") | |
else: | |
st.error(f"Failed to trigger workflow: {response.text}") | |
except json.JSONDecodeError: | |
st.error("Invalid JSON data.") | |
# Financial Analysis | |
elif service_choice == "Financial Analysis": | |
st.header("Financial Analysis and Insights") | |
# Fetch transaction data from APIs or use sample data | |
transactions = [ | |
{'date': '2023-01-01', 'amount': -50, 'category': 'Groceries'}, | |
{'date': '2023-01-02', 'amount': -200, 'category': 'Rent'}, | |
{'date': '2023-01-03', 'amount': -15, 'category': 'Coffee'}, | |
# ... more transactions | |
] | |
df_transactions = pd.DataFrame(transactions) | |
# Visualization | |
fig = px.bar(df_transactions, x='date', y='amount', color='category', title="Spending Over Time") | |
st.plotly_chart(fig) | |
# Anomaly Detection | |
model = IsolationForest(contamination=0.1) | |
df_transactions['anomaly'] = model.fit_predict(df_transactions[['amount']]) | |
anomalies = df_transactions[df_transactions['anomaly'] == -1] | |
st.subheader("Detected Anomalies") | |
st.write(anomalies) | |
# Predictive Analytics | |
st.subheader("Predictive Analytics") | |
df_transactions['timestamp'] = pd.to_datetime(df_transactions['date']).astype(np.int64) // 10**9 | |
X = df_transactions[['timestamp']] | |
y = df_transactions['amount'] | |
reg_model = LinearRegression() | |
reg_model.fit(X, y) | |
future_dates = pd.date_range(start=date.today(), periods=30) | |
future_timestamps = future_dates.astype(np.int64) // 10**9 | |
X_future = pd.DataFrame(future_timestamps, columns=['timestamp']) | |
y_pred = reg_model.predict(X_future) | |
pred_df = pd.DataFrame({'date': future_dates, 'predicted_amount': y_pred}) | |
fig_pred = px.line(pred_df, x='date', y='predicted_amount', title="Predicted Future Spending") | |
st.plotly_chart(fig_pred) | |
# Budgeting Tools | |
elif service_choice == "Budgeting Tools": | |
st.header("Budgeting and Financial Planning") | |
budget_categories = ['Rent', 'Groceries', 'Entertainment', 'Utilities', 'Miscellaneous'] | |
budgets = {} | |
for category in budget_categories: | |
budgets[category] = st.number_input(f"{category} Budget", min_value=0.0, format="%0.2f") | |
st.subheader("Your Budget Overview") | |
budget_df = pd.DataFrame(list(budgets.items()), columns=['Category', 'Budget']) | |
st.table(budget_df) | |
# Compare with actual spending | |
actual_spending = {'Rent': 1800, 'Groceries': 450, 'Entertainment': 300, 'Utilities': 200, 'Miscellaneous': 150} | |
spending_df = pd.DataFrame(list(actual_spending.items()), columns=['Category', 'Spent']) | |
comparison_df = budget_df.merge(spending_df, on='Category') | |
comparison_df['Difference'] = comparison_df['Budget'] - comparison_df['Spent'] | |
st.subheader("Budget vs Actual Spending") | |
st.table(comparison_df) | |
# AI Chatbot with OpenAI's ChatGPT API | |
elif service_choice == "AI Chatbot": | |
st.header("AI-Powered Financial Assistant with ChatGPT") | |
if not openai_api_key: | |
st.error("Please provide your OpenAI API Key in the environment variables.") | |
else: | |
user_input = st.text_input("You: ") | |
if user_input: | |
try: | |
response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You are a helpful financial assistant."}, | |
{"role": "user", "content": user_input}, | |
], | |
) | |
assistant_response = response['choices'][0]['message']['content'] | |
st.text_area("Assistant:", value=assistant_response, height=200) | |
except Exception as e: | |
st.error(f"Error: {e}") | |
# Quantum Portfolio Optimization | |
elif service_choice == "Quantum Portfolio Optimization": | |
st.header("Quantum Portfolio Optimization Simulation") | |
num_qubits = 3 | |
qc = QuantumCircuit(num_qubits) | |
qc.h(range(num_qubits)) | |
qc.measure_all() | |
simulator = Aer.get_backend('qasm_simulator') | |
result = execute(qc, simulator, shots=1024).result() | |
counts = result.get_counts() | |
st.write("Simulation Results:") | |
st.write(counts) | |
# Interpret the results for portfolio optimization | |
# Display footer (two markdown lines at the bottom of the sidebar)
st.sidebar.markdown("Powered by Streamlit")
st.sidebar.markdown("Made by Citibank Demo Business Inc.")