File size: 4,058 Bytes
ac599f3 9e01274 ac599f3 9e01274 ac599f3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import pandas
import os
def load_csvs_from_folder(folder_path="data"):
    """Read every ``*.csv`` file directly inside *folder_path* into a DataFrame.

    Args:
        folder_path: Directory to scan (not recursive). Defaults to ``"data"``.

    Returns:
        A dict mapping each CSV file name (not the full path) to the
        ``pandas.DataFrame`` read from it, with keys in sorted file-name
        order. If listing the folder or reading any file fails, the error is
        printed and the string ``"ERROR"`` is returned instead of a dict.
    """
    csv_data = {}
    try:
        # os.listdir yields entries in arbitrary, platform-dependent order;
        # sorting makes the key order of the result deterministic.
        for filename in sorted(os.listdir(folder_path)):
            if filename.endswith(".csv"):
                path = os.path.join(folder_path, filename)
                csv_data[filename] = pandas.read_csv(path)
    except Exception as e:
        # Best-effort loader: callers receive the "ERROR" sentinel rather
        # than an exception, so any failure is reported and mapped to it.
        print("Erro ao carregar CSVs:", e)
        return "ERROR"
    return csv_data
def _parse_emission_date(date_str):
    """Best-effort conversion of an emission-date value to a ``datetime``.

    Returns ``None`` when the value cannot be interpreted as a date.
    """
    from datetime import datetime

    # Tried in order; the first format that matches wins. Day-first is tried
    # before month-first, so an ambiguous "03/04/2024" is read as 3 April.
    formats = (
        "%Y-%m-%d",
        "%d/%m/%Y",
        "%m/%d/%Y",
        "%m/%d/%Y %I:%M:%S %p",
        "%m/%d/%Y %H:%M:%S",
    )
    for fmt in formats:
        try:
            return datetime.strptime(date_str.strip(), fmt)
        except (ValueError, TypeError, AttributeError):
            # ValueError: format mismatch; the others: value is not a str.
            continue

    # No explicit format matched: let pandas try its own parser.
    try:
        import pandas as pd

        dt = pd.to_datetime(date_str, errors="coerce")
        if pd.notnull(dt):
            return dt.to_pydatetime()
    except Exception:
        # Deliberately best-effort: an unparseable date is simply ignored.
        pass
    return None


def build_metadata2(csv_data: dict) -> dict:
    """Compute summary metadata over a collection of invoices (NF-e).

    Args:
        csv_data: Mapping of invoice key to a dict that is read through the
            keys ``"head"`` (a dict of header fields) and ``"items"`` (a list
            of item dicts). Missing keys are treated as empty.

    Returns:
        A dict with the keys:

        * ``head_fields_with_type`` / ``item_fields_with_type``: lists of
          ``(field_name, type_name)`` taken from the first invoice's head and
          its first item (empty lists when unavailable).
        * ``avg_items_per_nfe``: mean number of items per invoice (``0`` when
          there are no invoices).
        * ``max_invoice_value`` / ``max_invoice_chave``: the largest
          ``"VALOR NOTA FISCAL"`` that converts to a real number and the key
          of that invoice (both ``None`` when no value qualifies).
        * ``date_range``: ``(min, max)`` of the parseable ``"DATA EMISSÃO"``
          values formatted as ``YYYY-MM-DD``, or ``(None, None)``.
        * ``num_nfes``: number of invoices.
    """
    import math

    # Field names/types are sampled from the first invoice only.
    first_nfe = next(iter(csv_data.values()), None)
    if first_nfe:
        head = first_nfe.get("head", {})
        items = first_nfe.get("items", [])
        head_fields_with_type = [(k, type(v).__name__) for k, v in head.items()]
        item_fields_with_type = (
            [(k, type(v).__name__) for k, v in items[0].items()] if items else []
        )
    else:
        head_fields_with_type = []
        item_fields_with_type = []

    num_nfes = len(csv_data)
    total_items = sum(len(nfe.get("items", [])) for nfe in csv_data.values())
    avg_items_per_nfe = total_items / num_nfes if num_nfes > 0 else 0

    dates = []
    for nfe in csv_data.values():
        date_str = nfe.get("head", {}).get("DATA EMISSÃO")
        if date_str:
            parsed = _parse_emission_date(date_str)
            if parsed is not None:
                dates.append(parsed)
    date_range = (
        (min(dates).strftime("%Y-%m-%d"), max(dates).strftime("%Y-%m-%d"))
        if dates
        else (None, None)
    )

    max_invoice_value = None
    max_invoice_chave = None
    for chave, nfe in csv_data.items():
        valor = nfe.get("head", {}).get("VALOR NOTA FISCAL")
        if valor is None:
            continue
        try:
            valor_num = float(valor)
        except (TypeError, ValueError, OverflowError):
            continue
        # NaN compares False against everything; if it were accepted as the
        # running maximum no later value could ever replace it.
        if math.isnan(valor_num):
            continue
        if max_invoice_value is None or valor_num > max_invoice_value:
            max_invoice_value = valor_num
            max_invoice_chave = chave

    return {
        "head_fields_with_type": head_fields_with_type,
        "item_fields_with_type": item_fields_with_type,
        "avg_items_per_nfe": avg_items_per_nfe,
        "max_invoice_value": max_invoice_value,
        "max_invoice_chave": max_invoice_chave,
        "date_range": date_range,
        "num_nfes": num_nfes,
    }
def mount_nfe(
    csv_data: dict,
    *,
    head_file: str = "202401_NFs_Cabecalho.csv",
    items_file: str = "202401_NFs_Itens.csv",
) -> dict:
    """Assemble invoices by joining header rows with their item rows.

    Args:
        csv_data: Mapping of CSV file name to the DataFrame loaded from it.
        head_file: Key in *csv_data* holding the invoice headers.
        items_file: Key in *csv_data* holding the invoice items.

    Returns:
        A dict keyed by each header row's ``"CHAVE DE ACESSO"`` value; every
        entry is ``{"head": <header row as dict>, "items": [<item rows>]}``.
        Header rows whose key is ``None`` are skipped, and item rows whose
        key matches no header are dropped. If the headers frame is absent the
        result is empty; if the items frame is absent every invoice has an
        empty item list.
    """
    heads = csv_data.get(head_file)
    items = csv_data.get(items_file)

    nfes = {}
    if heads is not None:
        for record in heads.to_dict(orient="records"):
            chave = record.get("CHAVE DE ACESSO")
            if chave is not None:
                # A repeated key replaces the earlier header row.
                nfes[chave] = {"head": record, "items": []}

    if items is not None:
        for record in items.to_dict(orient="records"):
            entry = nfes.get(record.get("CHAVE DE ACESSO"))
            if entry is not None:
                entry["items"].append(record)

    return nfes
|