| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| import re |
| import json |
| import shutil |
| import sys |
| import time |
| import os |
| from typing import Optional, List, Iterator, Tuple, Dict |
|
|
| import public |
| from projectModel.base import projectBase |
|
|
|
|
| class main(projectBase): |
| PANEL_PATH = public.get_panel_path() |
| _vhost_path = '{}/vhost'.format(PANEL_PATH) |
| _log_name = '项目管理' |
| SETUP_PATH = '/www/server/panel' |
|
|
| def __init__(self): |
| pass |
|
|
| @staticmethod |
| def project_find(project_name) -> Optional[dict]: |
| """ |
| @name 获取指定项目配置 |
| @param project_name<string> 项目名称 |
| """ |
| project_info = public.M('sites').where('project_type=? AND name=?', ('html', project_name)).find() |
| |
| if not isinstance(project_info, dict): |
| return None |
| return project_info |
|
|
| def get_project_list(self, get): |
| ''' |
| @name 获取项目列表 |
| @author hwliang<2021-08-09> |
| @param get<dict_obj>{ |
| project_name: string<项目名称> |
| } |
| @return dict |
| ''' |
|
|
| if not 'p' in get: get.p = 1 |
| if not 'limit' in get: get.limit = 20 |
| if not 'callback' in get: get.callback = '' |
| if not 'order' in get: get.order = 'id desc' |
| type_id = None |
| if hasattr(get,"type_id"): |
| try: |
| type_id = int(get.type_id) |
| except: |
| type_id = None |
|
|
| if 'search' in get: |
| get.project_name = get.search.strip() |
| search = "%{}%".format(get.project_name) |
| if type_id is None: |
| count = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?)', |
| ('html', search, search)).count() |
| data = public.get_page(count, int(get.p), int(get.limit), get.callback) |
| data['data'] = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?)', |
| ('html', search, search)).limit( |
| data['shift'] + ',' + data['row']).order(get.order).select() |
| else: |
| count = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?) AND type_id = ?', |
| ('html', search, search, type_id)).count() |
| data = public.get_page(count, int(get.p), int(get.limit), get.callback) |
| data['data'] = public.M('sites').where('project_type=? AND (name LIKE ? OR ps LIKE ?) AND type_id = ?', |
| ('html', search, search, type_id)).limit( |
| data['shift'] + ',' + data['row']).order(get.order).select() |
| else: |
| if type_id is None: |
| count = public.M('sites').where('project_type=?', 'html').count() |
| data = public.get_page(count, int(get.p), int(get.limit), get.callback) |
| data['data'] = public.M('sites').where('project_type=?', 'html').limit( |
| data['shift'] + ',' + data['row']).order(get.order).select() |
| else: |
| count = public.M('sites').where('project_type=? AND type_id = ?', ('html', type_id)).count() |
| data = public.get_page(count, int(get.p), int(get.limit), get.callback) |
| data['data'] = public.M('sites').where('project_type=? AND type_id = ?', ('html', type_id)).limit( |
| data['shift'] + ',' + data['row']).order(get.order).select() |
|
|
| |
| for val in data['data']: |
| print(val["name"]) |
| val['ssl'] = self.get_site_ssl_info(val["name"]) |
|
|
| return data |
|
|
| def project_get_domain(self, get): |
| ''' |
| @name 获取指定项目的域名列表 |
| @author hwliang<2021-08-09> |
| @param get<dict_obj>{ |
| project_name: string<项目名称> |
| } |
| @return dict |
| ''' |
| project_id = public.M('sites').where('name=?', (get.project_name,)).getField('id') |
| if not project_id: |
| return public.return_data(False, '站点查询失败') |
| domains = public.M('domain').where('pid=?', (project_id,)).order('id desc').select() |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| return domains |
|
|
| def project_remove_domain(self, get): |
| ''' |
| @name 为指定项目删除域名 |
| @author hwliang<2021-08-09> |
| @param get<dict_obj>{ |
| project_name: string<项目名称> |
| domain: string<域名> |
| } |
| @return dict |
| ''' |
| project_find = self.project_find(get.project_name) |
| if not project_find: |
| return public.return_error('指定项目不存在') |
| project_name = project_find["name"] |
| domain_arr = get.domain.rsplit(':', 1) |
| if len(domain_arr) == 1: |
| domain_arr.append("80") |
|
|
| project_id = project_find["id"] |
| if public.M('domain').where("pid=?", (project_id,)).count() <= 1: |
| return public.return_error('项目至少需要一个域名') |
| domain_id = public.M('domain').where('name=? AND pid=? AND port= ?', |
| (domain_arr[0], project_id, domain_arr[1])).getField('id') |
| if not domain_id: |
| return public.return_error('指定域名不存在') |
| public.M('domain').where('id=?', (domain_id,)).delete() |
|
|
| public.WriteLog(self._log_name, '从项目:{},删除域名{}'.format(get.project_name, get.domain)) |
|
|
| |
| self._nginx_set_domain(project_id, project_name, prefix="html_") |
| self._apache_set_domain(project_id, project_name, project_find["path"]) |
|
|
| return public.returnMsg(True, '删除域名成功') |
|
|
| def _test_domain_list(self, domain_list: List[str]) -> Tuple[List[Tuple[str, str]], List[Dict]]: |
| res, error = [], [] |
| for i in domain_list: |
| if not i.strip(): |
| continue |
| d_list = [i.strip() for i in i.rsplit(":", 1)] |
| if len(d_list) > 1: |
| try: |
| p = int(d_list[1]) |
| if not (1 < p < 65535): |
| error.append({ |
| "domain": i, |
| "msg": "端口范围错误" |
| }) |
| continue |
| else: |
| d_list[1] = str(p) |
| except: |
| error.append({ |
| "domain": i, |
| "msg": "端口范围错误" |
| }) |
| continue |
| else: |
| d_list.append("80") |
| d, p = d_list |
| d = self.check_domain(d) |
| if isinstance(d, str): |
| res.append((d, p)), |
| continue |
| error.append({ |
| "domain": i, |
| "msg": "域名格式错误" |
| }) |
|
|
| res = list(set(res)) |
| return res, error |
|
|
| def project_add_domain(self, get): |
| ''' |
| @name 为指定项目添加域名 |
| @author hwliang<2021-08-09> |
| @param get<dict_obj>{ |
| project_name: string<项目名称> |
| domains: list<域名列表> |
| } |
| @return dict |
| ''' |
| try: |
| if isinstance(get.domains, str): |
| domains = json.loads(get.domains) |
| else: |
| domains = get.domains |
| except json.JSONDecodeError: |
| return public.return_error('参数错误') |
| if not isinstance(domains, list): |
| return public.return_error('参数错误') |
| project_find = self.project_find(get.project_name) |
| if not project_find: |
| return public.return_error('指定项目不存在') |
| project_id = project_find['id'] |
| project_name = project_find["name"] |
| res_domains = [] |
| check_cloud = False |
| for domain in domains: |
| domain = domain.strip() |
| if not domain: |
| continue |
| if "[" in domain and "]" in domain: |
| if "]:" in domain: |
| domain_arr = domain.rsplit(":", 1) |
| else: |
| domain_arr = [domain] |
| else: |
| domain_arr = domain.split(':') |
| domain_arr[0] = self.check_domain(domain_arr[0]) |
| if domain_arr[0] is False: |
| res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) |
| continue |
| if len(domain_arr) == 1: |
| domain_arr.append("") |
| if domain_arr[1] == "": |
| domain_arr[1] = 80 |
| domain += ':80' |
| try: |
| if not (0 < int(domain_arr[1]) < 65535): |
| res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) |
| continue |
| except ValueError: |
| res_domains.append({"name": domain, "status": False, "msg": '域名格式错误'}) |
| continue |
| if not public.M('domain').where('name=? and port=?', (domain_arr[0], domain_arr[1])).count(): |
| public.M('domain').add('name,pid,port,addtime', |
| (domain_arr[0], project_id, domain_arr[1], public.getDate())) |
| public.WriteLog(self._log_name, '成功添加域名{}到项目{}'.format(domain, get.project_name)) |
| res_domains.append({"name": domain_arr[0], "status": True, "msg": '添加成功'}) |
| if not check_cloud: |
| check_cloud = True |
| public.check_domain_cloud(domain_arr[0]) |
| else: |
| public.WriteLog(self._log_name, '添加域名错误,域名{}已存在'.format(domain)) |
| res_domains.append( |
| {"name": domain_arr[0], "status": False, "msg": '添加失败,域名{}已存在'.format(domain)}) |
|
|
| |
| self._nginx_set_domain(project_id, project_name, prefix="html_") |
| self._apache_set_domain(project_id, project_name, project_find["path"]) |
|
|
| return self._check_add_domain(get.project_name, res_domains) |
|
|
| def _apache_set_domain(self, project_id, project_name, site_path): |
| file = '{}/apache/html_{}.conf'.format(self._vhost_path, project_name) |
| conf: str = public.readFile(file) |
| if not isinstance(conf, str): |
| return False |
|
|
| all_domains_data = public.M('domain').where('pid=?', (project_id,)).select() |
| if not isinstance(all_domains_data, list): |
| return False |
| domains, ports = set(), set() |
| for i in all_domains_data: |
| domains.add(str(i["name"])) |
| ports.add(str(i["port"])) |
|
|
| domains_str = " ".join(domains) |
|
|
| |
| rep_server_name = re.compile(r"\s*ServerAlias\s*(.*)\n", re.M) |
| new_conf = rep_server_name.sub("\n ServerAlias {}\n".format(domains_str), conf) |
|
|
| rep_ports = re.compile(r"<VirtualHost +.*:(?P<port>\d+)+\s*>") |
| need_remove_port = [] |
| for tmp in rep_ports.finditer(new_conf): |
| if tmp.group("port") in ports: |
| ports.remove(tmp.group("port")) |
| elif tmp.group("port") != "443": |
| need_remove_port.append(tmp.group("port")) |
|
|
| if need_remove_port: |
| for i in need_remove_port: |
| tmp_rep = re.compile(r"<VirtualHost.*" + i + r"(.|\n)*?</VirtualHost[^\n]*\n?") |
| new_conf = tmp_rep.sub("", new_conf, 1) |
|
|
| if ports: |
| |
| try: |
| httpdVersion = public.readFile('/www/server/apache/version.pl').strip() |
| except: |
| httpdVersion = "" |
| if httpdVersion == '2.2': |
| apaOpt = "Order allow,deny\n\t\tAllow from all" |
| else: |
| apaOpt = 'Require all granted' |
|
|
| template_file = "{}/template/apache/html_http.conf".format(self._vhost_path) |
| config_body = public.readFile(template_file) |
| other_config_body_list = [] |
| for p in ports: |
| other_config_body_list.append(config_body.format( |
| port=p, |
| server_admin="admin@{}".format(project_name), |
| site_path=site_path, |
| server_name='{}.{}'.format(p, project_name), |
| domains=domains_str, |
| log_path=self._get_sites_log_path(), |
| project_name=project_name, |
| apa_opt=apaOpt, |
| )) |
| new_conf += "\n" + "\n".join(other_config_body_list) |
| from mod.base.web_conf import ap_ext |
| |
| new_conf = ap_ext.remove_extension_from_config(project_name, new_conf) |
| new_conf = ap_ext.set_extension_by_config(project_name, new_conf) |
|
|
| public.writeFile(file, new_conf) |
| |
| self.apache_add_ports(port_list=ports) |
|
|
| return True |
|
|
| @staticmethod |
| def get_project_info(get): |
| ''' |
| @name 获取指定项目信息 |
| @author hwliang<2021-08-09> |
| @param get<dict_obj>{ |
| project_name: string<项目名称> |
| } |
| @return dict |
| ''' |
| project_info = public.M('sites').where('project_type=? AND name=?', ('html', get.project_name)).find() |
| if not project_info: |
| return public.return_error('指定项目不存在!') |
| return project_info |
|
|
| @staticmethod |
| def _format_path_str(path: str): |
| return path.replace('//', '/').rstrip("/") |
|
|
| @staticmethod |
| def _check_site_path(path): |
| other_path = public.M('config').where("id=?", ('1',)).field('sites_path,backup_path').find() |
| |
| if path == other_path['sites_path'] or path == other_path['backup_path']: |
| return False |
| return True |
|
|
| |
| def set_nginx_conf(self, ports: Iterator[str], domains: List[str], site_name, site_path): |
| listen_ipv6 = public.listen_ipv6() |
| listen_ports = '' |
| for p in ports: |
| listen_ports += " listen {};\n".format(p) |
| if listen_ipv6: |
| listen_ports += " listen [::]:{};\n".format(p) |
| listen_ports = listen_ports.strip() |
| config_file = "{}/nginx/html_{}.conf".format(self._vhost_path, site_name) |
| template_file = "{}/template/nginx/html_http.conf".format(self._vhost_path) |
|
|
| config_body = public.readFile(template_file) |
|
|
| conf = config_body.format( |
| listen_ports=listen_ports, |
| site_path=site_path, |
| ssl_start_msg=public.getMsg('NGINX_CONF_MSG1'), |
| err_page_msg=public.getMsg('NGINX_CONF_MSG2'), |
| setup_path="/www/server", |
| rewrite_start_msg=public.getMsg('NGINX_CONF_MSG4'), |
| log_path=self._get_sites_log_path(), |
| site_name=site_name, |
| domains=' '.join(domains) |
| ) |
| from mod.base.web_conf import ng_ext |
| conf = ng_ext.set_extension_by_config(site_name, conf) |
|
|
| |
| if not os.path.exists("{}/nginx/well-known".format(self._vhost_path)): |
| os.makedirs("{}/nginx/well-known".format(self._vhost_path), 0o600) |
| public.writeFile("{}/nginx/well-known/{}.conf".format(self._vhost_path, site_name), "") |
| public.writeFile(config_file, conf) |
|
|
| |
| rewrite_file = "{}/rewrite/html_{}.conf".format(self._vhost_path, site_name) |
| if not os.path.exists(os.path.dirname(rewrite_file)): |
| os.makedirs(os.path.dirname(rewrite_file)) |
| if not os.path.exists(rewrite_file): |
| public.writeFile(rewrite_file, '# 请将伪静态规则或自定义NGINX配置填写到此处\n') |
|
|
| return True |
|
|
| def get_project_find(self, project_name): |
| ''' |
| @name 获取指定项目配置 |
| @author hwliang<2021-08-09> |
| @param project_name<string> 项目名称 |
| @return dict |
| ''' |
| project_info = public.M('sites').where('project_type=? AND name=?', ('html', project_name)).find() |
| if isinstance(project_info, str): |
| raise public.PanelError('数据库查询错误:'+ project_info) |
| if not project_info: return False |
| project_info['project_config'] = json.loads(project_info['project_config']) |
| return project_info |
|
|
| def set_apache_config(self, project_info): |
| self._apache_set_domain(project_info["id"], project_info["name"], project_info["path"]) |
|
|
| def set_apache_conf(self, ports: Iterator[str], domains: List[str], site_name, site_path): |
| self.apache_add_ports(port_list=ports) |
| try: |
| httpdVersion = public.readFile('/www/server/apache/version.pl').strip() |
| except: |
| httpdVersion = "" |
| if httpdVersion == '2.2': |
| apaOpt = "Order allow,deny\n\t\tAllow from all" |
| else: |
| apaOpt = 'Require all granted' |
|
|
| config_file = "{}/apache/html_{}.conf".format(self._vhost_path, site_name) |
| template_file = "{}/template/apache/html_http.conf".format(self._vhost_path) |
|
|
| config_body = public.readFile(template_file) |
| apache_config_body_list = [] |
| domains = " ".join(domains) |
| for p in ports: |
| apache_config_body_list.append(config_body.format( |
| port=p, |
| server_admin="admin@{}".format(site_name), |
| site_path=site_path, |
| server_name='{}.{}'.format(p, site_name), |
| domains=domains, |
| log_path=self._get_sites_log_path(), |
| project_name=site_name, |
| apa_opt=apaOpt, |
| )) |
| apache_config_body = "\n".join(apache_config_body_list) |
| from mod.base.web_conf import ap_ext |
| apache_config_body = ap_ext.set_extension_by_config(site_name, apache_config_body) |
| htaccess = site_path + '/.htaccess' |
| if not os.path.exists(htaccess): |
| public.writeFile(htaccess, ' ') |
| public.ExecShell('chmod -R 755 ' + htaccess) |
| public.ExecShell('chown -R www:www ' + htaccess) |
| public.writeFile(config_file, apache_config_body) |
| return True |
|
|
| def create_project(self, get): |
| res_msg = self._check_webserver() |
| if res_msg: |
| return public.returnMsg(False, res_msg) |
| isError = public.checkWebConfig() |
| if isError is not True: |
| return public.returnMsg(False, 'ERROR: 检测到配置文件有错误,请先排除后再操作<br><br>' |
| '<a style="color:red;">' + isError.replace("\n", '<br>') + '</a>') |
| public.set_module_logs("create_html_project", "create") |
| get.path = self._format_path_str(get.path) |
| if not public.check_site_path(get.path): |
| a, c = public.get_sys_path() |
| return public.returnMsg(False, '请不要将网站根目录设置到以下关键目录中: <br>{}'.format("<br>".join(a + c))) |
| try: |
| siteMenu = json.loads(get.webname.strip()) |
| except json.JSONDecodeError: |
| return public.returnMsg(False, 'webname参数格式不正确,应该是可被解析的JSON字符串') |
| site_name = self.domain_to_puny_code(siteMenu['domain'].strip().rsplit(':', 1)[0]).strip().lower() |
| public.check_domain_cloud(site_name) |
| site_path = get.path.replace(' ', '') |
| site_port = get.port.strip().replace(' ', '') |
|
|
| if site_port == "": |
| site_port = "80" |
| if not public.checkPort(site_port): |
| return public.returnMsg(False, 'SITE_ADD_ERR_PORT') |
|
|
| |
| if not self._check_site_path(site_path): |
| return public.returnMsg(False, 'PATH_ERROR') |
| if not re.match(r"^([\w\-*]{1,100}\.){1,24}([\w\-]{1,24}|[\w\-]{1,24}\.[\w\-]{1,24})$", site_name) and not public.check_ip(site_name.replace('[', '').replace(']', '')): |
| return public.returnMsg(False, 'SITE_ADD_ERR_DOMAIN') |
| if site_name.find('*') != -1: |
| return public.returnMsg(False, 'SITE_ADD_ERR_DOMAIN_TOW') |
| if site_path[-1] == '.': |
| return public.returnMsg(False, '网站目录结尾不可以是 "."') |
|
|
| main_domain = site_name |
| old_pid = public.M('domain').where("name=? and port=?", (main_domain, int(site_port))).getField('pid') |
|
|
| if old_pid: |
| if public.M('sites').where('id=?', (old_pid,)).count(): |
| return public.returnMsg(False, 'SITE_ADD_ERR_DOMAIN_EXISTS') |
| public.M('domain').where('pid=?', (old_pid,)).delete() |
|
|
| if public.M('binding').where('domain=?', (site_name,)).count(): |
| return public.returnMsg(False, 'SITE_ADD_ERR_DOMAIN_EXISTS') |
|
|
| |
| sql = public.M('sites') |
| if sql.where("name=?", (site_name,)).count(): |
| if public.is_ipv4(site_name): |
| site_name = site_name + "_" + str(site_port) |
| else: |
| return public.returnMsg(False, 'SITE_ADD_ERR_EXISTS') |
|
|
| domain_list, error = self._test_domain_list(siteMenu['domainlist']) |
| if error: |
| return public.returnMsg(False, '域名信息解析错误') |
|
|
| all_domains = set((i[0] for i in domain_list)) |
| all_domains.add(site_name) |
| all_ports = set((i[1] for i in domain_list)) |
| all_ports.add(site_port) |
|
|
| |
| if not os.path.exists(site_path): |
| try: |
| os.makedirs(site_path) |
| except Exception as ex: |
| return public.returnMsg(False, '创建根目录失败, %s' % ex) |
| public.ExecShell('chmod -R 755 ' + site_path) |
| public.ExecShell('chown -R www:www ' + site_path) |
|
|
| |
| index = site_path + '/index.html' |
| if not os.path.exists(index): |
| public.writeFile(index, public.readFile('/www/server/panel/data/defaultDoc.html')) |
| public.ExecShell('chmod -R 755 ' + index) |
| public.ExecShell('chown -R www:www ' + index) |
|
|
| |
| doc404 = site_path + '/404.html' |
| if not os.path.exists(doc404): |
| public.writeFile(doc404, public.readFile('/www/server/panel/data/404.html')) |
| public.ExecShell('chmod -R 755 ' + doc404) |
| public.ExecShell('chown -R www:www ' + doc404) |
|
|
|
|
| |
| self.del_user_ini_file(site_path) |
| user_ini_file = site_path + '/.user.ini' |
| if not os.path.exists(user_ini_file): |
| public.writeFile(user_ini_file, 'open_basedir=' + site_path + '/:/tmp/') |
| public.ExecShell('chmod 644 ' + user_ini_file) |
| public.ExecShell('chown root:root ' + user_ini_file) |
| public.ExecShell('chattr +i ' + user_ini_file) |
|
|
| ngx_open_basedir_path = self.SETUP_PATH + '/vhost/open_basedir/nginx' |
| if not os.path.exists(ngx_open_basedir_path): |
| os.makedirs(ngx_open_basedir_path, 384) |
| ngx_open_basedir_file = ngx_open_basedir_path + '/{}.conf'.format(site_name) |
| ngx_open_basedir_body = '''set $bt_safe_dir "open_basedir"; |
| set $bt_safe_open "{}/:/tmp/";'''.format(site_path) |
|
|
| public.writeFile(ngx_open_basedir_file, ngx_open_basedir_body) |
|
|
| |
| result = self.set_nginx_conf(all_ports, all_domains, site_name, site_path) |
| result = self.set_apache_conf(all_ports, all_domains, site_name, site_path) |
|
|
| |
| if not result: |
| return public.returnMsg(False, 'SITE_ADD_ERR_WRITE') |
|
|
| ps = public.xssencode2(get.ps) |
| |
| if site_port != '80': |
| import firewalls |
| get_obj = public.dict_obj() |
| get_obj.port = site_port |
| get_obj.ps = site_name |
| firewalls.firewalls().AddAcceptPort(get_obj) |
|
|
| if not hasattr(get, 'type_id'): |
| get.type_id = 0 |
|
|
| |
| get.pid = sql.table('sites').add('name,path,status,ps,type_id,project_type,addtime', |
| (site_name, site_path, '1', ps, get.type_id, "html", public.getDate())) |
|
|
| sql.table('domain').add('pid,name,port,addtime', (get.pid, main_domain, site_port, public.getDate())) |
| for d, p in domain_list: |
| sql.table('domain').add('pid,name,port,addtime', (get.pid, d, p, public.getDate())) |
|
|
| data = {"siteStatus": True, "siteId": get.pid, 'ftpStatus': False} |
|
|
| |
| if get.get("ftp", "false") == 'true': |
| import ftp |
| result = ftp.ftp().AddUser(get) |
| if result['status']: |
| data['ftpStatus'] = True |
| data['ftpUser'] = get.ftp_username |
| data['ftpPass'] = get.ftp_password |
|
|
| public.serviceReload() |
| public.WriteLog('TYPE_SITE', 'SITE_ADD_SUCCESS', (site_name,)) |
| return data |
|
|
| def clear_config(self, project_name): |
| self.clear_nginx_config(project_name) |
| self.clear_apache_config(project_name) |
| public.serviceReload() |
| return True |
|
|
| def clear_nginx_config(self, project_name): |
| config_file = "{}/nginx/html_{}.conf".format(self._vhost_path, project_name) |
| if os.path.exists(config_file): |
| os.remove(config_file) |
| rewrite_file = "{}/rewrite/html_{}.conf".format(self._vhost_path, project_name) |
| if os.path.exists(rewrite_file): |
| os.remove(rewrite_file) |
| return True |
|
|
| def clear_apache_config(self, project_name): |
| config_file = "{}/apache/html_{}.conf".format(self._vhost_path, project_name) |
| if os.path.exists(config_file): |
| os.remove(config_file) |
| return True |
|
|
| |
| def remove_project(self, get): |
| project_find = self.project_find(get.project_name) |
|
|
| if not project_find: |
| return public.return_error('指定项目不存在: {}'.format(get.project_name)) |
|
|
| self.clear_config(get.project_name) |
| |
| from mod.base.web_conf import remove_sites_service_config |
| remove_sites_service_config(get.project_name, "html_") |
|
|
| public.M('domain').where('pid=?', (project_find['id'],)).delete() |
| public.M('sites').where('name=?', (get.project_name,)).delete() |
| public.WriteLog(self._log_name, '删除其他项目{}'.format(get.project_name)) |
| return public.return_data(True, '删除项目成功') |
|
|
| @staticmethod |
| def _check_add_domain(site_name, domains): |
| from panelSite import panelSite |
| ssl_data = panelSite().GetSSL(type("get", tuple(), {"siteName": site_name})()) |
| res = {"domains": domains} |
| if not ssl_data["status"]: |
| return res |
| domain_rep = [] |
| for i in ssl_data["cert_data"]["dns"]: |
| if i.startswith("*"): |
| _rep = r"^[^\.]+\." + i[2:].replace(".", r"\.") |
| else: |
| _rep = "^" + i.replace(".", r"\.") |
| domain_rep.append(_rep) |
| no_ssl = [] |
| for domain in domains: |
| if not domain["status"]: |
| continue |
| for _rep in domain_rep: |
| if re.search(_rep, domain["name"]): |
| break |
| else: |
| no_ssl.append(domain["name"]) |
| if not no_ssl: |
| return res |
| res["not_ssl"] = no_ssl |
| res["tip"] = "本站点已启用SSL证书,但本次添加的域名:{},无法匹配当前证书,如有需求,请重新申请证书。".format( |
| str(no_ssl)) |
| return res |
|
|
| def project_dir_info(self, get): |
| try: |
| project_name = get.project_name |
| except AttributeError: |
| return public.returnMsg(False, "参数错误") |
| project_info = self.project_find(project_name) |
| if not project_info: |
| return public.returnMsg(False, "未查询到网站") |
| data = { |
| 'userini': False, |
| 'path': project_info["path"], |
| } |
| run_path = self._get_site_run_path(project_name, project_info["path"]) |
| user_ini_file = run_path + '/.user.ini' |
| user_ini_conf = public.readFile(user_ini_file) |
| if user_ini_conf and "open_basedir" in user_ini_conf: |
| data['userini'] = True |
| data['run_path'] = run_path.replace(project_info["path"], "") |
|
|
| return data |
|
|
| def set_dir_user_ini(self, get): |
| try: |
| project_name = get.project_name |
| except AttributeError: |
| return public.returnMsg(False, "参数错误") |
| project_info = self.project_find(project_name) |
| if not project_info: |
| return public.returnMsg(False, "未查询到网站") |
| site_path = project_info["path"] |
| run_path = self._get_site_run_path(project_name, project_info["path"]) |
| filename = run_path + '/.user.ini' |
| conf = public.readFile(filename) |
| try: |
| if os.path.exists(filename): |
| public.ExecShell("chattr -i " + filename) |
| if conf and "open_basedir" in conf: |
| rep = "\n*open_basedir.*" |
| conf = re.sub(rep, "", conf) |
| if not conf: |
| os.remove(filename) |
| else: |
| public.writeFile(filename, conf) |
| if os.path.exists(filename): |
| public.ExecShell("chattr +i " + filename) |
|
|
| return public.returnMsg(True, 'SITE_BASEDIR_CLOSE_SUCCESS') |
|
|
| if conf and "session.save_path" in conf: |
| s_path = re.search(r'session.save_path\s*=\s*(.*)', conf).groups(1)[0] |
| public.writeFile(filename, conf + '\nopen_basedir={}/:/tmp/:{}'.format(site_path, s_path)) |
| else: |
| public.writeFile(filename, 'open_basedir={}/:/tmp/'.format(site_path)) |
| if os.path.exists(filename): |
| public.ExecShell("chattr +i " + filename) |
| if not os.path.exists(filename): |
| return public.returnMsg(False, '.user.ini文件不存在,设置防跨站失败,请关闭防篡改或其他安全软件后再试!') |
|
|
| public.serviceReload() |
| return public.returnMsg(True, 'SITE_BASEDIR_OPEN_SUCCESS') |
| except Exception as e: |
| if os.path.exists(filename): |
| public.ExecShell("chattr +i " + filename) |
| if "Operation not permitted" in str(e): |
| return public.returnMsg(False, "{}\t权限拒绝,设置防跨站失败,请关闭防篡改或其他安全软件后再试!".format(str(e))) |
| return public.returnMsg(False, str(e)) |
|
|
| def change_site_path(self, get): |
| try: |
| project_name = get.project_name |
| path = get.path |
| except AttributeError: |
| return public.returnMsg(False, "参数错误") |
| project_info = self.project_find(project_name) |
| if not project_info: |
| return public.returnMsg(False, "未查询到网站") |
| if path == "": |
| return public.returnMsg(False, "DIR_EMPTY") |
|
|
| if not self._check_site_path(path): |
| return public.returnMsg(False, "PATH_ERROR") |
| if not public.check_site_path(path): |
| a, c = public.get_sys_path() |
| return public.returnMsg(False, '请不要将网站根目录设置到以下关键目录中: <br>{}'.format("<br>".join(a + c))) |
|
|
| if not os.path.exists(path): |
| return public.returnMsg(False, '指定的网站根目录不存在,无法设置,请检查输入信息.') |
|
|
| if project_info["path"] == path: |
| return public.returnMsg(False, "SITE_PATH_ERR_RE") |
|
|
| nginx_file = '{}/nginx/html_{}.conf'.format(self._vhost_path, project_name) |
| nginx_conf = public.readFile(nginx_file) |
| if nginx_conf: |
| new_conf = nginx_conf.replace(project_info["path"], path) |
| public.writeFile(nginx_file, new_conf) |
|
|
| apache_file = '{}/apache/html_{}.conf'.format(self._vhost_path, project_name) |
| apache_conf = public.readFile(apache_file) |
| if apache_conf: |
| rep_doc = re.compile(r"DocumentRoot\s+.*\n") |
| new_conf = rep_doc.sub('DocumentRoot "' + path + '"\n', apache_conf) |
|
|
| rep_dir = re.compile(r"<Directory\s+.*\n") |
| new_conf = rep_dir.sub('<Directory "' + path + "\">\n", new_conf) |
| public.writeFile(apache_file, new_conf) |
|
|
| public.M("sites").where("id=?", (project_info["id"],)).setField('path', path) |
| public.WriteLog('TYPE_SITE', 'SITE_PATH_SUCCESS', (project_name,)) |
| self._check_run_path_exists(project_info) |
|
|
| |
| userIni = path + '/.user.ini' |
| if os.path.exists(userIni): |
| public.ExecShell("chattr -i " + userIni) |
| public.writeFile(userIni, 'open_basedir=' + path + '/:/tmp/') |
| public.ExecShell('chmod 644 ' + userIni) |
| public.ExecShell('chown root:root ' + userIni) |
| public.ExecShell('chattr +i ' + userIni) |
| public.serviceReload() |
|
|
| return public.returnMsg(True, "SET_SUCCESS") |
|
|
| def _check_run_path_exists(self, project_info): |
| run_path = self._get_site_run_path(project_info["name"], project_info["path"]) |
| if os.path.exists(run_path): |
| return True |
| self.set_site_run_path(None, run_path="/", project_info=project_info) |
| public.WriteLog('TYPE_SITE', '因修改网站[{}]根目录,检测到原指定的运行目录[.{}]不存在,已自动将运行目录切换为[./]'.format(site_info['name'], run_path)) |
|
|
| def set_site_run_path(self, get, run_path=None, project_info=None): |
| if not run_path or project_info: |
| try: |
| project_name = get.project_name |
| run_path = get.path |
| except AttributeError: |
| return public.returnMsg(False, "参数错误") |
| project_info = self.project_find(project_name) |
| if not project_info: |
| return public.returnMsg(False, "未查询到网站") |
| project_name = project_info["name"] |
| old_run_path = self._get_site_run_path(project_info["name"], project_info["path"]) |
| site_path = project_info["path"] |
| |
| filename = '{}/nginx/html_{}.conf'.format(self._vhost_path, project_name) |
| nginx_conf = public.readFile(filename) |
| if nginx_conf: |
| tmp = re.search(r'\s*root\s+(.+);', nginx_conf) |
| if tmp: |
| o_path = tmp.groups()[0] |
| new_conf = nginx_conf.replace(o_path, site_path + run_path) |
| public.writeFile(filename, new_conf) |
|
|
| |
| filename = '{}/apache/html_{}.conf'.format(self._vhost_path, project_name) |
| ap_conf = public.readFile(filename) |
| if ap_conf: |
| tmp = re.search(r'\s*DocumentRoot\s*"(.+)"\s*\n', ap_conf) |
| if tmp: |
| o_path = tmp.groups()[0] |
| new_conf = ap_conf.replace(o_path, site_path + run_path) |
| public.writeFile(filename, new_conf) |
|
|
| s_path = old_run_path + "/.user.ini" |
| d_path = site_path + run_path + "/.user.ini" |
| if s_path != d_path: |
| public.ExecShell("chattr -i {}".format(s_path)) |
| public.ExecShell("mv {} {}".format(s_path, d_path)) |
| public.ExecShell("chattr +i {}".format(d_path)) |
|
|
| public.serviceReload() |
| return public.returnMsg(True, 'SET_SUCCESS') |
|
|
| def _get_site_run_path(self, site_name: str, site_path: str) -> str: |
| run_path = site_path |
| webserver = public.get_webserver() |
| filename = "{}/{}/html_{}.conf".format(self._vhost_path, webserver, site_name) |
| if not os.path.exists(filename): |
| return run_path |
| conf = public.readFile(filename) |
| if webserver == 'nginx': |
| tmp1 = re.search(r'\s*root\s+(?P<path>.+);', conf) |
| if tmp1: |
| run_path = tmp1.group("path") |
| elif webserver == 'apache': |
| tmp1 = re.search(r'\s*DocumentRoot\s*"(?P<path>.+)"\s*\n', conf) |
| if tmp1: |
| run_path = tmp1.group("path") |
| else: |
| tmp1 = re.search(r"vhRoot\s*(?P<path>.*)", conf) |
| if tmp1: |
| run_path = tmp1.group("path") |
|
|
| return run_path |
|
|
| def get_site_ssl_info(self, siteName): |
| try: |
| s_file = 'vhost/nginx/html_{}.conf'.format(siteName) |
| is_apache = False |
| if not os.path.exists(s_file): |
| s_file = 'vhost/apache/html_{}.conf'.format(siteName) |
| is_apache = True |
|
|
| if not os.path.exists(s_file): |
| return -1 |
|
|
| s_conf = public.readFile(s_file) |
| if not s_conf: return -1 |
| if is_apache: |
| if s_conf.find('SSLCertificateFile') == -1: |
| return -1 |
| s_tmp = re.findall(r"SSLCertificateFile\s+(.+\.pem)", s_conf) |
| if not s_tmp: return -1 |
| ssl_file = s_tmp[0] |
| else: |
| if s_conf.find('ssl_certificate') == -1: |
| return -1 |
| s_tmp = re.findall(r"ssl_certificate\s+(.+\.pem);", s_conf) |
| if not s_tmp: return -1 |
| ssl_file = s_tmp[0] |
| ssl_info = self.get_cert_end(ssl_file) |
| if not ssl_info: return -1 |
| ssl_info['endtime'] = int( |
| int(time.mktime(time.strptime(ssl_info['notAfter'], "%Y-%m-%d")) - time.time()) / 86400) |
| return ssl_info |
| except: |
| import traceback |
| print(traceback.format_exc()) |
| return -1 |
|
|
| def get_cert_end(self, pem_file): |
| if "/www/server/panel/class" not in sys.path: |
| sys.path.insert(0, "/www/server/panel/class") |
| import ssl_info |
| return ssl_info.ssl_info().load_ssl_info(pem_file) |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
|
|