# MIA / app.py
# Last commit: "Update app.py" by SharryOG (dc28452, verified)
# Size: 8.15 kB
import datetime
import json
import logging
import random
import secrets
import string
from functools import wraps
from typing import Optional

import requests
from flask import Flask, jsonify, request, abort, render_template
from webscout import WEBS
# Module-level settings.
PROXY = None  # no proxy configured
TIMEOUT = 10  # presumably a network timeout in seconds — TODO confirm

# The Flask application object that the route decorators in this file attach to.
app = Flask(__name__)
# Catalogue of subscription plans. The outer key is the plan identifier that
# the key-generation form submits; the whole mapping is also handed to the
# HTML templates as ``plans``.
PRICING_PLANS = dict(
    free=dict(name='Free Plan', price='$0/month', rate_limit=1000),
    # A rate limit of None means the plan is unlimited.
    pro=dict(name='Pro Plan', price='Coming Soon', rate_limit=None),
)
# Function to generate a new API key based on the user's name and the current date
def generate_api_key(username):
    """Generate a new API key for ``username``.

    The key has the form ``'HUAI' + username + YYYYMMDD + suffix`` where the
    date is the current local date and ``suffix`` is five characters drawn
    from uppercase ASCII letters and digits.

    Args:
        username: Name embedded verbatim in the key.

    Returns:
        The newly generated key as a string.
    """
    current_date = datetime.datetime.now().strftime("%Y%m%d")
    alphabet = string.ascii_uppercase + string.digits
    # The suffix is the only unpredictable part of the key, so it is drawn
    # from the OS CSPRNG via `secrets` instead of the seedable `random` PRNG.
    suffix = ''.join(secrets.choice(alphabet) for _ in range(5))
    return 'HUAI' + username + current_date + suffix
# Function to validate an API key against the stored keys
def validate_api_key(api_key):
    """Return True if ``api_key`` is one of the stored API keys.

    Keys are read from ``api_keys.json`` in the current working directory on
    every call; the file is expected to hold a JSON object whose values are
    the issued keys.

    Args:
        api_key: The key presented by the caller; may be None or empty.

    Returns:
        True when the key appears among the stored values. False when the
        key is missing/empty, when the store does not exist or is not valid
        JSON, or when the store is not a JSON object.
    """
    if not api_key:
        return False
    try:
        with open('api_keys.json', 'r') as file:
            api_keys = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        # No usable key store means nobody has been issued a key yet, so the
        # request is simply unauthorised rather than a server error.
        return False
    if not isinstance(api_keys, dict):
        return False
    return api_key in api_keys.values()
# Middleware to require an API key for each request
def require_api_key(view_function):
    """Decorate a view so it only runs for requests carrying a valid API key.

    The key is taken from the first non-empty of, in order: the ``HUSI``
    request header, the ``HUAI`` request header, and the ``HUAI`` query
    parameter. ``HUSI`` is the header name this code has always read; the
    ``HUAI`` header is accepted as well so the header matches the query
    parameter and the key prefix.

    Args:
        view_function: The Flask view to protect.

    Returns:
        A wrapper that aborts with HTTP 401 when no key is supplied or when
        ``validate_api_key`` rejects it, and otherwise calls the view.
    """
    @wraps(view_function)
    def decorated_function(*args, **kwargs):
        # HUSI is checked first so requests that were accepted before this
        # header alias existed resolve to exactly the same key.
        api_key = (
            request.headers.get('HUSI')
            or request.headers.get('HUAI')
            or request.args.get('HUAI')
        )
        if not api_key or not validate_api_key(api_key):
            abort(401)  # Unauthorized
        return view_function(*args, **kwargs)
    return decorated_function
# Routes with API key requirement
@app.route('/api/search', methods=['GET'])
@require_api_key
def search_text():
    """Run a text search and return the hits as JSON.

    Query parameters (all forwarded to ``WEBS.text``):
        q: Search query; defaults to ''.
        max_results: Integer; defaults to 10.
        timelimit: Defaults to None.
        safesearch: Defaults to 'moderate'.
        region: Defaults to 'wt-wt'.

    Returns:
        A JSON object ``{'results': [...]}`` in which every entry is an
        ``[index, result]`` pair.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    timelimit = request.args.get('timelimit', None)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    # Search through the context-managed client so it is released on exit,
    # instead of creating a second client that is never closed.
    with WEBS() as webs:
        hits = webs.text(
            query,
            max_results=max_results,
            timelimit=timelimit,
            safesearch=safesearch,
            region=region,
        )
        # NOTE(review): enumerate() makes each entry an [index, result] pair;
        # kept because existing clients may depend on that response shape.
        results = list(enumerate(hits))
    return jsonify({'results': results})
@app.route('/api/images', methods=['GET'])
@require_api_key
def search_images():
    """Run an image search and return the hits as JSON.

    Query parameters (all forwarded to ``WEBS.images``):
        q: Search query; defaults to ''.
        max_results: Integer; defaults to 10.
        safesearch: Defaults to 'moderate'.
        region: Defaults to 'wt-wt'.

    Returns:
        A JSON object ``{'results': [...]}`` in which every entry is an
        ``[index, result]`` pair.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    # Search through the context-managed client so it is released on exit,
    # instead of creating a second client that is never closed.
    with WEBS() as webs:
        hits = webs.images(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
        )
        # NOTE(review): enumerate() makes each entry an [index, result] pair;
        # kept because existing clients may depend on that response shape.
        results = list(enumerate(hits))
    return jsonify({'results': results})
@app.route('/api/videos', methods=['GET'])
@require_api_key
def search_videos():
    """Run a video search and return the hits as JSON.

    Query parameters (all forwarded to ``WEBS.videos``):
        q: Search query; defaults to ''.
        max_results: Integer; defaults to 10.
        safesearch: Defaults to 'moderate'.
        region: Defaults to 'wt-wt'.
        timelimit: Defaults to None.
        resolution: Defaults to None.
        duration: Defaults to None.

    Returns:
        A JSON object ``{'results': [...]}`` in which every entry is an
        ``[index, result]`` pair.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    timelimit = request.args.get('timelimit', None)
    resolution = request.args.get('resolution', None)
    duration = request.args.get('duration', None)
    # Search through the context-managed client so it is released on exit,
    # instead of creating a second client that is never closed.
    with WEBS() as webs:
        hits = webs.videos(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
            timelimit=timelimit,
            resolution=resolution,
            duration=duration,
        )
        # NOTE(review): enumerate() makes each entry an [index, result] pair;
        # kept because existing clients may depend on that response shape.
        results = list(enumerate(hits))
    return jsonify({'results': results})
@app.route('/api/news', methods=['GET'])
@require_api_key
def search_news():
    """Run a news search and return the hits as JSON.

    Query parameters (all forwarded to ``WEBS.news``):
        q: Search query; defaults to ''.
        max_results: Integer; defaults to 10.
        safesearch: Defaults to 'moderate'.
        region: Defaults to 'wt-wt'.
        timelimit: Defaults to None.

    Returns:
        A JSON object ``{'results': [...]}`` in which every entry is an
        ``[index, result]`` pair.
    """
    query = request.args.get('q', '')
    max_results = request.args.get('max_results', 10, type=int)
    safesearch = request.args.get('safesearch', 'moderate')
    region = request.args.get('region', 'wt-wt')
    timelimit = request.args.get('timelimit', None)
    # Search through the context-managed client so it is released on exit,
    # instead of creating a second client that is never closed.
    with WEBS() as webs:
        hits = webs.news(
            query,
            max_results=max_results,
            safesearch=safesearch,
            region=region,
            timelimit=timelimit,
        )
        # NOTE(review): enumerate() makes each entry an [index, result] pair;
        # kept because existing clients may depend on that response shape.
        results = list(enumerate(hits))
    return jsonify({'results': results})
@app.route('/api/maps', methods=['GET'])
@require_api_key
def search_maps():
    """Run a maps search and return the hits as JSON.

    Query parameters (all forwarded to ``WEBS.maps``):
        q: Search query; defaults to ''.
        place: Defaults to None.
        max_results: Integer; defaults to 10.

    Returns:
        A JSON object ``{'results': [...]}`` in which every entry is an
        ``[index, result]`` pair.
    """
    query = request.args.get('q', '')
    place = request.args.get('place', None)
    max_results = request.args.get('max_results', 10, type=int)
    # Search through the context-managed client so it is released on exit,
    # instead of creating a second client that is never closed.
    with WEBS() as webs:
        hits = webs.maps(query, place=place, max_results=max_results)
        # NOTE(review): enumerate() makes each entry an [index, result] pair;
        # kept because existing clients may depend on that response shape.
        results = list(enumerate(hits))
    return jsonify({'results': results})
@app.route('/api/translate', methods=['GET'])
@require_api_key
def translate_text():
    """Translate the query text and return the result as JSON.

    Query parameters (forwarded to ``WEBS.translate``):
        q: Text to translate; defaults to ''.
        to: Target language; defaults to 'en'.

    Returns:
        A JSON object ``{'translation': ...}`` holding whatever
        ``WEBS.translate`` returned.
    """
    query = request.args.get('q', '')
    to_lang = request.args.get('to', 'en')
    # Translate through the context-managed client so it is released on
    # exit, instead of creating a second client that is never closed.
    with WEBS() as webs:
        # The result is returned as-is: wrapping it in a bare enumerate()
        # object made jsonify() fail, because enumerate objects are not
        # JSON-serialisable.
        # NOTE(review): assumes WEBS.translate returns plain lists/dicts —
        # confirm against the installed webscout version.
        translation = webs.translate(query, to=to_lang)
    return jsonify({'translation': translation})
@app.route('/api/suggestions', methods=['GET'])
@require_api_key
def search_suggestions():
    """Return search suggestions for the query as JSON.

    Query parameters:
        q: Text to get suggestions for; required.

    Returns:
        ``{'results': [...]}`` on success; ``{'error': ...}`` with HTTP 400
        when ``q`` is missing or empty; ``{'error': ...}`` with HTTP 500
        when the suggestion lookup raises.
    """
    query = request.args.get('q', '')
    if not query:
        # A missing query is a client error, so report it with a 4xx status
        # rather than a 200 that merely carries an error body.
        return jsonify({'error': 'Query parameter missing'}), 400
    try:
        with WEBS() as webs:
            results = list(webs.suggestions(query))
    except Exception as e:
        # Top-level boundary: record the traceback before answering.
        logging.exception('Suggestion lookup failed')
        return jsonify({'error': str(e)}), 500
    return jsonify({'results': results})
@app.route('/api/health', methods=['GET'])
@require_api_key
def health_check():
    """Report a fixed 'working' status as JSON."""
    payload = {'status': 'working'}
    return jsonify(payload)
import json
from flask import jsonify, request, abort
@app.route('/pricing', methods=['GET'])
def pricing():
    """Render ``pricing.html`` with the plan catalogue exposed as ``plans``."""
    template_context = {'plans': PRICING_PLANS}
    return render_template('pricing.html', **template_context)
@app.route('/generate_key', methods=['GET', 'POST'])
def generate_key():
    """Serve the key-request form (GET) or issue an API key (POST).

    POST form fields:
        username: Name the key is issued to and stored under.
        plan: Must be a key of ``PRICING_PLANS``.

    Returns:
        GET: the rendered ``index.html`` template.
        POST: ``{'api_key': ...}`` with the user's existing key if one is
        stored, otherwise a newly generated and persisted key;
        ``{'error': ...}`` with HTTP 400 for an unknown plan or an exhausted
        free plan; ``{'error': ...}`` with HTTP 500 if the key store cannot
        be written.
    """
    if request.method != 'POST':
        return render_template('index.html', plans=PRICING_PLANS)

    username = request.form['username']
    plan = request.form['plan']
    if plan not in PRICING_PLANS:
        return jsonify({'error': 'Invalid plan'}), 400
    if plan == 'free' and PRICING_PLANS['free']['rate_limit'] == 0:
        return jsonify({'error': 'Free plan is out of limits'}), 400

    # Load the existing keys. A store that does not exist yet, is empty, or
    # does not contain a JSON object is treated as "no keys issued so far".
    try:
        with open('api_keys.json', 'r') as file:
            api_keys = json.load(file)
    except (FileNotFoundError, json.JSONDecodeError):
        api_keys = {}
    if not isinstance(api_keys, dict):
        api_keys = {}

    # Check if the user already has an API key
    if username in api_keys:
        return jsonify({'api_key': api_keys[username]})

    # Generate a new API key
    new_key = generate_api_key(username)
    api_keys[username] = new_key
    try:
        # Serialise before opening the file for writing: opening with 'w'
        # truncates it, so a serialisation error must not be able to happen
        # after that point and wipe the stored keys.
        serialized = json.dumps(api_keys, indent=4)
        with open('api_keys.json', 'w') as file:
            file.write(serialized)
    except Exception as e:
        # Handle any other exceptions that might occur
        return jsonify({'error': str(e)}), 500
    return jsonify({'api_key': new_key})
if __name__ == '__main__':
    def get_public_ip():
        """Return the public IP reported by api.ipify.org, or None on failure.

        Any error is printed and swallowed, so a failed lookup does not
        prevent the server from starting.
        """
        try:
            # Bound the request with the module-level TIMEOUT so a hanging
            # lookup cannot block startup indefinitely.
            response = requests.get(
                'https://api.ipify.org/?format=json', timeout=TIMEOUT
            )
            response.raise_for_status()  # Raises a HTTPError if the response status is 4xx, 5xx
            data = response.json()
            return data['ip']
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

    # Get public IP or use a fallback. The fallback is applied only after the
    # check, so the failure message below is actually reachable.
    public_ip = get_public_ip()
    if public_ip:
        print(f"Public IP: {public_ip}")
    else:
        print("Failed to retrieve public IP. Using fallback IP.")
        public_ip = 'Fallback_IP'

    # NOTE(review): debug=True turns on Flask's debug mode, which is not
    # meant for production deployments — confirm this entry point is only
    # used for local development.
    app.run(debug=True)