| import hmac | |
| import hashlib | |
| import time | |
| import json | |
| import os | |
| import base64 | |
| from typing import Tuple | |
def sign_payload(payload: dict, secret: str) -> Tuple[str, str]:
    """Serialize *payload* with a timestamp and sign it with HMAC-SHA256.

    A ``_ts`` field holding the current Unix time (whole seconds) is added
    to a copy of the payload; the caller's dict is left untouched.  The copy
    is serialized as compact JSON with sorted keys so the byte sequence that
    gets signed is deterministic for a given payload and timestamp.

    Args:
        payload: JSON-serializable mapping to send.  An existing ``_ts`` key
            is overridden in the signed body.
        secret: Shared secret used as the HMAC key.

    Returns:
        A ``(body, signature)`` tuple: the JSON text that was signed and the
        hex-encoded HMAC-SHA256 digest of that text.
    """
    # Work on a shallow copy so signing has no side effect on the caller.
    stamped = dict(payload)
    stamped["_ts"] = int(time.time())

    body = json.dumps(stamped, separators=(",", ":"), sort_keys=True)
    sig = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
    return body, sig
def verify_payload(body: str, sig: str, secret: str, max_age_seconds: int = 300) -> dict:
    """Check the HMAC-SHA256 signature and freshness of a signed JSON body.

    Args:
        body: The exact JSON text that was signed.
        sig: Hex-encoded HMAC-SHA256 digest supplied with the request.
        secret: Shared secret used as the HMAC key.
        max_age_seconds: Maximum allowed absolute difference between the
            payload's ``_ts`` field and the local clock.  The difference is
            taken as an absolute value, so timestamps slightly in the future
            are tolerated as well.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If the signature does not match, the body is not a JSON
            object, ``_ts`` is missing or not a number, or the timestamp is
            outside the allowed window.  (``json.JSONDecodeError`` is a
            ``ValueError`` subclass.)
    """
    expected = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()

    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, so compare the encoded bytes instead; a garbage
    # signature must surface as ValueError, not as an unexpected TypeError.
    if not isinstance(sig, str) or not hmac.compare_digest(
        expected.encode(), sig.encode()
    ):
        raise ValueError("Invalid HMAC signature")

    data = json.loads(body)
    if not isinstance(data, dict):
        raise ValueError("Invalid payload: expected a JSON object")

    ts = data.get("_ts")
    # bool is an int subclass; True/False is not a meaningful timestamp.
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        raise ValueError("Invalid payload: missing or non-numeric _ts")

    try:
        age = abs(time.time() - ts)
    except OverflowError:
        # Integers too large to convert to float cannot be valid timestamps.
        raise ValueError("Invalid payload: _ts out of range") from None

    # Written as "not <=" so a NaN timestamp (which json.loads accepts)
    # fails closed instead of slipping past a ">" comparison.
    if not age <= max_age_seconds:
        raise ValueError(f"Stale request: {age:.0f}s old")
    return data
def generate_self_signed_cert(
    cert_path: str,
    key_path: str,
    cn: str = "localhost",
    valid_days: int = 3650,
) -> None:
    """Create a self-signed RSA-2048 certificate and write it with its key.

    The certificate uses *cn* as both subject and issuer common name and
    carries a single SubjectAlternativeName entry for it: an IP address
    entry when *cn* parses as an IP literal, a DNS name entry otherwise.
    It is signed with SHA-256.

    Args:
        cert_path: Destination for the PEM-encoded certificate.
        key_path: Destination for the unencrypted PEM private key
            (TraditionalOpenSSL format).  A newly created file gets mode
            0o600; the mode of a pre-existing file is not changed.
        cn: Common name / SAN value for the certificate.
        valid_days: Validity period in days, counted from now.

    Requires the third-party ``cryptography`` package, imported lazily so
    the rest of the module works without it.
    """
    import datetime
    import ipaddress

    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.backends import default_backend

    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, cn),
    ])

    # TLS clients match IP literals against iPAddress SAN entries, not
    # dNSName entries, so pick the entry type from the shape of cn.
    try:
        san = x509.IPAddress(ipaddress.ip_address(cn))
    except ValueError:
        san = x509.DNSName(cn)

    # Take the timestamp once so both validity bounds derive from the same
    # instant; timezone-aware replaces the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc)
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=valid_days))
        .add_extension(
            x509.SubjectAlternativeName([san]),
            critical=False,
        )
        .sign(key, hashes.SHA256(), default_backend())
    )

    with open(cert_path, "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))

    # The key is stored unencrypted, so create the file owner-only instead
    # of relying on the process umask.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        ))
def load_or_create_secret(path: str = "shared_secret.key") -> str:
    """Return the shared secret stored at *path*, creating it if needed.

    If the file exists and contains non-whitespace text, that text (with
    surrounding whitespace stripped) is returned.  Otherwise a new secret is
    generated from 32 bytes of ``os.urandom`` encoded as URL-safe base64,
    written to *path*, and returned.

    Args:
        path: Location of the secret file.

    Returns:
        The secret string.  It is never empty: a missing, empty, or
        whitespace-only file is treated as "no secret yet" and overwritten
        with a fresh one, so an empty HMAC key is never handed out.
    """
    # EAFP: open directly rather than checking os.path.exists() first.
    try:
        with open(path, "r", encoding="utf-8") as f:
            existing = f.read().strip()
    except FileNotFoundError:
        existing = ""
    if existing:
        return existing

    secret = base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")

    # Create the file owner-only instead of relying on the process umask;
    # the mode argument applies only when the file is newly created.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(secret)
    return secret