raannakasturi commited on
Commit
d93884d
1 Parent(s): fbf1419

Upload 12 files

Browse files
Files changed (12) hide show
  1. .gitignore +6 -0
  2. GUI.py +39 -0
  3. acme_tools.py +71 -0
  4. dns_cf.py +62 -0
  5. genPVTCSR.py +82 -0
  6. gen_records.py +45 -0
  7. getGoogleEAB.py +36 -0
  8. getTokenCert.py +86 -0
  9. getZeroSSLEAB.py +18 -0
  10. main.py +93 -0
  11. requirements.txt +3 -0
  12. tools.py +70 -0
.gitignore ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ .env
2
+ certificate*
3
+ /raannakasturi
4
+ /thenayankasturi*
5
+ /__pycache__
6
+ *.json
GUI.py ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import gradio as gr
2
+ from main import main
3
+
4
+ def run(i_domains, wildcard, email, ca_server, key_type, key_size=None, key_curve=None):
5
+ pvt, pvt_file, cert, cert_file = main(i_domains, wildcard, email, ca_server, key_type, key_size, key_curve)
6
+ return pvt, pvt_file, cert, cert_file
7
+
8
+ def update_key_options(key_type):
9
+ if key_type == "rsa":
10
+ return gr.update(visible=True), gr.update(visible=False)
11
+ else:
12
+ return gr.update(visible=False), gr.update(visible=True)
13
+
14
+ with gr.Blocks() as app:
15
+ with gr.Row():
16
+ with gr.Column():
17
+ domains_input = gr.Textbox(label="Enter Domains", placeholder="thenayankasturi.eu.org, dash.thenayankasturi.eu.org, www.thenayankasturi.eu.org", type="text", interactive=True)
18
+ wildcard = gr.Checkbox(label="Wildcard SSL", interactive=True, value=False)
19
+ email_input = gr.Textbox(label="Enter your Email ID", placeholder="nayankasturi@gmail.com", type="text", interactive=True)
20
+ with gr.Row():
21
+ ca_server = gr.Dropdown(label="Select Certificate Authority", choices=["Let's Encrypt (Testing)","Let's Encrypt", "Buypass (Testing)", "Buypass", "ZeroSSL", "Google (Testing)","Google", "SSL.com"], interactive=True, value="Let's Encrypt (Testing)")
22
+ key_type = gr.Radio(label="Select SSL key type", choices=["rsa", "ec"], interactive=True, value='ec')
23
+ key_size_dropdown = gr.Dropdown(label="Select Key Size", choices=['2048', '4096'], value='4096', visible=False) # Initially visible
24
+ key_curve_dropdown = gr.Dropdown(label="Select Key Curve", choices=['SECP256R1', 'SECP384R1'], value='SECP384R1', visible=True) # Initially hidden
25
+
26
+ key_type.change(fn=update_key_options, inputs=key_type, outputs=[key_size_dropdown, key_curve_dropdown])
27
+ btn = gr.Button(value="Generate SSL Certificate")
28
+
29
+ with gr.Row():
30
+ with gr.Column():
31
+ pvt = gr.Textbox(label="Your Private Key", placeholder="Your Private Key will appear here, after successful SSL generation", type="text", interactive=False, show_copy_button=True, lines=10, max_lines=10)
32
+ pvtfile = gr.File(label="Download your Private Key")
33
+ with gr.Column():
34
+ crt = gr.Textbox(label="Your SSL Certificate", placeholder="Your SSL Certificate will appear here, after successful SSL generation", type="text", interactive=False, show_copy_button=True, lines=10, max_lines=10)
35
+ crtfile = gr.File(label="Download your SSL Certificate")
36
+
37
+ btn.click(run, inputs=[domains_input, wildcard, email_input, ca_server, key_type, key_size_dropdown, key_curve_dropdown], outputs=[pvt, pvtfile, crt, crtfile])
38
+
39
+ app.queue(default_concurrency_limit=15).launch()
acme_tools.py ADDED
@@ -0,0 +1,71 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+ import josepy as jose
3
+ from acme import messages, jose
4
+ from acme import client, messages
5
+ from cryptography.hazmat.primitives.asymmetric import rsa, ec
6
+ from cryptography.hazmat.backends import default_backend
7
+
8
+ def pg_client(directory, key_type="rsa", key_size=None, key_curve=None):
9
+ try:
10
+ if key_type.lower() == "rsa":
11
+ if key_size == "" or key_size == None:
12
+ key_size = 4096
13
+ rsa_key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
14
+ account_key = jose.JWKRSA(key=rsa_key)
15
+ net = client.ClientNetwork(account_key, user_agent='project-gatekeeper/v1.5')
16
+ directory_obj = messages.Directory.from_json(net.get(directory).json())
17
+ acme_client = client.ClientV2(directory_obj, net=net)
18
+ return acme_client
19
+ elif key_type.lower() == "ec":
20
+ if key_curve == "" or key_curve == None:
21
+ key_curve = "ec256"
22
+ if key_curve == 'SECP256R1' or key_curve == 'ec256':
23
+ ec_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
24
+ algo=jose.ES256
25
+ elif key_curve == 'SECP384R1' or key_curve == 'ec384':
26
+ ec_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
27
+ algo=jose.ES384
28
+ account_key = jose.JWKEC(key=ec_key)
29
+ net = client.ClientNetwork(account_key, alg=algo, user_agent='project-gatekeeper/v2')
30
+ response = net.get(directory)
31
+ directory_obj = messages.Directory.from_json(response.json())
32
+ acme_client = client.ClientV2(directory_obj, net=net)
33
+ return acme_client
34
+ else:
35
+ print("Invalid key_type")
36
+ sys.exit()
37
+ except:
38
+ print("Error in initialization")
39
+ sys.exit()
40
+
41
+ def new_account(pgclient, email, kid=None, hmac=None):
42
+ external_account_binding = None
43
+ if kid and hmac:
44
+ if isinstance(hmac, bytes):
45
+ hmac = hmac.decode('utf-8')
46
+ if not isinstance(hmac, str):
47
+ print("Error: HMAC is not a string after decoding.")
48
+ return False
49
+ try:
50
+ hmac_bytes = jose.b64.b64decode(hmac)
51
+ except Exception as e:
52
+ print(f"Error decoding HMAC key: {e}")
53
+ return False
54
+ hmac_key_b64 = jose.b64.b64encode(hmac_bytes).decode('utf-8')
55
+ external_account_binding = messages.ExternalAccountBinding.from_data(
56
+ account_public_key=pgclient.net.key,
57
+ kid=kid,
58
+ hmac_key=hmac_key_b64,
59
+ directory=pgclient.directory
60
+ )
61
+ registration = messages.NewRegistration.from_data(
62
+ email=email,
63
+ terms_of_service_agreed=True,
64
+ external_account_binding=external_account_binding
65
+ )
66
+ try:
67
+ account = pgclient.new_account(registration)
68
+ return account
69
+ except Exception as e:
70
+ print(f"Error creating account: {e}")
71
+ return False
dns_cf.py ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import os
3
+ from dotenv import load_dotenv
4
+
5
+ load_dotenv()
6
+ cf_token = os.environ.get("CF_TOKEN")
7
+ cf_zone_id = os.environ.get("CF_ZONE_ID")
8
+ cf_email = os.environ.get("CF_EMAIL_ID")
9
+ cf_api = "https://api.cloudflare.com/client/v4/"
10
+
11
+ headers = {
12
+ "Content-Type": "application/json",
13
+ "X-Auth-Email": cf_email,
14
+ "X-Auth-Key": cf_token
15
+ }
16
+
17
+ def check_txt():
18
+ cf_endpoint = f"zones/{cf_zone_id}/dns_records"
19
+ url = f"{cf_api}{cf_endpoint}"
20
+ response = requests.request("GET", url, headers=headers)
21
+ data = response.json()
22
+ record_ids = []
23
+ record_names = []
24
+ for record in data['result']:
25
+ if record['type'] == "TXT":
26
+ record_id = record['id']
27
+ record_name = record['name']
28
+ record_ids.append(record_id)
29
+ record_names.append(record_name)
30
+ else:
31
+ continue
32
+ return record_ids, record_names
33
+
34
+ def add_txt(txt_rec, txt_value, ssl_email):
35
+ cf_endpoint = f"zones/{cf_zone_id}/dns_records"
36
+ url = f"{cf_api}{cf_endpoint}"
37
+ name = txt_rec
38
+ data = {
39
+ "type": "TXT",
40
+ "name": name,
41
+ "content": txt_value,
42
+ "proxied": False,
43
+ "comment": f"SSL Verification for {ssl_email}"
44
+ }
45
+ response = requests.request("POST", url, headers=headers, json=data)
46
+ return response.json()
47
+
48
+ def del_txt(txt_name):
49
+ res = None
50
+ record_ids, record_names = check_txt()
51
+ for record_id, record_name in zip(record_ids, record_names):
52
+ if record_name.startswith(txt_name):
53
+ try:
54
+ cf_endpoint = f"zones/{cf_zone_id}/dns_records/{record_id}"
55
+ url = f"{cf_api}{cf_endpoint}"
56
+ requests.request("DELETE", url, headers=headers)
57
+ res = "records deleted"
58
+ except Exception as e:
59
+ res = f"Error deleting records: {e}"
60
+ else:
61
+ res = "records not found"
62
+ return res
genPVTCSR.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from cryptography.hazmat.primitives import serialization, hashes
2
+ from cryptography import x509
3
+ from cryptography.hazmat.primitives.asymmetric import ec, rsa
4
+ from cryptography.hazmat.backends import default_backend
5
+ from cryptography.x509.oid import NameOID
6
+ from typing import List, Tuple
7
+
8
+ def gen_pvt(key_type: str, key_size: int = None, key_curve: str = None) -> bytes:
9
+ if key_type.lower() == "ec":
10
+ if key_curve == 'SECP256R1' or key_curve == 'ec256':
11
+ key = ec.generate_private_key(ec.SECP256R1(), default_backend())
12
+ elif key_curve == 'SECP384R1' or key_curve == 'ec384':
13
+ key = ec.generate_private_key(ec.SECP384R1(), default_backend())
14
+ else:
15
+ key = ec.generate_private_key(ec.SECP256R1(), default_backend())
16
+ private_key = key.private_bytes(
17
+ encoding=serialization.Encoding.PEM,
18
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
19
+ encryption_algorithm=serialization.NoEncryption()
20
+ )
21
+ elif key_type.lower() == "rsa":
22
+ if key_size not in [2048, 4096]:
23
+ key_size = 4096
24
+ key = rsa.generate_private_key(
25
+ public_exponent=65537,
26
+ key_size=key_size,
27
+ backend=default_backend()
28
+ )
29
+ private_key = key.private_bytes(
30
+ encoding=serialization.Encoding.PEM,
31
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
32
+ encryption_algorithm=serialization.NoEncryption()
33
+ )
34
+ else:
35
+ raise ValueError("Unsupported key type or parameters")
36
+ return private_key
37
+
38
+ def gen_csr(private_key: bytes, domains: List[str], email: str, common_name: str = None, country: str = None,
39
+ state: str = None, locality: str = None, organization: str = None, organization_unit: str = None) -> bytes:
40
+
41
+ ssl_domains = [x509.DNSName(domain.strip()) for domain in domains]
42
+ private_key_obj = serialization.load_pem_private_key(private_key, password=None, backend=default_backend())
43
+ try:
44
+ if email.split("@")[1] in ["demo.com", "example.com"] or email.count("@") > 1 or email.count(".") < 1 or email is None:
45
+ print("Invalid email address")
46
+ email = f"admin@{domains[0]}"
47
+ except Exception as e:
48
+ print(f"Error in email address: {e}")
49
+ email = f"admin@{domains[0]}"
50
+ country: str = country or "IN"
51
+ state: str = state or "Maharashtra"
52
+ locality: str = locality or "Mumbai"
53
+ organization_unit: str = organization_unit or "IT Department"
54
+ common_name: str = common_name or domains[0]
55
+ organization: str = organization or common_name.split(".")[0]
56
+ subject = x509.Name([
57
+ x509.NameAttribute(NameOID.COUNTRY_NAME, country),
58
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state),
59
+ x509.NameAttribute(NameOID.LOCALITY_NAME, locality),
60
+ x509.NameAttribute(NameOID.EMAIL_ADDRESS, email),
61
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
62
+ x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, organization_unit),
63
+ x509.NameAttribute(NameOID.COMMON_NAME, common_name),
64
+ ])
65
+ builder = x509.CertificateSigningRequestBuilder()
66
+ builder = builder.subject_name(subject)
67
+ builder = builder.add_extension(
68
+ x509.SubjectAlternativeName(ssl_domains),
69
+ critical=False,
70
+ )
71
+ csr = builder.sign(private_key_obj, hashes.SHA256(), default_backend())
72
+ return csr.public_bytes(serialization.Encoding.PEM)
73
+
74
+ def gen_pvt_csr(domains: List[str], key_type: str, key_size: int = None, key_curve: str = None, email: str = None,
75
+ common_name: str = None, country: str = None, state: str = None, locality: str = None,
76
+ organization: str = None, organization_unit: str = None) -> Tuple[bytes, bytes]:
77
+ if key_type.lower() == "rsa":
78
+ private_key = gen_pvt(key_type, key_size)
79
+ else:
80
+ private_key = gen_pvt(key_type, key_curve)
81
+ csr = gen_csr(private_key, domains, email, common_name, country, state, locality, organization, organization_unit)
82
+ return private_key, csr
gen_records.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import hashlib
2
+
3
+ def get_domains(i_domains):
4
+ domains = []
5
+ for domain in i_domains.split(","):
6
+ domain = domain.strip()
7
+ domains.append(domain)
8
+ return domains
9
+
10
+ def prefix(domain):
11
+ domain_bytes = domain.encode()
12
+ prefix = hashlib.blake2b(domain_bytes, digest_size=12).hexdigest()
13
+ return prefix
14
+
15
+ def gen_cname_recs(domains):
16
+ cname_recs = []
17
+ for domain in domains:
18
+ cname_rec = f"_acme-challenge.{domain}"
19
+ cname_recs.append(cname_rec)
20
+ return cname_recs
21
+
22
+ def gen_cname_values(domains, cf_domain, exchange):
23
+ temp_cname_values = []
24
+ cname_values = []
25
+ for domain in domains:
26
+ cname_value = prefix(domain)
27
+ cname_value = f"{cname_value}.{domain}"
28
+ temp_cname_values.append(cname_value)
29
+ for cname_value in temp_cname_values:
30
+ modified_cname_value = cname_value.replace(exchange, cf_domain)
31
+ cname_values.append(modified_cname_value)
32
+ return cname_values
33
+
34
+ def gen_cname(domains, cf_domain, exchange):
35
+ cname_recs = gen_cname_recs(domains)
36
+ cname_values = gen_cname_values(domains, cf_domain, exchange)
37
+ return cname_recs, cname_values
38
+
39
+ def txt_recs(txt_records, exchange):
40
+ txt_record = txt_records.replace("_acme-challenge.", "")
41
+ txt_rec = txt_record.replace(f"{exchange}", "")
42
+ pre = prefix(txt_record)
43
+ rec = f"{pre}.{txt_rec}"
44
+ rec = rec.strip(".")
45
+ return rec
getGoogleEAB.py ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from dotenv import load_dotenv
3
+ from google.oauth2 import service_account
4
+ from google.cloud.security.publicca import PublicCertificateAuthorityServiceClient as productionclient
5
+ from google.cloud.security.publicca_v1beta1 import PublicCertificateAuthorityServiceClient as testingclient
6
+
7
+ def gen_google_eab_data():
8
+ load_dotenv()
9
+ data = {
10
+ "type": "service_account",
11
+ "project_id": os.getenv("PROJECT_ID"),
12
+ "private_key_id": os.getenv("PRIVATE_KEY_ID"),
13
+ "private_key": os.getenv("PRIVATE_KEY"),
14
+ "client_email": os.getenv("CLIENT_EMAIL"),
15
+ "client_id": os.getenv("CLIENT_ID"),
16
+ "auth_uri": os.getenv("AUTH_URI"),
17
+ "token_uri": os.getenv("TOKEN_URI"),
18
+ "auth_provider_x509_cert_url": os.getenv("AUTH_PROVIDER_X509_CERT_URL"),
19
+ "client_x509_cert_url": os.getenv("CLIENT_X509_CERT_URL"),
20
+ "universe_domain": os.getenv("UNIVERSE_DOMAIN")
21
+ }
22
+ return data
23
+
24
+ def gen_google_eab(test:bool):
25
+ service_account_info = gen_google_eab_data()
26
+ credentials = service_account.Credentials.from_service_account_info(service_account_info)
27
+ if test:
28
+ client = testingclient(credentials=credentials)
29
+ else:
30
+ client = productionclient(credentials=credentials)
31
+ project_id = service_account_info['project_id']
32
+ parent = f"projects/{project_id}"
33
+ response = client.create_external_account_key(parent=parent)
34
+ kid = response.key_id
35
+ hmac = response.b64_mac_key
36
+ return kid, hmac
getTokenCert.py ADDED
@@ -0,0 +1,86 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import datetime
2
+ from acme import errors, challenges
3
+
4
+ def challens(order, directory) -> dict:
5
+ challs = {}
6
+ for auth in list(order.authorizations):
7
+ for challenge in auth.body.challenges:
8
+ if isinstance(challenge.chall, challenges.DNS01):
9
+ domain = auth.body.identifier.value
10
+ challs[domain] = challs[domain] if domain in challs else []
11
+ challs[domain].append(challenge)
12
+ if not challs:
13
+ msg = f"ACME server at '{directory}' does not support DNS-01 challenge."
14
+ raise errors.ChallengeUnavailable(msg.format(directory=str(directory)))
15
+ return challs
16
+
17
+ def get_tokens(client, csr, directory):
18
+ verification_tokens = {}
19
+ responses = {}
20
+ order = client.new_order(csr)
21
+ challs = challens(order, directory)
22
+ for domain, challenge_items in challs.items():
23
+ domain = f"_acme-challenge.{domain}"
24
+ for challenge in challenge_items:
25
+ verification_tokens[domain] = verification_tokens[domain] if domain in verification_tokens else []
26
+ response, validation = challenge.response_and_validation(client.net.key)
27
+ verification_tokens[domain].append(validation)
28
+ responses[challenge.chall.token] = response
29
+ _verification_tokens = verification_tokens
30
+ return verification_tokens, challs, order
31
+
32
+ def process_challenge(client, challenge, domain):
33
+ try:
34
+ response, _validation = challenge.response_and_validation(client.net.key)
35
+ print(f"Challenge verified for domain: {domain}")
36
+ token = challenge.chall.token
37
+ if isinstance(token, bytes):
38
+ token = token.decode('utf-8', errors='replace')
39
+ return response, token
40
+ except Exception as e:
41
+ print(f"Error processing challenge for domain {domain}: {e}")
42
+ return None, None
43
+
44
+ def answer_challenge(client, challenge, response, domain):
45
+ try:
46
+ answer = client.answer_challenge(challenge, response)
47
+ print(f"Challenge answered for domain: {domain}")
48
+ return answer
49
+ except Exception as e:
50
+ print(f"Error answering challenge for domain {domain}: {e}")
51
+ return None
52
+
53
+ def finalize_order(client, order, deadline):
54
+ try:
55
+ return client.poll_and_finalize(order, deadline=deadline)
56
+ except Exception as e:
57
+ print(f"Error finalizing order: {e}")
58
+ return None
59
+
60
+ def retrieve_certificate(final_order):
61
+ try:
62
+ return final_order.fullchain_pem.encode()
63
+ except Exception as e:
64
+ print(f"Error retrieving certificate: {e}")
65
+ return None
66
+
67
+ def verify_tokens(client, challs, order):
68
+ deadline = datetime.datetime.now() + datetime.timedelta(seconds=90)
69
+ answers = []
70
+ responses = {}
71
+ for domain, challenge_list in challs.items():
72
+ print(f"Fetching challenges for domain: {domain}")
73
+ for challenge in challenge_list:
74
+ response, token = process_challenge(client, challenge, domain)
75
+ if response is None:
76
+ continue
77
+
78
+ responses[token] = response
79
+ answer = answer_challenge(client, challenge, response, domain)
80
+ if answer is not None:
81
+ answers.append(answer)
82
+ final_order = finalize_order(client, order, deadline)
83
+ if final_order is None:
84
+ return None
85
+ cert = retrieve_certificate(final_order)
86
+ return cert
getZeroSSLEAB.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import os
3
+ from dotenv import load_dotenv
4
+
5
+ def gen_zero_ssl_eab():
6
+ load_dotenv()
7
+ apikey = os.getenv("ZEROSSLAPI")
8
+ url = "https://api.zerossl.com/acme/eab-credentials"
9
+ headers = {'Content-Type': 'application/json'}
10
+ resp = requests.post(url, params={'access_key': apikey}, headers=headers)
11
+ print(resp.json())
12
+ if resp.json()['success'] == False:
13
+ print("Error: ", resp.json()['error'])
14
+ return "Error", "Error"
15
+ else:
16
+ kid = resp.json()['eab_kid']
17
+ hmac = resp.json()['eab_hmac_key']
18
+ return kid, hmac
main.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+ import time
4
+ from genPVTCSR import gen_pvt_csr
5
+ from tools import get_domains, get_ca_server, get_kid_hmac, extract_subdomains, write_file
6
+ from acme_tools import pg_client, new_account
7
+ from getTokenCert import get_tokens, verify_tokens
8
+ from gen_records import txt_recs
9
+ from dns_cf import add_txt, del_txt
10
+
11
+ def cf_non_wildcard(verification_tokens, email, exchange):
12
+ tokens = verification_tokens
13
+ for key, value in tokens.items():
14
+ txt_rec = txt_recs(key, exchange)
15
+ txt_value = value[0].strip()
16
+ try:
17
+ del_txt(txt_rec)
18
+ except Exception as e:
19
+ print(f"Error deleting TXT records or no TXT records exists: {e}")
20
+ add_txt(txt_rec, txt_value, ssl_email=email)
21
+
22
+ def cf_wildcard(verification_tokens, email, exchange):
23
+ tokens = verification_tokens
24
+ for key, value in tokens.items():
25
+ txt_rec = txt_recs(key, exchange)
26
+ try:
27
+ del_txt(txt_rec)
28
+ except Exception as e:
29
+ print(f"Error deleting TXT records or no TXT records exists: {e}")
30
+ for txt_value in value:
31
+ add_txt(txt_rec, txt_value, ssl_email=email)
32
+
33
+ def main(i_domains, wildcard, email, ca_server, key_type, key_size=None, key_curve=None, kid=None, hmac=None):
34
+ domains = get_domains(i_domains)
35
+ exchange = extract_subdomains(domains=domains)
36
+ if wildcard:
37
+ domains = [exchange, f'*.{exchange}']
38
+ ca_server_url = get_ca_server(ca_server, key_type)
39
+ pgk_client = pg_client(ca_server_url, key_type=key_type, key_size=key_size, key_curve=key_curve)
40
+ if pgk_client is None:
41
+ exit()
42
+ nkid, nhmac = get_kid_hmac(ca_server)
43
+ if nkid == 'Error' or nhmac == 'Error':
44
+ print("Try with another provider or contact us")
45
+ sys.exit(1)
46
+ else:
47
+ kid = nkid
48
+ hmac = nhmac
49
+ print("KID:",kid)
50
+ print("HMAC:",hmac)
51
+ print(f"CA SERVER({ca_server}):", ca_server_url)
52
+ account = new_account(pgk_client, email, kid=kid, hmac=hmac)
53
+ if not account:
54
+ exit()
55
+ private_key, csr = gen_pvt_csr(domains=domains, email=email, key_type=key_type, key_curve=key_curve, key_size=key_size)
56
+ verification_tokens, challs, order = get_tokens(pgk_client, csr, ca_server_url)
57
+ try:
58
+ if wildcard:
59
+ cf_wildcard(verification_tokens, email, exchange)
60
+ else:
61
+ cf_non_wildcard(verification_tokens, email, exchange)
62
+ except:
63
+ print("Error adding TXT records")
64
+ sys.exit(1)
65
+ for i in range(60):
66
+ print(f"Waiting for {60-i} seconds", end="\r")
67
+ time.sleep(1)
68
+ cert = verify_tokens(pgk_client, challs, order)
69
+ for key, _value in verification_tokens.items():
70
+ txt_rec = txt_recs(key, exchange)
71
+ try:
72
+ del_txt(txt_rec)
73
+ print("TXT records deleted successfully")
74
+ except Exception as e:
75
+ print(f"Error deleting TXT records or no TXT records exists: {e}")
76
+ path = email.split("@")[0]
77
+ os.makedirs(path, exist_ok=True)
78
+ write_file(f"{path}/private.pem", private_key)
79
+ write_file(f"{path}/domain.csr", csr)
80
+ write_file(f"{path}/cert.pem", cert)
81
+ return private_key, f"{path}/private.pem", cert, f"{path}/cert.pem"
82
+ """
83
+ if __name__ == "__main__":
84
+ DOMAINS = 'thenayankasturi.eu.org, *.thenayankasturi.eu.org'
85
+ ca_server = "Google (Testing)" #Let's Encrypt (Testing), Let's Encrypt, Buypass (Testing), Buypass, zerossl, google_test, google, ssccom
86
+ EMAIL = "raannakasturi@mail.com"
87
+ key_type = "ec"
88
+ key_curve = "ec256"
89
+ key_size = None
90
+ KID = None
91
+ HMAC = None
92
+ main(i_domains=DOMAINS, wildcard=True, email=EMAIL, ca_server=ca_server, key_type=key_type, key_size=key_size,key_curve=key_curve, kid=KID, hmac=HMAC)
93
+ """
requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ josepy==1.14.0
2
+ python-dotenv
3
+ acme==2.11.0
tools.py ADDED
@@ -0,0 +1,70 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+
3
+ from getGoogleEAB import gen_google_eab
4
+ from getZeroSSLEAB import gen_zero_ssl_eab
5
+
6
+
7
+ def get_domains(i_domains):
8
+ domains = []
9
+ for domain in i_domains.split(","):
10
+ domain = domain.strip()
11
+ domains.append(domain)
12
+ return domains
13
+
14
+ def extract_subdomains(domains):
15
+ exchange = min(domains, key=len)
16
+ return exchange
17
+
18
+ def get_ca_server(caserver, key_type):
19
+ DEFAULT_LET_ENCRYPT_URL = "https://acme-v02.api.letsencrypt.org/directory"
20
+ urls = {
21
+ "SSL.com": {
22
+ "rsa": "https://acme.ssl.com/sslcom-dv-rsa",
23
+ "ec": "https://acme.ssl.com/sslcom-dv-ecc"
24
+ },
25
+ "Let's Encrypt (Testing)": "https://acme-staging-v02.api.letsencrypt.org/directory",
26
+ "Let's Encrypt": DEFAULT_LET_ENCRYPT_URL,
27
+ "Buypass (Testing)": "https://api.test4.buypass.no/acme/directory",
28
+ "Buypass": "https://api.Buypass.com/acme/directory",
29
+ "ZeroSSL": "https://acme.zerossl.com/v2/DV90",
30
+ "Google (Testing)": "https://dv.acme-v02.test-api.pki.goog/directory",
31
+ "Google": "https://dv.acme-v02.api.pki.goog/directory"
32
+ }
33
+ if caserver in urls:
34
+ if isinstance(urls[caserver], dict):
35
+ return urls[caserver].get(key_type, DEFAULT_LET_ENCRYPT_URL)
36
+ else:
37
+ return urls[caserver]
38
+ return DEFAULT_LET_ENCRYPT_URL
39
+
40
+ def get_kid_hmac(server):
41
+ if server == "SSL.com":
42
+ return None, None
43
+ elif server == "Let's Encrypt (Testing)":
44
+ return None, None
45
+ elif server == "Let's Encrypt":
46
+ return None, None
47
+ elif server == "Buypass (Testing)":
48
+ return None, None
49
+ elif server == "Buypass":
50
+ return None, None
51
+ elif server == "ZeroSSL":
52
+ kid, hmac = gen_zero_ssl_eab()
53
+ return kid, hmac
54
+ elif server == "Google (Testing)":
55
+ kid, hmac = gen_google_eab(test=True)
56
+ return kid, hmac
57
+ elif server == "Google":
58
+ kid, hmac = gen_google_eab(test=False)
59
+ return kid, hmac
60
+ else:
61
+ return None, None
62
+
63
+ def write_file(filename, data):
64
+ try:
65
+ with open(filename, 'wb') as f:
66
+ f.write(data)
67
+ print(filename, " successfully written")
68
+ except Exception as e:
69
+ print("Error writing file: ", filename)
70
+ print(e)