duchaba's picture
return json too
c657c59 verified
raw
history blame
34.7 kB
# [BEGIN OF pluto_happy]
## required lib, required "pip install"
import torch
import cryptography
import cryptography.fernet
from flopth import flopth
import huggingface_hub
import huggingface_hub.hf_api
import gradio
import openai
## standard libs, no need to install
import json
import requests
import time
import os
import random
import re
import sys
import psutil
import threading
import socket
import PIL
import pandas
import matplotlib
import numpy
import importlib.metadata
import types
import cpuinfo
import pynvml
import pathlib
import re
import subprocess
# define class Pluto_Happy
class Pluto_Happy(object):
"""
The Pluto projects starts with fun AI hackings and become a part of my
first book "Data Augmentation with Python" with Packt Publishing.
In particular, Pluto_Happy is a clean and lite kernel of a simple class,
and using @add_module decoractor to add in specific methods to be a new class,
such as Pluto_HFace with a lot more function on HuggingFace, LLM and Transformers.
Args:
name (str): the display name, e.g. "Hanna the seeker"
Returns:
(object): the class instance.
"""
# initialize the object
def __init__(self, name="Pluto",*args, **kwargs):
super(Pluto_Happy, self).__init__(*args, **kwargs)
self.author = "Duc Haba"
self.name = name
self._ph()
self._pp("Hello from class", str(self.__class__) + " Class: " + str(self.__class__.__name__))
self._pp("Code name", self.name)
self._pp("Author is", self.author)
self._ph()
#
# define class var for stable division
self._huggingface_crkey="gAAAAABkduT-XeiYtD41bzjLtwsLCe9y1FbHH6wZkOZwvLwCrgmOtNsFUPWVqMVG8MumazFhiUZy91mWEnLDLCFw3eKNWtOboIyON6yu4lctn6RCQ4Y9nJvx8wPyOnkzt7dm5OISgFcm"
self._gpt_crkey="'gAAAAABkgiYGQY8ef5y192LpNgrAAZVCP3bo2za9iWSZzkyOJtc6wykLwGjFjxKFpsNryMgEhCATJSonslooNSBJFM3OcnVBz4jj_lyXPQABOCsOWqZm6W9nrZYTZkJ0uWAAGJV2B8uzQ13QZgI7VCZ12j8Q7WfrIg=='"
self._fkey="your_key_goes_here"
self._github_crkey="gAAAAABksjLYjRoFxZDDW5RgBN_uvm6pqDP128S2qOEfv9PgVL8fwdtXzWvCeMHwnGcibAky5cGs3XNxMH4VgbaPBA3I_CPRp3bRK3TMNU4HGRKxbdMnJ7U04IkVSdcMn8o86z3yhcSn"
self._kaggle_crkey="gAAAAABksjOOU2a-BtZ4NV8BkmFhBzqjix7XL9DsKPrua7OaMc7t8QKGw_3Ut5wyv4NL4FHX74JFEEbmpVbsPINN7LcqLtewuyF0o0P9461PY9qLBAGy6Wr7PyE0qwDogQoDGJ1UJgPn"
#
self.fname_id = 0
self.dname_img = "img_colab/"
self.flops_per_sec_gcolab_cpu = 4887694725 # 925,554,209 | 9,276,182,810 | 1,722,089,747 | 5,287,694,725
self.flops_per_sec_gcolab_gpu = 6365360673 # 1,021,721,764 | 9,748,048,188 | 2,245,406,502 | 6,965,360,673
self.fname_requirements = './pluto_happy/requirements.txt'
#
self.color_primary = '#2780e3' #blue
self.color_secondary = '#373a3c' #dark gray
self.color_success = '#3fb618' #green
self.color_info = '#9954bb' #purple
self.color_warning = '#ff7518' #orange
self.color_danger = '#ff0039' #red
self.color_mid_gray = '#495057'
self._xkeyfile = '.xoxo'
return
#
# pretty print output name-value line
def _pp(self, a, b,is_print=True):
"""
Pretty print output name-value line
Args:
a (str) :
b (str) :
is_print (bool): whether to print the header or footer lines to console or return a str.
Returns:
y : None or output as (str)
"""
# print("%34s : %s" % (str(a), str(b)))
x = f'{"%34s" % str(a)} : {str(b)}'
y = None
if (is_print):
print(x)
else:
y = x
return y
#
# pretty print the header or footer lines
def _ph(self,is_print=True):
"""
Pretty prints the header or footer lines.
Args:
is_print (bool): whether to print the header or footer lines to console or return a str.
Return:
y : None or output as (str)
"""
x = f'{"-"*34} : {"-"*34}'
y = None
if (is_print):
print(x)
else:
y = x
return y
#
# fetch huggingface file
def fetch_hface_files(self,
hf_names,
hf_space="duchaba/monty",
local_dir="/content/"):
"""
Given a list of huggingface file names, download them from the provided huggingface space.
Args:
hf_names: (list) list of huggingface file names to download
hf_space: (str) huggingface space to download from.
local_dir: (str) local directory to store the files.
Returns:
status: (bool) True if download was successful, False otherwise.
"""
status = True
# f = str(hf_names) + " is not iteratable, type: " + str(type(hf_names))
try:
for f in hf_names:
lo = local_dir + f
huggingface_hub.hf_hub_download(repo_id=hf_space,
filename=f,
use_auth_token=True,
repo_type=huggingface_hub.REPO_TYPE_SPACE,
force_filename=lo)
except:
self._pp("*Error", f)
status = False
return status
#
# push files to huggingface
def push_hface_files(self,
hf_names,
hf_space="duchaba/skin_cancer_diagnose",
local_dir="/content/"):
# push files to huggingface space
"""
Pushes files to huggingface space.
The function takes a list of file names as a
paramater and pushes to the provided huggingface space.
Args:
hf_names: list(of strings), list of file names to be pushed.
hf_space: (str), the huggingface space to push to.
local_dir: (str), the local directory where the files
are stored.
Returns:
status: (bool) True if successfully pushed else False.
"""
status = True
try:
for f in hf_names:
lo = local_dir + f
huggingface_hub.upload_file(
path_or_fileobj=lo,
path_in_repo=f,
repo_id=hf_space,
repo_type=huggingface_hub.REPO_TYPE_SPACE)
except Exception as e:
self._pp("*Error", e)
status = False
return status
#
# push the folder to huggingface space
def push_hface_folder(self,
hf_folder,
hf_space_id,
hf_dest_folder=None):
"""
This function pushes the folder to huggingface space.
Args:
hf_folder: (str). The path to the folder to push.
hf_space_id: (str). The space id to push the folder to.
hf_dest_folder: (str). The destination folder in the space. If not specified,
the folder name will be used as the destination folder.
Returns:
status: (bool) True if the folder is pushed successfully, otherwise False.
"""
status = True
try:
api = huggingface_hub.HfApi()
api.upload_folder(folder_path=hf_folder,
repo_id=hf_space_id,
path_in_repo=hf_dest_folder,
repo_type="space")
except Exception as e:
self._pp("*Error: ",e)
status = False
return status
#
# automatically restart huggingface space
def restart_hface_periodically(self):
"""
This function restarts the huggingface space automatically in random
periodically.
Args:
None
Returns:
None
"""
while True:
random_time = random.randint(15800, 21600)
time.sleep(random_time)
os.execl(sys.executable, sys.executable, *sys.argv)
return
#
# log into huggingface
def login_hface(self, key=None):
"""
Log into HuggingFace.
Args:
key: (str, optional) If key is set, this key will be used to log in,
otherwise the key will be decrypted from the key file.
Returns:
None
"""
if (key is None):
x = self._decrypt_it(self._huggingface_crkey)
else:
x = key
huggingface_hub.login(x, add_to_git_credential=True) # non-blocking login
self._ph()
return
#
# Define a function to display available CPU and RAM
def fetch_info_system(self):
"""
Fetches system information, such as CPU usage and memory usage.
Args:
None.
Returns:
s: (str) A string containing the system information.
"""
s=''
# Get CPU usage as a percentage
cpu_usage = psutil.cpu_percent()
# Get available memory in bytes
mem = psutil.virtual_memory()
# Convert bytes to gigabytes
mem_total_gb = mem.total / (1024 ** 3)
mem_available_gb = mem.available / (1024 ** 3)
mem_used_gb = mem.used / (1024 ** 3)
# save the results
s += f"Total memory: {mem_total_gb:.2f} GB\n"
s += f"Available memory: {mem_available_gb:.2f} GB\n"
# print(f"Used memory: {mem_used_gb:.2f} GB")
s += f"Memory usage: {mem_used_gb/mem_total_gb:.2f}%\n"
try:
cpu_info = cpuinfo.get_cpu_info()
s += f'CPU type: {cpu_info["brand_raw"]}, arch: {cpu_info["arch"]}\n'
s += f'Number of CPU cores: {cpu_info["count"]}\n'
s += f"CPU usage: {cpu_usage}%\n"
s += f'Python version: {cpu_info["python_version"]}'
except Exception as e:
s += f'CPU type: Not accessible, Error: {e}'
return s
#
# fetch GPU RAM info
def fetch_info_gpu(self):
"""
Function to fetch GPU RAM info
Args:
None.
Returns:
s: (str) GPU RAM info in human readable format.
"""
s=''
mtotal = 0
mfree = 0
try:
nvml_handle = pynvml.nvmlInit()
devices = pynvml.nvmlDeviceGetCount()
for i in range(devices):
device = pynvml.nvmlDeviceGetHandleByIndex(i)
memory_info = pynvml.nvmlDeviceGetMemoryInfo(device)
mtotal += memory_info.total
mfree += memory_info.free
mtotal = mtotal / 1024**3
mfree = mfree / 1024**3
# print(f"GPU {i}: Total Memory: {memory_info.total/1024**3} GB, Free Memory: {memory_info.free/1024**3} GB")
s += f'GPU type: {torch.cuda.get_device_name(0)}\n'
s += f'GPU ready staus: {torch.cuda.is_available()}\n'
s += f'Number of GPUs: {devices}\n'
s += f'Total Memory: {mtotal:.2f} GB\n'
s += f'Free Memory: {mfree:.2f} GB\n'
s += f'GPU allocated RAM: {round(torch.cuda.memory_allocated(0)/1024**3,2)} GB\n'
s += f'GPU reserved RAM {round(torch.cuda.memory_reserved(0)/1024**3,2)} GB\n'
except Exception as e:
s += f'**Warning, No GPU: {e}'
return s
#
# fetch info about host ip
def fetch_info_host_ip(self):
"""
Function to fetch current host name and ip address
Args:
None.
Returns:
s: (str) host name and ip info in human readable format.
"""
s=''
try:
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
s += f"Hostname: {hostname}\n"
s += f"IP Address: {ip_address}\n"
except Exception as e:
s += f"**Warning, No hostname: {e}"
return s
#
# fetch files name
def fetch_file_names(self,directory, file_extension=None):
"""
This function gets all the filenames with a given extension.
Args:
directory (str):
directory path to scan for files in.
file_extension (list):
file extension to look for or "None" (default) to get all files.
Returns:
filenames (list):
list of strings containing the filenames with the given extension.
"""
filenames = []
for (root, subFolders, files) in os.walk(directory):
for fname in files:
if (file_extension is None):
filenames.append(os.path.join(root, fname))
else:
for ext in file_extension:
if fname.endswith(ext):
filenames.append(os.path.join(root, fname))
return filenames
#
# fetch the crypto key
def _fetch_crypt(self,has_new_key=False):
"""
This function fetches the crypto key from the file or from the
variable created previously in the class.
Args:
has_new_key (bool):
is_generate flag to indicate whether the key should be
use as-is or fetch from the file.
Returns:
s (str):
string value containing the crypto key.
"""
if self._fkey == 'your_key_goes_here':
raise Exception('Cryto Key is not correct!')
#
s=self._fkey[::-1]
if (has_new_key):
s=open(self._xkeyfile, "rb").read()
self._fkey = s[::-1]
return s
#
# generate new cryto key
def gen_key(self):
"""
This function generates a new cryto key and saves it to a file
Args:
None
Returns:
(str) crypto key
"""
key = cryptography.fernet.Fernet.generate_key()
with open(self._xkeyfile, "wb") as key_file:
key_file.write(key[::-1]) # write in reversed
return key
#
# decrypt message
def decrypt_it(self, x):
"""
Decrypts the encrypted string using the stored crypto key.
Args:
x: (str) to be decrypted.
Returns:
x: (str) decrypted version of x.
"""
y = self._fetch_crypt()
f = cryptography.fernet.Fernet(y)
m = f.decrypt(x)
return m.decode()
#
# encrypt message
def encrypt_it(self, x):
"""
encrypt message
Args:
x (str): message to encrypt
Returns:
str: encrypted message
"""
key = self._fetch_crypt()
p = x.encode()
f = cryptography.fernet.Fernet(key)
y = f.encrypt(p)
return y
#
# fetch import libraries
def _fetch_lib_import(self):
"""
This function fetches all the imported libraries that are installed.
Args:
None
Returns:
x (list):
list of strings containing the name of the imported libraries.
"""
x = []
for name, val in globals().items():
if isinstance(val, types.ModuleType):
x.append(val.__name__)
x.sort()
return x
#
# fetch lib version
def _fetch_lib_version(self,lib_name):
"""
This function fetches the version of the imported libraries.
Args:
lib_name (list):
list of strings containing the name of the imported libraries.
Returns:
val (list):
list of strings containing the version of the imported libraries.
"""
val = []
for x in lib_name:
try:
y = importlib.metadata.version(x)
val.append(f'{x}=={y}')
except Exception as e:
val.append(f'|{x}==unknown_*or_system')
val.sort()
return val
#
# fetch the lib name and version
def fetch_info_lib_import(self):
"""
This function fetches all the imported libraries name and version that are installed.
Args:
None
Returns:
x (list):
list of strings containing the name and version of the imported libraries.
"""
x = self._fetch_lib_version(self._fetch_lib_import())
return x
#
# write a file to local or cloud diskspace
def write_file(self,fname, in_data):
"""
Write a file to local or cloud diskspace or append to it if it already exists.
Args:
fname (str): The name of the file to write.
in_data (list): The
This is a utility function that writes a file to disk.
The file name and text to write are passed in as arguments.
The file is created, the text is written to it, and then the file is closed.
Args:
fname (str): The name of the file to write.
in_data (list): The text to write to the file.
Returns:
None
"""
if os.path.isfile(fname):
f = open(fname, "a")
else:
f = open(fname, "w")
f.writelines("\n".join(in_data))
f.close()
return
#
# fetch flops info
def fetch_info_flops(self,model, input_shape=(1, 3, 224, 224), device="cpu", max_epoch=1):
"""
Calculates the number of floating point operations (FLOPs).
Args:
model (torch.nn.Module): neural network model.
input_shape (tuple): input tensor size.
device (str): device to perform computation on.
max_epoch (int): number of times
Returns:
(float): number of FLOPs, average from epoch, default is 1 epoch.
(float): elapsed seconds
(list): of string for a friendly human readable output
"""
ttm_input = torch.rand(input_shape, dtype=torch.float32, device=device)
# ttm_input = torch.rand((1, 3, 224, 224), dtype=torch.float32, device=device)
tstart = time.time()
for i in range(max_epoch):
flops, params = flopth(model, inputs=(ttm_input,), bare_number=True)
tend = time.time()
etime = (tend - tstart)/max_epoch
# kilo = 10^3, maga = 10^6, giga = 10^9, tera=10^12, peta=10^15, exa=10^18, zetta=10^21
valstr = []
valstr.append(f'Tensors device: {device}')
valstr.append(f'flops: {flops:,}')
valstr.append(f'params: {params:,}')
valstr.append(f'epoch: {max_epoch}')
valstr.append(f'sec: {etime}')
# valstr += f'Tensors device: {device}, flops: {flops}, params: {params}, epoch: {max_epoch}, sec: {etime}\n'
x = flops/etime
y = (x/10**15)*86400
valstr.append(f'Flops/s: {x:,}')
valstr.append(f'PetaFlops/s: {x/10**15}')
valstr.append(f'PetaFlops/day: {y}')
valstr.append(f'1 PetaFlopsDay (on this system will take): {round(1/y, 2):,.2f} days')
return flops, etime, valstr
#
def print_petaflops(self):
"""
Prints the flops and peta-flops-day calculation.
**WARING**: This method will break/interfer with Stable Diffusion use of LoRA.
I can't debug why yet.
Args:
None
Returns:
None
"""
self._pp('Model', 'TTM, Tiny Torch Model on: CPU')
mtoy = TTM()
# my_model = MyModel()
dev = torch.device("cuda:0")
a,b,c = self.fetch_info_flops(mtoy)
y = round((a/b)/self.flops_per_sec_gcolab_cpu * 100, 2)
self._pp('Flops', f'{a:,} flops')
self._pp('Total elapse time', f'{b:,} seconds')
self._pp('Flops compared', f'{y:,}% of Google Colab Pro')
for i, val in enumerate(c):
self._pp(f'Info {i}', val)
self._ph()
try:
self._pp('Model', 'TTM, Tiny Torch Model on: GPU')
dev = torch.device("cuda:0")
a2,b2,c2 = self.fetch_info_flops(mtoy, device=dev)
y2 = round((a2/b2)/self.flops_per_sec_gcolab_gpu * 100, 2)
self._pp('Flops', f'{a2:,} flops')
self._pp('Total elapse time', f'{b2:,} seconds')
self._pp('Flops compared', f'{y2:,}% of Google Colab Pro')
d2 = round(((a2/b2)/(a/b))*100, 2)
self._pp('Flops GPU compared', f'{d2:,}% of CPU (or {round(d2-100,2):,}% faster)')
for i, val in enumerate(c2):
self._pp(f'Info {i}', val)
except Exception as e:
self._pp('Error', e)
self._ph()
return
#
#
def fetch_installed_libraries(self):
"""
Retrieves and prints the names and versions of Python libraries installed by the user,
excluding the standard libraries.
Args:
-----
None
Returns:
--------
dictionary: (dict)
A dictionary where keys are the names of the libraries and values are their respective versions.
Examples:
---------
libraries = get_installed_libraries()
for name, version in libraries.items():
print(f"{name}: {version}")
"""
# List of standard libraries (this may not be exhaustive and might need updates based on the Python version)
# Run pip freeze command to get list of installed packages with their versions
result = subprocess.run(['pip', 'freeze'], stdout=subprocess.PIPE)
# Decode result and split by lines
packages = result.stdout.decode('utf-8').splitlines()
# Split each line by '==' to separate package names and versions
installed_libraries = {}
for package in packages:
try:
name, version = package.split('==')
installed_libraries[name] = version
except Exception as e:
#print(f'{package}: Error: {e}')
pass
return installed_libraries
#
#
def fetch_match_file_dict(self, file_path, reference_dict):
"""
Reads a file from the disk, creates an array with each line as an item,
and checks if each line exists as a key in the provided dictionary. If it exists,
the associated value from the dictionary is also returned.
Parameters:
-----------
file_path: str
Path to the file to be read.
reference_dict: dict
Dictionary against which the file content (each line) will be checked.
Returns:
--------
dict:
A dictionary where keys are the lines from the file and values are either
the associated values from the reference dictionary or None if the key
doesn't exist in the dictionary.
Raises:
-------
FileNotFoundError:
If the provided file path does not exist.
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"The file at {file_path} does not exist.")
with open(file_path, 'r') as file:
lines = file.readlines()
# Check if each line (stripped of whitespace and newline characters) exists in the reference dictionary.
# If it exists, fetch its value. Otherwise, set the value to None.
results = {line.strip(): reference_dict.get(line.strip().replace('_', '-'), None) for line in lines}
return results
# print fech_info about myself
def print_info_self(self):
"""
Prints information about the model/myself.
Args:
None
Returns:
None
"""
self._ph()
self._pp("Hello, I am", self.name)
self._pp("I will display", "Python, Jupyter, and system info.")
self._pp("For complete doc type", "help(pluto) ...or help(your_object_name)")
self._pp('.','.')
self._pp("...", "Β―\_(ツ)_/Β―")
self._ph()
# system
self._pp('System', 'Info')
x = self.fetch_info_system()
print(x)
self._ph()
# gpu
self._pp('GPU', 'Info')
x = self.fetch_info_gpu()
print(x)
self._ph()
# lib used
self._pp('Installed lib from', self.fname_requirements)
self._ph()
x = self.fetch_match_file_dict(self.fname_requirements, self.fetch_installed_libraries())
for item, value in x.items():
self._pp(f'{item} version', value)
self._ph()
self._pp('Standard lib from', 'System')
self._ph()
self._pp('matplotlib version', matplotlib.__version__)
self._pp('numpy version', numpy.__version__)
self._pp('pandas version',pandas.__version__)
self._pp('PIL version', PIL.__version__)
self._pp('torch version', torch.__version__)
self._ph()
# host ip
self._pp('Host', 'Info')
x = self.fetch_info_host_ip()
print(x)
self._ph()
#
return
#
#
# define TTM for use in calculating flops
class TTM(torch.nn.Module):
"""
Tiny Torch Model (TTM)
This is a toy model consisting of four convolutional layers.
Args:
input_shape (tuple): input tensor size.
Returns:
(tensor): output of the model.
"""
def __init__(self, input_shape=(1, 3, 224, 224)):
super(TTM, self).__init__()
self.conv1 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
self.conv2 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
self.conv3 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
self.conv4 = torch.nn.Conv2d(3, 3, kernel_size=3, padding=1)
def forward(self, x1):
x1 = self.conv1(x1)
x1 = self.conv2(x1)
x1 = self.conv3(x1)
x1 = self.conv4(x1)
return x1
#
# (end of class TTM)
# add module/method
#
import functools
def add_method(cls):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
setattr(cls, func.__name__, wrapper)
return func # returning func means func can still be used normally
return decorator
#
# [END OF pluto_happy]
## %%write -a app.py
# prompt: create a new class Pluto_Happy and name it monty
monty = Pluto_Happy('Monty, Monty Said!')
# %%writefile -a app.py
# prompt: (combine of many seperate prompts and copy code into one code cell)
# read back in toxic data
fname = 'toxic_data.csv'
monty.df_toxic_data = pandas.read_csv(fname)
# read in the keys
import os
monty._openai_key=os.getenv('openai_key')
monty._github_key=os.getenv('github_key')
monty._huggingface_key=os.getenv('huggingface_key')
monty._kaggle_key=os.getenv('kaggle_key')
# for openai version 1.3.8
@add_method(Pluto_Happy)
#
def _fetch_moderate_engine(self):
self.ai_client = openai.OpenAI(api_key=self._openai_key)
self.text_model = "text-moderation-latest"
return
#
@add_method(Pluto_Happy)
# f
def _censor_me(self, p, safer=0.0005):
self._fetch_moderate_engine()
resp_orig = self.ai_client.moderations.create(input=p, model=self.text_model)
resp_dict = resp_orig.model_dump()
#
v1 = resp_dict["results"][0]["category_scores"]
max_key = max(v1, key=v1.get)
max_value = v1[max_key]
sum_value = sum(v1.values())
#
v1["is_safer_flagged"] = False
if (max_value >= safer):
v1["is_safer_flagged"] = True
v1["is_flagged"] = resp_dict["results"][0]["flagged"]
v1['max_key'] = max_key
v1['max_value'] = max_value
v1['sum_value'] = sum_value
v1['safer_value'] = safer
v1['message'] = p
return v1
#
@add_method(Pluto_Happy)
def _draw_censor(self,data):
self._color_mid_gray = '#6c757d'
exp = (0.01, 0.01)
x = [data['max_value'], (1-data['max_value'])]
title=f"\nUnsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n"
lab = [data['max_key'], 'Other 13 categories']
if (data['is_flagged']):
col=[self.color_danger, self.color_mid_gray]
elif (data['is_safer_flagged']):
col=[self.color_warning, self.color_mid_gray]
lab = ['Relative Score:\n'+data['max_key'], 'Other 13 categories']
title=f"\nPersonal Unsafe: {data['max_key']}: {(data['max_value']*100):.2f}% Confidence\n"
else:
col=[self.color_mid_gray, self.color_success]
lab = ['False Negative:\n'+data['max_key'], 'Other 13 categories']
title='\nSafe Message\n'
canvas = self._draw_donut(x, lab, col, exp,title)
return canvas
#
@add_method(Pluto_Happy)
def _draw_donut(self,data,labels,col, exp,title):
# col = [self.color_danger, self._color_secondary]
# exp = (0.01, 0.01)
# Create a pie chart
canvas, pic = matplotlib.pyplot.subplots()
pic.pie(data, explode=exp,
labels=labels,
colors=col,
autopct='%1.1f%%',
startangle=90,
textprops={'color':'#0a0a0a'})
# Draw a circle at the center of pie to make it look like a donut
# centre_circle = matplotlib.pyplot.Circle((0,0),0.45,fc='white')
centre_circle = matplotlib.pyplot.Circle((0,0),0.45,fc=col[0],linewidth=2, ec='white')
canvas = matplotlib.pyplot.gcf()
canvas.gca().add_artist(centre_circle)
# Equal aspect ratio ensures that pie is drawn as a circle.
pic.axis('equal')
pic.set_title(title)
canvas.tight_layout()
# canvas.show()
return canvas
#
@add_method(Pluto_Happy)
# def censor_me(self, msg, safer=0.02, ibutton_1=0):
def fetch_toxicity_level(self, msg, safer):
# safer=0.2
yjson = self._censor_me(msg,safer)
_canvas = self._draw_censor(yjson)
_yjson = json.dumps(yjson, indent=4)
return (_canvas, _yjson)
#return(_canvas)
# %%write -a app.py
# prompt: result from a lot of prompt AI and old fashion try and error
import random
def say_hello(val):
return f"Hello: {val}"
def say_toxic():
return f"I am toxic"
def fetch_toxic_tweets(maxi=2):
sample_df = monty.df_toxic_data.sample(maxi)
is_true = random.choice([True, False])
c1 = "more_toxic"
if is_true:
c1 = "less_toxic"
toxic1 = sample_df[c1].iloc[0]
# toxic1 = "cat eats my homework."
return sample_df.to_html(index=False), toxic1
#
# define all gradio widget/components outside the block for easy to visualize the blocks structure
#
in1 = gradio.Textbox(lines=3, label="Enter Text:")
in2 = gradio.Slider(0.005, .1, value=0.02, step=.005,label="Personalize Safer Value: (larger value is less safe)")
out1 = gradio.Plot(label="Output:")
out2 = gradio.HTML(label="Real-world Toxic Posts/Tweets: *WARNING")
out3 = gradio.Textbox(lines=5, label="Output JSON:")
but1 = gradio.Button("Measure 14 Toxicity", variant="primary",size="sm")
but2 = gradio.Button("Fetch Toxic Text", variant="stop", size="sm")
#
txt1 = """
# πŸ˜ƒ Welcome To The Friendly Text Moderation
### Identify 14 categories of text toxicity.
> This NLP (Natural Language Processing) AI demonstration aims to prevent profanity, vulgarity, hate speech, violence, sexism, and other offensive language.
>It is **not an act of censorship**, as the final UI (User Interface) will give the reader, but not a young reader, the option to click on a label to read the toxic message.
>The goal is to create a safer and more respectful environment for you, your colleages, and your family.
> This NLP app is 1 of 3 hands-on courses, ["AI Solution Architect," from ELVTR and Duc Haba](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin).
---
### 🌴 Helpful Instruction:
1. Enter your [harmful] message in the input box.
2. Click the "Measure 14 Toxicity" button.
3. View the result on the Donut plot.
4. (**Optional**) Click on the "Fetch Real World Toxic Dataset" below.
5. There are additional options and notes below.
"""
txt2 = """
## 🌻 Author and Developer Notes:
---
- The demo uses the cutting-edge (2024) AI Natural Language Processing (NLP) model from OpenAI.
- This NLP app is 1 of 3 hands-on apps from the ["AI Solution Architect," from ELVTR and Duc Haba](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin).
- It is not a Generative (GenAI) model, such as Google Gemini or GPT-4.
- The NLP understands the message context, nuance, innuendo, and not just swear words.
- We **challenge you** to trick it, i.e., write a toxic tweet or post, but our AI thinks it is safe. If you win, please send us your message.
- The 14 toxicity categories are as follows:
1. harassment
2. harassment threatening
3. harassment instructions
4. hate
5. hate threatening
6. hate instructions
7. self harm
8. self harm instructions
9. self harm intent
10. self harm minor
11. sexual
12. sexual minors
13. violence
14. violence graphic
- If the NLP model classifies the message as "safe," you can still limit the level of toxicity by using the "Personal Safe" slider.
- The smaller the personal-safe value, the stricter the limitation. It means that if you're a young or sensitive adult, you should choose a lower personal-safe value, less than 0.02, to ensure you're not exposed to harmful content.
- The color of the donut plot is as follows:
- Red is an "unsafe" message by the NLP model
- Green is a "safe" message
- Yellow is an "unsafe" message by your toxicity level
- The **"confidence"** score refers to the confidence level in detecting a particular type of toxicity among the 14 tracked types. For instance, if the confidence score is 90%, it indicates a 90% chance that the toxicity detected is of that particular type. In comparison, the remaining 13 toxicities collectively have a 10% chance of being the detected toxicity. Conversely, if the confidence score is 3%, it could indicate any toxicity. It's worth noting that the Red, Green, or Yellow safety levels do not influence the confidence score.
- The real-world dataset is from the Jigsaw Rate Severity of Toxic Comments on Kaggle. It has 30,108 records.
- Citation:
- Ian Kivlichan, Jeffrey Sorensen, Lucas Dixon, Lucy Vasserman, Meghan Graham, Tin Acosta, Walter Reade. (2021). Jigsaw Rate Severity of Toxic Comments . Kaggle. https://kaggle.com/competitions/jigsaw-toxic-severity-rating
- The intent is to share with Duc's friends and colleagues, but for those with nefarious intent, this Text Moderation model is governed by the GNU 3.0 License: https://www.gnu.org/licenses/gpl-3.0.en.html
- Author: Copyright (C), 2024 **[Duc Haba](https://linkedin.com/in/duchaba)**
---
# 🌟 "AI Solution Architect" Course by ELVTR
>Welcome to the fascinating world of AI and natural language processing (NLP). This NLP model is a part of one of three hands-on application. In our journey together, we will explore the [AI Solution Architect](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin) course, meticulously crafted by ELVTR in collaboration with Duc Haba. This course is intended to serve as your gateway into the dynamic and constantly evolving field of AI Solution Architect, providing you with a comprehensive understanding of its complexities and applications.
>An AI Solution Architect (AISA) is a mastermind who possesses a deep understanding of the complex technicalities of AI and knows how to creatively integrate them into real-world solutions. They bridge the gap between theoretical AI models and practical, effective applications. AISA works as a strategist to design AI systems that align with business objectives and technical requirements. They delve into algorithms, data structures, and computational theories to translate them into tangible, impactful AI solutions that have the potential to revolutionize industries.
> 🍎 [Sign up for the course today](https://elvtr.com/course/ai-solution-architect?utm_source=instructor&utm_campaign=AISA&utm_content=linkedin), and I will see you in class.
- An article about this NLP Text Moderation will be coming soon.
"""
txt3 = """
## πŸ’₯ WARNING: WARNING:
---
- The following button will retrieve **real-world** offensive posts from Twitter and customer reviews from consumer companies.
- The button will display four toxic messages at a time. **Click again** for four more randomly selected postings/tweets.
- They contain **profanity, vulgarity, hate, violence, sexism, and other offensive language.**
- After you fetch the toxic messages, Click on the **"Measure 14 Toxicity" button**.
"""
#reverse_button.click(process_text, inputs=text_input, outputs=reversed_text)
#
with gradio.Blocks() as gradio_app:
# title
gradio.Markdown(txt1) # any html or simple mark up
#
# first row, has two columns 1/3 size and 2/3 size
with gradio.Row(): # items inside rows are columns
# left column
with gradio.Column(scale=1): # items under columns are row, scale is 1/3 size
# left column has two rows, text entry, and buttons
in1.render()
in2.render()
but1.render()
out3.render()
but1.click(monty.fetch_toxicity_level, inputs=[in1, in2], outputs=[out1,out3])
with gradio.Column(scale=2):
out1.render()
#
# second row is warning text
with gradio.Row():
gradio.Markdown(txt3)
# third row is fetching toxic data
with gradio.Row():
with gradio.Column(scale=1):
but2.render()
but2.click(fetch_toxic_tweets, inputs=None, outputs=[out2, in1])
with gradio.Column(scale=2):
out2.render()
# fourth row is note text
with gradio.Row():
gradio.Markdown(txt2)
# %%write -a app.py
# prompt: start graido_app
gradio_app.launch()