import marimo __generated_with = "0.9.20" app = marimo.App(width="medium", app_title="Thomas Saaty", css_file="") @app.cell(hide_code=True) def __(): import numpy as np import marimo as mo import pandas as pd import gspread import random import datetime import time class Criterio: def __init__(self, criteriol, criterior, tooltipl, tooltipr): self.criteriol = criteriol self.criterior = criterior self.tooltipl = tooltipl self.tooltipr = tooltipr def show(self): score = mo.ui.range_slider( start=-9, stop=9, step=1, value=[-1, 1], full_width=True ) textl = mo.md(f'''
{self.criteriol}
''') textr = mo.md(f'''
{self.criterior}
''') return [textl, score, textr] class Titles: def __init__(self, title, nbs): self.title = title self.nbs = nbs def show(self): spaces = '
' * self.nbs return mo.vstack( [mo.md(f"{spaces}"), mo.md(f"# {self.title}"), mo.md(f"{spaces}")] ) class Nivel: def __init__(self, icono, nivel, color): self.icono = icono self.nivel = nivel self.color = color def show(self): return mo.md(f'''
{mo.icon(icon_name = self.icono, color = self.color, size = 40)}
{self.nivel}
''') def CompareLayoutValues(criterios, factores): pares = [] for i in range(1, criterios + 1): for j in range(1, criterios + 1): if i < j: pares.append( [ "Factor de riesgo " + str(i), "Factor de riesgo " + str(j), factores.iat[i - 1, 0], factores.iat[j - 1, 0] ] ) compares = [] hstacks = [] for i in range(int((criterios * (criterios - 1)) / 2)): compares.append(Criterio(pares[i][0], pares[i][1], pares[i][2], pares[i][3]).show()) hstacks.append( mo.hstack( compares[i], justify="space-between", align="center", widths=[1, 6, 1], gap=1.0, ) ) vstack = mo.vstack(hstacks) return compares, vstack def ShowMatrix(compares, criterios): try: with mo.status.spinner(title = 'Iniciando cálculos ...', subtitle = ':)', remove_on_exit = True) as spinner1: time.sleep(1) spinner1.update(title = "Calculando ...", subtitle = 'Matriz de consistencia') values = [] for i in range(len(compares)): values.append( abs(compares[i][1].value[0]) / abs(compares[i][1].value[1]) if compares[i][1].value[0] != 0 and compares[i][1].value[1] != 0 else 1 ) matrix0 = pd.DataFrame([[1] * criterios] * criterios, dtype=float) names = [] k = -1 for i in range(criterios): names.append("Factor de riesgo " + str(i + 1)) for j in range(criterios): if j > i: k += 1 matrix0.iat[i, j] = values[k] elif j < i: matrix0.iat[i, j] = 1 / matrix0.iat[j, i] names = pd.DataFrame(names) matrix0 = pd.concat([names, matrix0], axis=1, ignore_index=True) matrix0.columns = [ "Factores de riesgo\n VS\nFactores de riesgo" ] + names.loc[:, 0].to_list() spinner1.update(title = "Calculando ...", subtitle = 'Matriz normalizada') matrix1 = matrix0.iloc[:, 1:] matrix1 = matrix1.div(matrix1.sum(axis=0), axis=1) matrix1 = pd.concat([names, matrix1], axis=1, ignore_index=True) matrix1.columns = [ "Factores de riesgo\n VS\nFactores de riesgo" ] + names.loc[:, 0].to_list() matrix2 = matrix1.iloc[:, 1:] matrix2 = matrix2.mean(axis=1) matrix3 = matrix0.iloc[:, 1:].T.reset_index(drop=True).T matrix3 = matrix3.dot(matrix2) matrix4 = matrix3.div(matrix2) spinner1.update(title = "Calculando ...", subtitle = 'Matriz de vectores') matrix5 = pd.concat( [names, matrix2, matrix3, matrix4], axis=1, ignore_index=True ) matrix5.columns = [ " Vectores\n VS\nFactores de riesgo", "Vector promedio", "Vector producto", "Vector cociente", ] ym = matrix4.mean() ci = (ym - criterios) / (criterios - 1) cr = ci / np.polyval([-8.44988345e-04, 2.56164206e-02, -2.95897436e-01, 1.61445934e+00, -2.23388889e+00], criterios) * 100 if criterios <= 3: consistencia = 'es consistente.' if cr <= 5 else 'no es consistente.' elif criterios == 4: consistencia = 'es consistente.' if cr <= 9 else 'no es consistente.' else: consistencia = 'es consistente.' if cr <= 10 else 'no es consistente.' text1 = mo.md(f"El cociente promedio resultante es {str(round(ym, 3))}, por lo que el índice CI es igual a {str(round(ci, 3))} y el índice CR es igual {str(round(cr, 3))}%; en concecuencia la matriz de consistencias {consistencia}") matrixs = [ Titles("Matriz de consistencias", 1).show(), matrix0, Titles("Matriz normalizada", 1).show(), matrix1, Titles("Matriz de vectores", 1).show(), matrix5, mo.callout(text1, kind = 'success' if consistencia == "es consistente." else 'danger'), ] spinner1.update(title = "Cálculos finalizados", subtitle = ":)") time.sleep(1) return matrixs, consistencia except: with mo.status.spinner(title = 'Ocurrió un error', subtitle = ':(', remove_on_exit = True) as spinner1: time.sleep(2) def MeanMatrix(factores, sheets, valores, unidades, error): try: with mo.status.spinner(title = 'Iniciando cálculos ...', subtitle = ':)', remove_on_exit = True) as spinner1: time.sleep(1) spinner1.update(title = "Calculando ...", subtitle = 'Matriz de consistencia resultante') try: error = float(error.value) if float(error.value) >= 0.0001 else 0.0001 except: error = 0.0001 riesgos = factores.shape[0] names1 = [] names2 = [] for i in range(riesgos): names1.append(['Factor de riesgo ' + str(i + 1)]) names2.append('Factor de riesgo ' + str(i + 1)) names1 = pd.DataFrame(names1) names0 = [sheet.title for sheet in sheets.worksheets()] exclude = ["Factores de riesgo", "Valores normalizados", "Unidades", "Plan operativo anual"] matrices = [] for i in names0: if i not in exclude: worksheet = sheets.worksheet(i) matriz = pd.DataFrame(worksheet.get_all_records()).iloc[:, 1:riesgos + 1] if matriz.shape[0] == riesgos: matrices.append(matriz) matrix0 = pd.concat(matrices).groupby(level=0).prod() ** (1 / len(matrices)) matrix1 = matrix0.div(matrix0.sum(axis=0), axis=1) matrix2 = matrix1.mean(axis=1) matrix3 = matrix0.T.reset_index(drop=True).T matrix3 = matrix3.dot(matrix2) matrix4 = matrix3.div(matrix2) matrix5 = pd.concat( [names1, matrix0, matrix2, matrix3, matrix4], axis=1, ignore_index=True ) matrix5.columns = ['Parámetros de cálculo\n VS\n Factores de riesgo'] + names2 + ["Vector promedio", "Vector producto", "Vector cociente"] ym = matrix4.mean() ci = (ym - riesgos) / (riesgos - 1) cr = ci / np.polyval([-8.44988345e-04, 2.56164206e-02, -2.95897436e-01, 1.61445934e+00, -2.23388889e+00], riesgos) * 100 if riesgos <= 3: consistencia = 'es consistente.' if cr <= 5 else 'no es consistente.' elif riesgos == 4: consistencia = 'es consistente.' if cr <= 9 else 'no es consistente.' else: consistencia = 'es consistente.' if cr <= 10 else 'no es consistente.' text1 = mo.md(f"El cociente promedio resultante es {str(round(ym, 3))}, por lo que el índice CI es igual a {str(round(ci, 3))} y el índice CR es igual {str(round(cr, 3))}%; en concecuencia la matriz de consistencias {consistencia}") matrix6 = matrix0.T.reset_index(drop=True).T matrix7 = [] columns = ['Vectores promedio\n vs\nFactores de riesgo'] counter = 0 spinner1.update(title = "Calculando ...", subtitle = 'Matriz de convergencia') while True: counter += 1 columns.append('Iteración ' + str(counter)) matrix6, matrix8 = Convergencia(matrix6) matrix7.append(matrix8) if len(matrix7) > 1: if matrix7[-1].sub(matrix7[-2]).abs().sum() < error: break columns[-1] = 'Vector promedio final' matrix7 = pd.concat([names1, pd.concat(matrix7, axis = 1)], axis = 1) matrix7.columns = columns matrix8 = matrix7['Vector promedio final'] matrix9 = valores.iloc[:, 1:].replace(',', '.', regex = True).apply(pd.to_numeric, errors='coerce') matrix9 = matrix9 * matrix8.values matrix10 = pd.DataFrame(matrix9.sum(axis=1), columns=['Nivel de riesgo total']) matrix11 = pd.DataFrame(valores.iloc[:,0]) spinner1.update(title = "Calculando ...", subtitle = 'Matriz de nivel de riesgo') matrix12 = pd.concat([matrix11, matrix9, matrix10], axis=1, ignore_index=False) matrix13 = [] for i in matrix11['Registro de Hidrocarburos']: ubicacion = unidades[unidades['REGISTRO'] == i][['DEPARTAMENTO', 'PROVINCIA', 'DISTRITO']] matrix13.append([Direccion(ubicacion, filtroprovincial, filtrodistrital)]) matrix13 = pd.DataFrame(matrix13, columns = ['Región']) matrix14 = pd.concat([matrix11, matrix13, matrix10], axis=1, ignore_index=False) spinner1.update(title = "Cálculos finalizados", subtitle = ":)") time.sleep(1) return [mo.vstack([Titles("Matriz de consistencia resultante", 1).show(), matrix5, mo.callout(text1, kind = 'success' if consistencia == "es consistente." else 'danger'), Titles("Matriz de convergencia", 1).show(), matrix7, Titles("Matriz de nivel de riesgo", 1).show(), matrix12]), matrix14] except: with mo.status.spinner(title = 'Ocurrió un error', subtitle = ':(', remove_on_exit = True) as spinner1: time.sleep(2) def CreateSheet(sheets, text, matrixs): try: with mo.status.spinner(title = 'Validando credenciales ...', subtitle = ':)', remove_on_exit = True) as spinner1: time.sleep(1) spinner1.update(title = "Registrando ...", subtitle = 'Matriz de consistencia') name = text.value if name == "": name = str(datetime.datetime.now()) names = [sheet.title for sheet in sheets.worksheets()] if name not in names: sheet = sheets.add_worksheet(title=name, rows="20", cols="20") else: sheet = sheets.worksheet(name) sheet.clear() matrix = pd.concat([matrixs[1], matrixs[5].iloc[:, 1:]], axis=1, ignore_index=True) matrix.columns = ['Parámetros de cálculo\n VS\n Factores de riesgo'] + list(matrixs[1].columns[1:]) + list(matrixs[5].columns[1:]) set_2(2) sheet.update([matrix.columns.values.tolist()] + matrix.values.tolist()) set_2(1) spinner1.update(title = "Registro finalizado", subtitle = ":)") time.sleep(1) except: with mo.status.spinner(title = 'Ocurrió un error', subtitle = ':(', remove_on_exit = True) as spinner1: time.sleep(2) def Convergencia(matrix0): matrix1 = matrix0.dot(matrix0) matrix2 = matrix1.sum(axis = 1) matrix2 = matrix2.div(matrix2.sum()) return matrix1, matrix2 def Direccion(ubicacion, filtroprovincial, filtrodistrital): departamento = ubicacion['DEPARTAMENTO'].iat[0] provincia = ubicacion['PROVINCIA'].iat[0] distrito = ubicacion['DISTRITO'].iat[0] if departamento != 'PROV. CONST. DEL CALLAO' and departamento != 'LIMA': region = ubicacion['DEPARTAMENTO'].iat[0] else: if provincia in filtroprovincial or distrito in filtrodistrital: region = 'LIMA SUR' else: region = 'LIMA NORTE' return region def Distribucion(valor): cociente, residuo = valor // 12, valor % 12 lista = [cociente] * 12 for i in range(residuo): lista[i] += 1 random.shuffle(lista) return lista + [sum(lista)] def FixRange(): if get_3()[0] < 0.05: set_3([0.05, get_3()[1]]) if get_3()[1] > 0.95: set_3([get_3()[0], 0.95]) @mo.cache def LoadData(valor): gs = gspread.service_account(filename = valor) clasificacion = pd.read_excel("https://docs.google.com/spreadsheets/d/1CsDG8_l9r-aKX2UvFYASff-pqNSnD2wIz42Z8a-D3EQ/export?format=xlsx&gid=484881519", sheet_name='Clasificación de departamentos') filtroprovincial = clasificacion[clasificacion['OR2'] == 'LIMA SUR']['PROVINCIAS'].to_list() filtrodistrital = clasificacion[clasificacion['OR3'] == 'LIMA SUR']['DISTRITOS'].to_list() niveles = [Nivel("mingcute:safety-certificate-fill", "BAJO", "green").show(), Nivel("fluent:shield-28-filled", "MEDIO", "orange").show(), Nivel("clarity:shield-x-solid", "ALTO", "red").show()] return gs, filtroprovincial, filtrodistrital, niveles @mo.cache def SelectSheets(valor, gs): if valor == "Actividad 050: Estaciones de Servicios y Grifos": url = "2PACX-1vSi7zXsaq2CiXhtAVq_E4h3RJYVvnaj1yATmid_indbg1aEMIgJ_xUYw8QPiBxzJv9wMZHCB4Edr37P" key = "1kgP3kfkXM2ixQyGK_ycvzkZMbp9RkU8vdSg60ucGZT8" elif valor == "Actividad 060: Transporte de combustibles líquidos y OPDH": url = "2PACX-1vTkJhSkqGEIvirM5GGd1SeDHq5Iu2xrwhl8D7tEsAfi347zSOJ-R_EhaGCG5Qo74mmuMesFMFehkE9f" key = "1sSKv_1fNfZGH2tuKq781vb58xh3w8cx-yxChzwxY7C0" elif valor == "Actividad 071: Gasocentro de GLP": url = "2PACX-1vRPv-_mtVtmuUGLXDgbJ941volRsan8Hbf3GwN-5HtUhqhQTRhTdDsMwa-SxFtHNC6H21DjztadBdbz" key = "1MYSVV1MHVYFZxVNpNzloG6kcfX5z7TJXHB9Y687lVpU" iframe = mo.iframe(f''' ''') sheets = gs.open_by_key(key) factores = pd.DataFrame(sheets.worksheet("Factores de riesgo").get_all_records()) valores = sheets.worksheet("Valores normalizados").get_all_values() valores = pd.DataFrame(valores[1:], columns = valores[0]) unidades = sheets.worksheet("Unidades").get_all_values() unidades = pd.DataFrame(unidades[1:], columns = unidades[0]) poa = pd.DataFrame(sheets.worksheet("Plan operativo anual").get_all_records()) return iframe, sheets, factores, valores, unidades, poa get_1, set_1 = mo.state("Actividad 050: Estaciones de Servicios y Grifos") get_2, set_2 = mo.state(0) get_3, set_3 = mo.state([0.5, 0.7]) gs, filtroprovincial, filtrodistrital, niveles = LoadData('credentials.json') mo.output.replace(mo.vstack([ mo.md(f'''
{mo.md(f'''
MÉTODO DEL PROCESO JERÁRQUICO ANALÍTICO
''')}
'''), mo.md('---'), ])) icon1 = mo.icon('mdi:fossil-fuel-outline', size = 50, color = "black") md1 = mo.md("Seleccion el tipo de actividad que desea analizar :") dropdown1 = mo.ui.dropdown(options = ["Actividad 050: Estaciones de Servicios y Grifos", "Actividad 060: Transporte de combustibles líquidos y OPDH", "Actividad 071: Gasocentro de GLP"], value = get_1(), full_width = True, on_change = set_1) mo.output.append(mo.hstack([icon1, md1, dropdown1], align = "center", widths = [1, 8, 7], justify = "space-between", gap = 1.0)) return ( CompareLayoutValues, Convergencia, CreateSheet, Criterio, Direccion, Distribucion, FixRange, LoadData, MeanMatrix, Nivel, SelectSheets, ShowMatrix, Titles, datetime, dropdown1, filtrodistrital, filtroprovincial, get_1, get_2, get_3, gs, gspread, icon1, md1, mo, niveles, np, pd, random, set_1, set_2, set_3, time, ) @app.cell(hide_code=True) def __(SelectSheets, get_1, gs, mo): iframe, sheets, factores, valores, unidades, poa = SelectSheets(get_1(), gs) mo.output.replace(mo.vstack([mo.accordion({'Base de datos': iframe}, lazy = True), mo.md("
")])) icon2 = mo.icon('fluent-mdl2:compare', size = 50, color = "black") md2 = mo.md("Establesca el número de factores de riesgo a comparar : ") criterios = mo.ui.number( start=2, stop=factores.shape[0], step=1, value=factores.shape[0], full_width=True, ) mo.output.append(mo.hstack([icon2, md2, criterios], align = "center", widths = [1, 8, 7], justify = "space-between", gap = 1.0)) return ( criterios, factores, icon2, iframe, md2, poa, sheets, unidades, valores, ) @app.cell(hide_code=True) def __(CompareLayoutValues, criterios, factores, mo): compares, vstack = CompareLayoutValues(criterios.value, factores) mo.output.replace(vstack) return compares, vstack @app.cell(hide_code=True) def __(ShowMatrix, compares, criterios, mo): calcs = mo.ui.button( label=f"{mo.icon('eos-icons:rotating-gear', size = 15, color = "black")} Cálcular matrices de cálculo", tooltip="Define las matrices de consistencias", kind="success", full_width=True, on_click=lambda _: ShowMatrix(compares, criterios.value), ) return (calcs,) @app.cell(hide_code=True) def __(calcs, mo): mo.output.replace(mo.vstack(items = [mo.md("
"), calcs])) return @app.cell(hide_code=True) def __(CreateSheet, calcs, mo, sheets): try: matrixs, consistencia = calcs.value mo.lazy(mo.output.replace(mo.vstack(matrixs)), show_loading_indicator=True) text = mo.ui.text( placeholder='Ingrese su nombre aquí ...', kind='text', full_width=True, ) update = mo.ui.button( label=f"{mo.icon('ic:baseline-cloud-upload', size = 15, color = "black")} Registrar matriz de consistencia", kind="success", full_width=True, on_click=lambda _: CreateSheet(sheets, text, matrixs), ) except: pass return consistencia, matrixs, text, update @app.cell(hide_code=True) def __(consistencia, mo, text, update): try: if consistencia == "es consistente.": mo.output.replace(mo.hstack([text, update], justify='space-between', align='center', widths=[1, 1])) except: pass return @app.cell(hide_code=True) def __(get_2, mo, set_2, time): if get_2() != 0: if get_2() == 1: mo.output.replace(mo.callout(value = mo.md("Matriz de consistencia registrada con exito !"), kind = 'success')) elif get_2() == 2: mo.output.replace(mo.callout(value = mo.md("Matriz de consistencia registrada sin exito !"), kind = 'danger')) set_2(0) time.sleep(3) mo.output.clear() return @app.cell(hide_code=True) def __(get_3, mo, set_3): error = mo.ui.text(value = "0.0001", label = "Error de convergencia permisible:", full_width = True) porcentaje = mo.ui.number(start = 0, stop = 100, step = 1, value = 70, label = "Porcentaje de fiscalizaciones", full_width = True) rango = mo.ui.range_slider(start = 0, stop = 1, step = 0.05, value = get_3(), label = "Nivel de riesgo", on_change = set_3, full_width = True) mo.output.replace(mo.vstack(items = [mo.md("
"), mo.hstack(items = [error, porcentaje], align = "center", justify = "space-between", widths = [1, 1]), rango])) return error, porcentaje, rango @app.cell(hide_code=True) def __(FixRange, get_3, mo, niveles, np): FixRange() menor, mayor = get_3() anchos = np.array([menor - 0, mayor - menor, 1 - mayor]) anchos = anchos / anchos.min() anchos = anchos.tolist() mo.output.replace(mo.hstack(items = niveles, align = "center", justify = "space-between", widths = anchos)) return anchos, mayor, menor @app.cell(hide_code=True) def __(MeanMatrix, error, factores, mo, sheets, unidades, valores): meanmatrix = mo.ui.button( label=f"{mo.icon('eos-icons:rotating-gear', size = 15, color = "black")} Cálcular matrices de resultados", tooltip="Depende de las matrices de consistencias previamente definidas", kind="success", full_width=True, on_click=lambda _: MeanMatrix(factores, sheets, valores, unidades, error), ) return (meanmatrix,) @app.cell(hide_code=True) def __(meanmatrix, mo): mo.output.replace(mo.vstack([mo.md("
"), meanmatrix])) return @app.cell(hide_code=True) def __(meanmatrix, mo): try: mo.lazy(mo.output.replace(meanmatrix.value[0]), show_loading_indicator=True) except: pass return @app.cell(hide_code=True) def __(Distribucion, Titles, meanmatrix, mo, pd, poa, porcentaje): try: regiones = pd.DataFrame(meanmatrix.value[1]["Región"].value_counts()) oficina = mo.ui.dropdown(options=sorted(regiones.index.to_list()), label="Seleccione la región que desea analizar: ", value= sorted(regiones.index.to_list())[0], full_width=True) departamentos = pd.DataFrame(sorted(regiones.index.to_list()), columns = ['Regiones']) distribucion = [] for i in departamentos['Regiones']: acciones = int(poa[poa['REGIONES'] == i]['ACCIONES'].iat[0] * porcentaje.value / 100) distribucion.append(Distribucion(acciones)) distribucion = pd.DataFrame(distribucion, columns= ["Enero", "Febrero", "Marzo", "Abril", "Mayo", "Junio", "Julio", "Agosto", "Septiembre", "Octubre", "Noviembre", "Diciembre", "Total"]) mo.lazy(mo.output.replace(mo.vstack([Titles('Matriz de fiscalización nacional', 1).show(), pd.concat([departamentos, distribucion], ignore_index=False, axis = 1), Titles("Matriz de fiscalización regional", 1).show(), oficina])), show_loading_indicator=True) except: pass return acciones, departamentos, distribucion, i, oficina, regiones @app.cell(hide_code=True) def __(get_3, meanmatrix, mo, np, oficina, pd, poa, porcentaje, regiones): try: matrix1 = meanmatrix.value[1][meanmatrix.value[1]['Región'] == oficina.value].sort_values(by = 'Nivel de riesgo total', ascending = False) matrix1.index = np.arange(1, matrix1.shape[0] + 1) matrix2 = pd.DataFrame(['ALTA'] * int(matrix1[matrix1['Nivel de riesgo total'] >= get_3()[1]].shape[0]) + ['MEDIA'] * int(matrix1[(matrix1['Nivel de riesgo total'] < get_3()[1]) & (matrix1['Nivel de riesgo total'] >= get_3()[0])].shape[0]) + ['BAJA'] * int(matrix1[matrix1['Nivel de riesgo total'] < get_3()[0]].shape[0]), columns=['Prioridad']) matrix2.index = np.arange(1, matrix2.shape[0] + 1) matrix3 = pd.concat([matrix1, matrix2], axis=1, ignore_index=False) callout = mo.callout(value = mo.md(f"La region {oficina.value.title()} cuenta con {regiones.loc[oficina.value].iat[0]} unidades, y según el Plan Operativo Anual 2024, esta debe cumplir con {poa[poa['REGIONES'] == oficina.value]['ACCIONES'].iat[0]} acciones de fiscalización, por lo cual, debera ejecutar {int(poa[poa['REGIONES'] == oficina.value]['ACCIONES'].iat[0] * porcentaje.value / 100)} fiscalizaciones basadas en riesgos, siguiendo el orden de la matriz de fiscalización regional correspondiente."), kind = 'success') mo.lazy(mo.output.replace(mo.vstack([matrix3, callout])), show_loading_indicator=True) except: pass return callout, matrix1, matrix2, matrix3 if __name__ == "__main__": app.run()