# Spaces:
# Running
# Running
from collections import defaultdict
from contextlib import closing
from copy import deepcopy
from datetime import datetime

import psycopg2
import streamlit as st

from excel import Driver, Vehicle, Route, Branch, RoutePlan
class Data:
    """Read-only PostgreSQL accessors scoped to a single recycler client.

    Every query filters on ``id_cliente_reciclador = self.recycler_id``.
    A new connection is opened (and closed) for each public call.
    """

    # Fallback container list used when a branch has no usable history.
    # Always hand out copies via ``_default_containers()`` so callers cannot
    # mutate this shared constant.
    _DEFAULT_ESTIMATE_CONTAINERS = [{
        "amount"         : 1,
        "weightKg"       : None,
        "materialTypeId" : 17,  # Trash
        "containerTypeId": 8,   # 200 L drum
        "charge_type"    : 2,   # Per container (?)
        "unitary_price"  : 0,
        "visit_price"    : 0
    }]

    @staticmethod
    def connect():
        """Open a new psycopg2 connection using ``st.secrets["postgres"]``."""
        return psycopg2.connect(**st.secrets["postgres"])

    def __init__(self, recycler_id: int):
        """Store the recycler client id that scopes every query."""
        self.recycler_id = recycler_id

    @staticmethod
    def _fetch_all(query: str, params) -> list[tuple]:
        """Run one query on a fresh connection and return all rows."""
        # ``closing`` is required: psycopg2's own ``with conn`` block only
        # ends the transaction, it does not close the connection.
        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                return cur.fetchall()

    @staticmethod
    def _default_containers() -> list[dict]:
        """Return an independent (deep) copy of the default container list."""
        return deepcopy(Data._DEFAULT_ESTIMATE_CONTAINERS)

    @staticmethod
    def _sanitize_containers(raw) -> list[dict]:
        """Normalize one stored ``estimate_containers`` value.

        Entries lacking a truthy ``materialTypeId``, ``containerTypeId`` or
        ``amount`` are dropped; missing price/charge fields take the default
        values. Returns the default container list when nothing usable is left
        (including when ``raw`` is ``None`` or empty).
        """
        defaults = Data._DEFAULT_ESTIMATE_CONTAINERS[0]
        cleaned: list[dict] = []
        for container in raw or []:
            if not isinstance(container, dict):
                continue
            if container.get("materialTypeId") and container.get("containerTypeId") and container.get("amount"):
                cleaned.append({
                    "amount"         : container["amount"],
                    "weightKg"       : container.get("weightKg", None),
                    "materialTypeId" : container["materialTypeId"],
                    "containerTypeId": container["containerTypeId"],
                    "unitary_price"  : container.get("unitary_price", defaults["unitary_price"]),
                    "charge_type"    : container.get("charge_type", defaults["charge_type"]),
                    "visit_price"    : container.get("visit_price", defaults["visit_price"])
                })
        return cleaned if cleaned else Data._default_containers()

    @staticmethod
    def _containers_per_branch(branch_ids: list[int], rows) -> list[list[dict]]:
        """Align ``(branch_id, estimate_containers)`` rows with ``branch_ids``.

        Returns exactly one container list per requested branch id, in request
        order. Branches with no row get the default container list.
        """
        latest_by_branch = {branch_id: containers for branch_id, containers in rows}
        return [
            Data._sanitize_containers(latest_by_branch.get(branch_id))
            for branch_id in branch_ids
        ]

    def get_drivers(self) -> list[Driver]:
        """Return this recycler's drivers ordered by name."""
        # rows: (id, name)
        rows: list[tuple[int, str]] = Data._fetch_all(
            """SELECT id, name from drivers WHERE id_cliente_reciclador = %s ORDER BY name ASC""",
            (self.recycler_id, ),
        )
        return [Driver(id=driver_id, name=name) for driver_id, name in rows]

    def get_vehicles(self) -> list[Vehicle]:
        """Return this recycler's vehicles (``trucks`` table) ordered by name."""
        # rows: (id, name)
        rows: list[tuple[int, str]] = Data._fetch_all(
            """SELECT id, name from trucks WHERE id_cliente_reciclador = %s ORDER BY name ASC""",
            (self.recycler_id, ),
        )
        return [Vehicle(id=vehicle_id, name=name) for vehicle_id, name in rows]

    def get_routes(self, date: datetime) -> list[RoutePlan]:
        """Return one ``RoutePlan`` per active route for the given date.

        Every active route in ``cat_route`` yields a plan. A route with no
        enabled recollections on ``date`` gets placeholder driver/vehicle
        (id ``-1``) and an empty ``collection_points`` list; otherwise the
        driver and vehicle come from the route's first recollection and the
        collection points follow the recollections' ``"order"`` column.
        """
        # routes: (id, name)
        routes: list[tuple[int, str]] = []
        # raw_collections: (id, route_id, driver_id, driver_name, vehicle_id,
        #   vehicle_name, index, branch_id, branch_name, customer_id, customer_name)
        raw_collections: list[tuple[int, int, int, str, int, str, int, int, str, int, str]] = []
        # Both queries share a single connection.
        with closing(Data.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute("""SELECT id, name FROM cat_route WHERE id_cliente_reciclador = %s AND status = TRUE ORDER BY name ASC""", (self.recycler_id, ))
                routes = cur.fetchall()
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT
                    r.id,
                    r.id_cat_route route_id,
                    r.id_driver driver_id,
                    d.name driver_name,
                    r.id_truck vehicle_id,
                    t.name vehicle_name,
                    r."order" "index",
                    s.id branch_id,
                    s.nombre_sucursal branch_name,
                    eu.id customer_id,
                    CASE
                    WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
                    WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social ) != '' THEN eu.razon_social
                    ELSE eu.id::text
                    END customer_name
                    FROM
                    recollections r LEFT JOIN
                    drivers d ON r.id_driver = d.id LEFT JOIN
                    trucks t ON r.id_truck = t.id LEFT JOIN
                    sucursales s ON r.id_sucursal = s.id LEFT JOIN
                    end_users eu ON s.id_end_user = eu.id
                    WHERE
                    r.id_cliente_reciclador = %s AND
                    r.enabled = TRUE AND
                    r.date = %s AND
                    r.id_sucursal IS NOT NULL AND
                    s.id_end_user IS NOT NULL
                    ORDER BY
                    r."order" ASC
                    """, (self.recycler_id, f"{date.year}-{date.month}-{date.day}"))
                raw_collections = cur.fetchall()

        # Group stops by route. Rows already arrive sorted by r."order", and
        # appending preserves that order, so no Python-side re-sort is needed
        # (re-sorting would also fail if "order" mixes NULL and integer values).
        stops_by_route: dict = defaultdict(list)
        for (_collection_id, route_id, driver_id, driver_name, vehicle_id, vehicle_name,
             _index, branch_id, branch_name, customer_id, customer_name) in raw_collections:
            stops_by_route[route_id].append((
                Driver(id=driver_id, name=driver_name),
                Vehicle(id=vehicle_id, name=vehicle_name),
                Branch(
                    branch_id=branch_id,
                    branch_name=branch_name,
                    customer_id=customer_id,
                    customer_name=customer_name
                ),
            ))

        plans: list[RoutePlan] = []
        for route_id, route_name in routes:
            stops = stops_by_route.get(route_id)
            if stops:
                # Driver and vehicle are taken from the first stop of the route.
                driver, vehicle, _first_branch = stops[0]
                collection_points = [branch for _driver, _vehicle, branch in stops]
            else:
                driver = Driver(id=-1, name="Sin asignar")
                vehicle = Vehicle(id=-1, name="Sin asignar")
                collection_points = []
            plans.append(RoutePlan(
                route = Route(id=route_id, name=route_name),
                driver = driver,
                vehicle = vehicle,
                collection_points = collection_points
            ))
        return plans

    def get_branches(self) -> list[Branch]:
        """Return this recycler's branches that have an end user, by customer name.

        The customer name is ``business_name`` when non-blank, else
        ``razon_social`` when non-blank, else the end user's id as text.
        """
        # rows: (branch_id, branch_name, customer_id, customer_name)
        rows: list[tuple[int, str, int, str]] = Data._fetch_all(
            """SELECT
            s.id branch_id,
            s.nombre_sucursal branch_name,
            eu.id customer_id,
            CASE
            WHEN eu.business_name IS NOT NULL AND TRIM(eu.business_name) != '' THEN eu.business_name
            WHEN eu.razon_social IS NOT NULL AND TRIM(eu.razon_social) != '' THEN eu.razon_social
            ELSE eu.id::text
            END customer_name
            FROM sucursales s LEFT JOIN end_users eu
            ON s.id_end_user = eu.id
            WHERE
            s.id_cliente_reciclador = %s AND
            s.id_end_user IS NOT NULL
            ORDER BY
            customer_name ASC""",
            (self.recycler_id, ),
        )
        return [Branch(
            branch_id=branch_id,
            branch_name=branch_name,
            customer_id=customer_id,
            customer_name=customer_name
        ) for branch_id, branch_name, customer_id, customer_name in rows]

    def get_estimated_containers(self, branch_ids: list[int]) -> list[list[dict]]:
        """Return the estimated containers for each branch in ``branch_ids``.

        The result has exactly one entry per requested branch id, in the same
        order. Each entry is a list of dictionaries with shape:
        {
            amount: int,
            weightKg: float | None,
            materialTypeId: int,
            containerTypeId: int,
            unitary_price: float,
            charge_type: int,
            visit_price: float
        }
        taken from the branch's most recently created recollection. A branch
        with no recollection, or with no usable containers, gets a fresh copy
        of the default container list.
        """
        if not branch_ids:
            return []

        # Query each distinct branch once; duplicates are re-expanded below.
        unique_ids = list(dict.fromkeys(branch_ids))
        # id_sucursal is selected so rows can be matched back to their branch:
        # UNION ALL omits branches with no rows and guarantees no row order.
        per_branch_query = """(SELECT id_sucursal, estimate_containers FROM recollections WHERE id_cliente_reciclador = %s AND id_sucursal = %s ORDER BY created_at DESC LIMIT 1)"""
        full_query = " UNION ALL ".join([per_branch_query] * len(unique_ids))
        args = []
        for branch_id in unique_ids:
            args.extend((self.recycler_id, branch_id))

        # Take the latest scheduled recollection and reuse its containers.
        rows: list[tuple[int, list[dict]]] = Data._fetch_all(full_query, args)
        return Data._containers_per_branch(branch_ids, rows)