fe / bt-source /panel /tools.py
GGSheng's picture
feat: deploy Gemma 4 to hf space
3a5cf48 verified
# coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2099 宝塔软件(http://bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: hwliang <hwl@bt.cn>
# +-------------------------------------------------------------------
# ------------------------------
# 工具箱
# ------------------------------
import sys
import os
import re
import json
panelPath = '/www/server/panel/'
os.chdir(panelPath)
sys.path.insert(0, panelPath + "class/")
import public, time, json
if sys.version_info[0] == 3: raw_input = input
def check_db(): # 检查数据库
pass
# data_func = {
# "users": check_users_tb,
# "config": check_config_tb,
# }
# for tb_name, check_func in data_func.items():
# sqlite_obj = public.M(tb_name)
# check_func(sqlite_obj)
# 用户表检查
def check_users_tb(sqlite_obj):
pass
# user = sqlite_obj.where("id=?", (1,)).find()
# if not isinstance(user, (dict, list)):
# print("users err:{}".format(user))
# return
# username = public.GetRandomString(8).lower()
# password = public.GetRandomString(8).lower()
# if not user: # 表数据为空
# sqlite_obj.add("id,username,password", (1, username, password))
# print("检测到默认用户丢失,正在修复...")
# print("|-新用户名(username): {}".format(username))
# print("|-新密码(password): {}".format(password))
# return
# # 表数据库缺失
# if not user.get("username"):
# print("检测到[用户名]为空,正在修复...")
# sqlite_obj.where("id=?", (1,)).setField("username", username)
# print("|-新用户名(username): {}".format(username))
# if not user.get("password"):
# print("检测到[用户密码]为空,正在修复...")
# sqlite_obj.where("id=?", (1,)).setField('password', public.password_salt(public.md5(password), uid=1))
# print("|-新密码(password): {}".format(password))
# 配置表检查
def check_config_tb(sqlite_obj):
config = sqlite_obj.where("id=?", (1,)).find()
if not isinstance(config, (dict, list)):
print("config err:{}".format(config))
return
webserver = "nginx"
backup_path = "/www/backup"
sites_path = "/www/wwwroot"
status = 1
mysql_root = public.GetRandomString(8).lower()
if not config: # 表数据为空
sqlite_obj.add("id,webserver,backup_path,sites_path,status,mysql_root", (1, webserver, backup_path, sites_path, status, mysql_root))
print("检测到面板默认配置丢失,正在修复...")
print("|-默认运行服务: {}".format(webserver))
print("|-默认备份路径: {}".format(backup_path))
print("|-默认网站路径: {}".format(sites_path))
print("|-默认Mysql密码: {}".format(mysql_root))
return
# 表数据库缺失
if not config.get("webserver"):
print("检测到[默认运行服务]为空,正在修复...")
sqlite_obj.where("id=?", (1,)).setField("webserver", webserver)
print("|-默认运行服务: {}".format(webserver))
if not config.get("backup_path"):
print("检测到[默认备份路径]为空,正在修复...")
sqlite_obj.where("id=?", (1,)).setField("backup_path", backup_path)
print("|-默认备份路径: {}".format(backup_path))
if not config.get("sites_path"):
print("检测到[默认网站路径]为空,正在修复...")
sqlite_obj.where("id=?", (1,)).setField("sites_path", sites_path)
print("|-默认网站路径: {}".format(sites_path))
if not config.get("mysql_root"):
print("检测到[默认Mysql密码]为空,正在修复...")
set_mysql_root(mysql_root)
# sqlite_obj.where("id=?", (1,)).setField("mysql_root", mysql_root)
# print("|-默认Mysql密码: {}".format(len(mysql_root) * "*"))
# 设置MySQL密码
def set_mysql_root(password):
import db, os
sql = db.Sql()
root_mysql = '''#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
pwd=$1
/etc/init.d/mysqld stop
mysqld_safe --skip-grant-tables&
echo '正在修改密码...';
echo 'The set password...';
sleep 6
m_version=$(cat /www/server/mysql/version.pl)
if echo "$m_version" | grep -E "(5\.1\.|5\.5\.|5\.6\.|10\.0\.|10\.1\.)" >/dev/null; then
mysql -uroot -e "UPDATE mysql.user SET password=PASSWORD('${pwd}') WHERE user='root';"
elif echo "$m_version" | grep -E "(10\.4\.|10\.5\.|10\.6\.|10\.7\.|10\.11\.|11\.3\.|11\.4\.)" >/dev/null; then
mysql -uroot -e "
FLUSH PRIVILEGES;
ALTER USER 'root'@'localhost' IDENTIFIED BY '${pwd}';
ALTER USER 'root'@'127.0.0.1' IDENTIFIED BY '${pwd}';
FLUSH PRIVILEGES;
"
elif echo "$m_version" | grep -E "(5\.7\.|8\.[0-9]+\..*|9\.[0-9]+\..*)" >/dev/null; then
mysql -uroot -e "
FLUSH PRIVILEGES;
update mysql.user set authentication_string='' where user='root' and (host='127.0.0.1' or host='localhost');
ALTER USER 'root'@'localhost' IDENTIFIED BY '${pwd}';
ALTER USER 'root'@'127.0.0.1' IDENTIFIED BY '${pwd}';
FLUSH PRIVILEGES;
"
else
mysql -uroot -e "UPDATE mysql.user SET authentication_string=PASSWORD('${pwd}') WHERE user='root';"
fi
mysql -uroot -e "FLUSH PRIVILEGES";
pkill -9 mysqld_safe
pkill -9 mysqld
sleep 2
/etc/init.d/mysqld start
echo '==========================================='
echo "root密码成功修改为: ${pwd}"
echo "The root password set ${pwd} successuful"'''
public.writeFile('mysql_root.sh', root_mysql)
os.system("/bin/bash mysql_root.sh " + password)
os.system("rm -f mysql_root.sh")
result = public.M('config').where('id=?', (1,)).setField('mysql_root', password)
print(result)
# 设置面板密码
def set_panel_pwd(password, ncli=False):
password = password.strip()
if not len(password) > 5:
print("|-错误,密码长度必须大于5位")
return
import db
sql = db.Sql()
result = public.M('users').where('id=?', (1,)).setField('password', public.password_salt(public.md5(password), uid=1))
username = public.M('users').where('id=?', (1,)).getField('username')
if ncli:
print("|-用户名: " + username)
print("|-新密码: " + password)
else:
print(username)
# 设置数据库目录
def set_mysql_dir(path):
mysql_dir = '''#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH
oldDir=`cat /etc/my.cnf |grep 'datadir'|awk '{print $3}'`
newDir=$1
mkdir $newDir
if [ ! -d "${newDir}" ];then
echo 'The specified storage path does not exist!'
exit
fi
echo "Stopping MySQL service..."
/etc/init.d/mysqld stop
echo "Copying files, please wait..."
\cp -r -a $oldDir/* $newDir
chown -R mysql.mysql $newDir
sed -i "s#$oldDir#$newDir#" /etc/my.cnf
echo "Starting MySQL service..."
/etc/init.d/mysqld start
echo ''
echo 'Successful'
echo '---------------------------------------------------------------------'
echo "Has changed the MySQL storage directory to: $newDir"
echo '---------------------------------------------------------------------'
'''
public.writeFile('mysql_dir.sh', mysql_dir)
os.system("/bin/bash mysql_dir.sh " + path)
os.system("rm -f mysql_dir.sh")
# 封装
def PackagePanel():
print('========================================================')
print('|-正在清理日志信息...'),
public.M('logs').where('id!=?', (0,)).delete()
print('\t\t\033[1;32m[done]\033[0m')
print('|-正在清理任务历史...'),
public.M('tasks').where('id!=?', (0,)).delete()
print('\t\t\033[1;32m[done]\033[0m')
print('|-正在清理网络监控记录...'),
public.M('network').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-正在清理CPU监控记录...'),
public.M('cpuio').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-正在清理磁盘监控记录...'),
public.M('diskio').dbfile('system').where('id!=?', (0,)).delete()
print('\t\033[1;32m[done]\033[0m')
print('|-正在清理IP信息...'),
os.system('rm -f /www/server/panel/data/iplist.txt')
os.system('rm -f /www/server/panel/data/address.pl')
os.system('rm -f /www/server/panel/data/*.login')
os.system('rm -f /www/server/panel/data/domain.conf')
os.system('rm -f /www/server/panel/data/user*')
os.system('rm -f /www/server/panel/data/admin_path.pl')
os.system('rm -rf /www/backup/panel/*')
os.system('rm -f /root/.ssh/*')
print('\t\033[1;32m[done]\033[0m')
print('|-正在清理系统使用痕迹...'),
command = '''cat /dev/null > /var/log/boot.log
cat /dev/null > /var/log/btmp
cat /dev/null > /var/log/cron
cat /dev/null > /var/log/dmesg
cat /dev/null > /var/log/firewalld
cat /dev/null > /var/log/grubby
cat /dev/null > /var/log/lastlog
cat /dev/null > /var/log/mail.info
cat /dev/null > /var/log/maillog
cat /dev/null > /var/log/messages
cat /dev/null > /var/log/secure
cat /dev/null > /var/log/spooler
cat /dev/null > /var/log/syslog
cat /dev/null > /var/log/tallylog
cat /dev/null > /var/log/wpa_supplicant.log
cat /dev/null > /var/log/wtmp
cat /dev/null > /var/log/yum.log
history -c
'''
os.system(command)
print('\t\033[1;32m[done]\033[0m')
if sys.version_info[0] == 3:
a_input = input('|-是否在首次开机自动按机器配置优化PHP/MySQL配置?(y/n default: y): ')
else:
a_input = raw_input('|-是否在首次开机自动按机器配置优化PHP/MySQL配置?(y/n default: y): ')
if not a_input: a_input = 'y'
print(a_input)
if not a_input in ['Y', 'y', 'yes', 'YES']:
public.ExecShell("rm -f /www/server/panel/php_mysql_auto.pl")
else:
public.writeFile('/www/server/panel/php_mysql_auto.pl', "True")
print("|-请选择idc品牌信息展示设置:")
print("=" * 50)
print(" (1) 显示默认宝塔Linux面板信息")
print(" (2) 显示IDC定制版面板信息")
print("=" * 50)
i_input = input("请选择显示的面板信息(default: 1): ")
if i_input in [2, '2']:
print("2 显示IDC定制版面板信息")
print("=" * 50)
else:
print("1 显示默认宝塔Linux面板信息")
print("=" * 50)
panelPath = '/www/server/panel'
pFile = panelPath + '/config/config.json'
pInfo = json.loads(public.readFile(pFile))
pInfo['title'] = u'宝塔Linux面板'
pInfo['brand'] = u'宝塔'
pInfo['product'] = u'Linux面板'
public.writeFile(pFile, json.dumps(pInfo))
tFile = panelPath + '/data/title.pl'
if os.path.exists(tFile):
os.remove(tFile)
print("|-请选择用户初始化方式:")
print("=" * 50)
print(" (1) 访问面板页面时显示初始化页面")
print(" (2) 首次启动时自动随机生成新帐号密码")
print(" (3) 首次启动时自动随机生成新帐号密码和安全路径")
print("=" * 50)
p_input = input("请选择初始化方式(default: 1): ")
print(p_input)
if p_input in [2, '2']:
public.writeFile('/www/server/panel/aliyun.pl', "True")
s_file = '/www/server/panel/install.pl'
if os.path.exists(s_file): os.remove(s_file)
public.M('config').where("id=?", ('1',)).setField('status', 1)
elif p_input in [3, '3']:
public.writeFile('/www/server/panel/aliyun.pl', "True")
public.writeFile('/www/server/panel/random_path.pl', "True")
s_file = '/www/server/panel/install.pl'
if os.path.exists(s_file): os.remove(s_file)
public.M('config').where("id=?", ('1',)).setField('status', 1)
else:
public.writeFile('/www/server/panel/install.pl', "True")
public.M('config').where("id=?", ('1',)).setField('status', 0)
port = public.readFile('data/port.pl').strip()
print('========================================================')
print('\033[1;32m|-面板封装成功,请不要再登陆面板做任何其它操作!\033[0m')
if p_input not in [2, '2', 3, '3']:
print('\033[1;41m|-面板初始化地址: http://{SERVERIP}:' + port + '/install\033[0m')
else:
print('\033[1;41m|-获取初始帐号密码命令:bt default \033[0m')
print('\033[1;41m|-注意:仅在首次登录面板前能正确获取初始帐号密码 \033[0m')
# 清空正在执行的任务
def CloseTask():
ncount = public.M('tasks').where('status!=?', (1,)).delete()
os.system("kill `ps -ef |grep 'python panelSafe.pyc'|grep -v grep|grep -v panelExec|awk '{print $2}'`")
os.system("kill `ps -ef |grep 'install_soft.sh'|grep -v grep|grep -v panelExec|awk '{print $2}'`")
os.system('/etc/init.d/bt restart')
print("成功清理 " + int(ncount) + " 个任务!")
def get_ipaddress():
'''
@name 获取本机IP地址
@author hwliang<2020-11-24>
@return list
'''
ipa_tmp = public.ExecShell("ip a |grep inet|grep -v inet6|grep -v 127.0.0.1|awk '{print $2}'|sed 's#/[0-9]*##g'")[
0].strip()
iplist = ipa_tmp.split('\n')
return iplist
def get_host_all():
local_ip = ['127.0.0.1', '::1', 'localhost']
ip_list = []
bind_ip = get_ipaddress()
for ip in bind_ip:
ip = ip.strip()
if ip in local_ip: continue
if ip in ip_list: continue
ip_list.append(ip)
net_ip = public.httpGet("https://api.bt.cn/api/getipaddress")
if net_ip:
net_ip = net_ip.strip()
if not net_ip in ip_list:
ip_list.append(net_ip)
if len(ip_list) > 1:
ip_list = [ip_list[-1], ip_list[0]]
print(ip_list)
return ip_list
# 自签证书
def CreateSSL():
import base64
userInfo = public.get_user_info()
if not userInfo:
userInfo['uid'] = 0
userInfo['access_key'] = 'B' * 32
domains = get_host_all()
pdata = {
"action": "get_domain_cert",
"company": "宝塔面板",
"domain": ','.join(domains),
"uid": userInfo['uid'],
"access_key": userInfo['access_key'],
"panel": 1
}
cert_api = 'https://api.bt.cn/bt_cert'
res = public.httpPost(cert_api, {'data': json.dumps(pdata)})
try:
result = json.loads(res)
if 'status' in result:
if result['status']:
public.writeFile('ssl/certificate.pem', result['cert'])
public.writeFile('ssl/privateKey.pem', result['key'])
public.writeFile('ssl/baota_root.pfx', base64.b64decode(result['pfx']), 'wb+')
public.writeFile('ssl/root_password.pl', result['password'])
public.writeFile('data/ssl.pl', 'True')
public.ExecShell("/etc/init.d/bt reload")
print('1')
return True
except:
print('error:{}'.format(res))
print('0')
return False
# 创建文件
def CreateFiles(path, num):
if not os.path.exists(path): os.system('mkdir -p ' + path)
import time;
for i in range(num):
filename = path + '/' + str(time.time()) + '__' + str(i)
open(path, 'w+').close()
# 计算文件数量
def GetFilesCount(path):
i = 0
for name in os.listdir(path): i += 1
return i
# 清理系统垃圾
def ClearSystem():
count = total = 0
tmp_total, tmp_count = ClearMail()
count += tmp_count
total += tmp_total
print('=======================================================================')
tmp_total, tmp_count = ClearSession()
count += tmp_count
total += tmp_total
print('=======================================================================')
tmp_total, tmp_count = ClearOther()
count += tmp_count
total += tmp_total
print('=======================================================================')
print('\033[1;32m|-系统垃圾清理完成,共删除[' + str(count) + ']个文件,释放磁盘空间[' + ToSize(total) + ']\033[0m')
# 清理邮件日志
def ClearMail():
rpath = '/var/spool'
total = count = 0
import shutil
con = ['cron', 'anacron', 'mail']
for d in os.listdir(rpath):
if d in con: continue
dpath = rpath + '/' + d
print('|-正在清理' + dpath + ' ...')
time.sleep(0.2)
num = size = 0
for n in os.listdir(dpath):
filename = dpath + '/' + n
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
size += fsize
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
num += 1
print('|-已清理[' + dpath + '],删除[' + str(num) + ']个文件,共释放磁盘空间[' + ToSize(size) + ']')
total += size
count += num
print('=======================================================================')
print('|-已完成spool的清理,删除[' + str(count) + ']个文件,共释放磁盘空间[' + ToSize(total) + ']')
return total, count
# 清理php_session文件
def ClearSession():
spath = '/tmp'
total = count = 0
import shutil
print('|-正在清理PHP_SESSION ...')
for d in os.listdir(spath):
if d.find('sess_') == -1: continue
filename = spath + '/' + d
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
total += fsize
if os.path.isdir(filename):
shutil.rmtree(filename)
else:
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
count += 1
print('|-已完成php_session的清理,删除[' + str(count) + ']个文件,共释放磁盘空间[' + ToSize(total) + ']')
return total, count
# 清空回收站
def ClearRecycle_Bin():
import files
f = files.files()
f.Close_Recycle_bin(None)
# 清理其它
def ClearOther():
clearPath = [
{'path': '/www/server/panel', 'find': 'testDisk_'},
{'path': '/www/wwwlogs', 'find': 'log'},
{'path': '/tmp', 'find': 'panelBoot.pl'},
{'path': '/www/server/panel/install', 'find': '.rpm'},
{'path': '/www/server/panel/install', 'find': '.zip'},
{'path': '/www/server/panel/install', 'find': '.gz'}
]
total = count = 0
print('|-正在清理临时文件及网站日志 ...')
for c in clearPath:
for d in os.listdir(c['path']):
if d.find(c['find']) == -1: continue
filename = c['path'] + '/' + d
if os.path.isdir(filename): continue
fsize = os.path.getsize(filename)
print('|---[' + ToSize(fsize) + '] del ' + filename),
total += fsize
os.remove(filename)
print('\t\033[1;32m[OK]\033[0m')
count += 1
public.serviceReload()
os.system('sleep 1 && /etc/init.d/bt reload > /dev/null &')
print('|-已完成临时文件及网站日志的清理,删除[' + str(count) + ']个文件,共释放磁盘空间[' + ToSize(total) + ']')
return total, count
# 关闭普通日志
def CloseLogs():
try:
paths = ['/usr/lib/python2.7/site-packages/web/httpserver.py',
'/usr/lib/python2.6/site-packages/web/httpserver.py']
for path in paths:
if not os.path.exists(path): continue
hsc = public.readFile(path)
if hsc.find('500 Internal Server Error') != -1: continue
rstr = '''def log(self, status, environ):
if status != '500 Internal Server Error': return;'''
hsc = hsc.replace("def log(self, status, environ):", rstr)
if hsc.find('500 Internal Server Error') == -1: return False
public.writeFile(path, hsc)
except:
pass
# 字节单位转换
def ToSize(size):
ds = ['b', 'KB', 'MB', 'GB', 'TB']
for d in ds:
if size < 1024: return str(size) + d
size = size / 1024
return '0b'
# 随机面板用户名
def set_panel_username(username=None):
import db
sql = db.Sql()
if username:
print("|-正在设置面板用户名...")
re_list = re.findall(r"[^\w,.]+", username)
if re_list:
print("|-错误,密码不能包含中文和特殊字符: {}".format(" ".join(re_list)))
return
if username in ['admin', 'root']:
print("|-错误,不能使用过于简单的用户名")
return
public.M('users').where('id=?', (1,)).setField('username', username)
print("|-新用户名: %s" % username)
return
username = public.M('users').where('id=?', (1,)).getField('username')
if username == 'admin':
username = public.GetRandomString(8).lower()
public.M('users').where('id=?', (1,)).setField('username', username)
print('username: ' + username)
# 设定idc
def setup_idc():
try:
panelPath = '/www/server/panel'
filename = panelPath + '/data/o.pl'
if not os.path.exists(filename): return False
o = public.readFile(filename).strip()
c_url = 'http://www.bt.cn/api/idc/get_idc_info_bycode?o=%s' % o
idcInfo = json.loads(public.httpGet(c_url))
if not idcInfo['status']: return False
pFile = panelPath + '/config/config.json'
pInfo = json.loads(public.readFile(pFile))
pInfo['brand'] = idcInfo['msg']['name']
pInfo['product'] = u'与宝塔联合定制版'
public.writeFile(pFile, json.dumps(pInfo))
tFile = panelPath + '/data/title.pl'
titleNew = pInfo['brand'] + u'面板'
if os.path.exists(tFile):
title = public.GetConfigValue('title')
if title == '' or title == '宝塔Linux面板':
public.writeFile(tFile, titleNew)
public.SetConfigValue('title', titleNew)
else:
public.writeFile(tFile, titleNew)
public.SetConfigValue('title', titleNew)
return True
except:
pass
def set_panel_port(panelport):
input_port=int(panelport)
if not input_port:
print("|-错误,未输入任何有效端口")
return
if input_port in [80, 443, 21, 20, 22]:
print("|-错误,请不要使用常用端口作为面板端口")
return
try:
port_str = public.readFile('data/port.pl')
if port_str:
old_port = int(port_str)
else:
old_port = 0
except:
old_port = 0
if old_port == input_port:
print("|-错误,与面板当前端口一致,无需修改")
return
if input_port > 65535 or input_port < 1:
print("|-错误,可用端口范围在1-65535之间")
return
print("|-开始设置面板端口")
is_exists = public.ExecShell("lsof -i:%s|grep LISTEN|grep -v grep" % input_port)
if len(is_exists[0]) > 5:
print("|-错误,指定端口已被其它应用占用")
return
public.writeFile('data/port.pl', str(input_port))
if os.path.exists("/usr/bin/firewall-cmd"):
os.system("firewall-cmd --permanent --zone=public --add-port=%s/tcp" % input_port)
os.system("firewall-cmd --reload")
elif os.path.exists("/etc/sysconfig/iptables"):
os.system("iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport %s -j ACCEPT" % input_port)
os.system("service iptables save")
else:
os.system("ufw allow %s" % input_port)
os.system("ufw reload")
os.system("/etc/init.d/bt reload")
print("|-已将面板端口修改为:%s" % input_port)
print(
"|-若您的服务器提供商是[阿里云][腾讯云][华为云]或其它开启了[安全组]的服务器,请在安全组放行[%s]端口才能访问面板" % input_port)
panelPath = '/www/server/panel/data/o.pl'
if not os.path.exists(panelPath): return False
o = public.readFile(panelPath).strip()
if 'tencent' == o:
print("|-若使用腾讯轻量云-Linux专享版面板,则不需要添加至安全组")
def set_panel_path(adminpath):
admin_path = adminpath
msg = ''
from BTPanel import admin_path_checks
if len(admin_path) < 6: msg = '安全入口地址长度不能小于6位!'
if admin_path in admin_path_checks: msg = '该入口已被面板占用,请使用其它入口!'
if not public.path_safe_check(admin_path) or admin_path[-1] == '.': msg = '入口地址格式不正确,示例: /my_panel'
if admin_path[0] != '/':
admin_path = "/" + admin_path
if admin_path.find("//") != -1:
msg = '入口地址格式不正确,示例: /my_panel'
valid_path_pattern = re.compile(r'^(/?([a-zA-Z0-9\-._~:/@]|%[0-9A-Fa-f]{2})*)$')
if not valid_path_pattern.match(admin_path):
msg ='入口地址格式不正确,示例: /my_panel'
admin_path_file = 'data/admin_path.pl'
if msg != '':
print('设置出错:{}'.format(msg))
return
public.writeFile(admin_path_file, admin_path)
public.restart_panel()
print('安全入口设置成功:{}'.format(admin_path))
def set_panel_ssl(status):
status=status
if status == "enable":
CreateSSL()
if status == "disable":
os.system('btpython /www/server/panel/class/config.py SetPanelSSL')
os.system("/etc/init.d/bt reload")
def get_panel_version():
exit(public.version())
def sync_tencent_ssl(tencent_ssl_path="/root/tencent_ssl"):
"""同步腾讯云证书
遍历目录下的nginx证书压缩包,解压并同步到面板ssl目录
"""
import os
import shutil
import panelSSL
from sslModel import certModel
ss = panelSSL.panelSSL()
cert_main = certModel.main()
panel_ssl_path = "/www/server/panel/vhost/ssl"
try:
# 确保源目录存在
if not os.path.exists(tencent_ssl_path):
print(f"目录不存在: {tencent_ssl_path}")
return False
# 获取所有网站和对应的域名
all_sites = public.M('sites').field('name,id').select()
if not all_sites:
print("没有找到任何网站信息")
return False
for site in all_sites:
domians = public.M('domain').where("pid=?", (site["id"],)).field('name').select()
if not domians:
site["domains"] = []
site["domains"] = [domain["name"] for domain in domians]
BatchInfo = []
# 遍历目录下的所有文件
for filename in os.listdir(tencent_ssl_path):
if not filename.endswith('_nginx.zip'):
continue
# 获取域名(去除_nginx.zip后缀)
domain = filename.replace('_nginx.zip', '')
zip_path = os.path.join(tencent_ssl_path, filename)
extract_path = os.path.join(tencent_ssl_path, domain + '_nginx')
ssl_domain_path = os.path.join(panel_ssl_path, domain)
try:
# 解压文件
public.ExecShell("cd {} && unzip {}".format(tencent_ssl_path, zip_path))
# 创建目标ssl目录
os.makedirs(ssl_domain_path, exist_ok=True)
# 复制证书文件
bundle_src = os.path.join(extract_path, f"{domain}_bundle.pem")
key_src = os.path.join(extract_path, f"{domain}.key")
get = public.to_dict_obj({})
get.key = public.readFile(key_src)
get.csr = public.readFile(bundle_src)
res=cert_main.save_cert(get)
if res.get("status") == True:
print(f"成功同步证书: {domain}")
else:
print(f"同步证书失败: {domain}")
continue
# 获取证书信息
get.ssl_hash = res["ssl_hash"]
cert_detail = public.M('ssl_info').field(
'dns'
).where("hash=?",(res["ssl_hash"])).select()
if not cert_detail:
print(f"未找到证书信息: {domain}")
continue
try:
cert_detail = json.loads(cert_detail[0]["dns"])
except Exception as e:
print(f"解析证书信息失败: {domain}, 错误: {str(e)}")
for site in all_sites:
if site["domains"] and all_domains_covered(site["domains"], cert_detail):
BatchInfo.append(
{"ssl_hash":res["ssl_hash"],"siteName": site["name"]}
)
except Exception as e:
print(f"处理证书时出错 {domain}: {str(e)}")
finally:
if os.path.exists(extract_path):
shutil.rmtree(extract_path)
if BatchInfo:
get = public.to_dict_obj({})
get.BatchInfo = json.dumps(BatchInfo)
res = ss.SetBatchCertToSite(get)
print(res)
else:
print("没有需要部署的证书信息")
return True
except Exception as e:
print(f"同步证书时出错: {str(e)}")
return False
def all_domains_covered(domain_list, cert_domains):
def matches(domain, pattern):
if pattern.startswith('*.'):
base = pattern[2:]
base_dots = base.count('.')
domain_dots = domain.count('.')
# domain必须以.base结尾,且domain的点数比base多1(表示一级子域)
if domain.endswith('.' + base) and domain_dots == base_dots + 1:
return True
else:
if domain == pattern:
return True
return False
return all(any(matches(domain, pattern) for pattern in cert_domains) for domain in domain_list)
def get_temp_login():
s_time = int(time.time())
expire_time=int(int(time.time()) + 3600 * 3)
public.M('temp_login').where('state=? and expire>?', (0, s_time)).delete()
token = public.GetRandomString(48)
salt = public.GetRandomString(12)
pdata = {
'token': public.md5(token + salt),
'salt': salt,
'state': 0,
'login_time': 0,
'login_addr': '',
'expire': expire_time,
}
if not public.M('temp_login').count():
pdata['id'] = 101
if public.M('temp_login').insert(pdata):
if os.path.exists('/www/server/panel/data/ssl.pl'):
HTTP_C="https://"
else:
HTTP_C="http://"
IP_ADDRES=public.ExecShell("curl -sS --connect-timeout 10 -m 20 https://www.bt.cn/Api/getIpAddress")[0]
PANEL_PORT=public.readFile("/www/server/panel/data/port.pl")
PANEL_ADDRESS=HTTP_C+IP_ADDRES+":"+PANEL_PORT+"/login?tmp_token="+token
print(PANEL_ADDRESS)
def get_temp_login_ipv4():
s_time = int(time.time())
expire_time=int(int(time.time()) + 3600 * 3)
public.M('temp_login').where('state=? and expire>?', (0, s_time)).delete()
token = public.GetRandomString(48)
salt = public.GetRandomString(12)
pdata = {
'token': public.md5(token + salt),
'salt': salt,
'state': 0,
'login_time': 0,
'login_addr': '',
'expire': expire_time,
}
if not public.M('temp_login').count():
pdata['id'] = 101
if public.M('temp_login').insert(pdata):
if os.path.exists('/www/server/panel/data/ssl.pl'):
HTTP_C="https://"
else:
HTTP_C="http://"
IP_ADDRES=public.ExecShell("curl -4 -sS --connect-timeout 10 -m 20 https://www.bt.cn/Api/getIpAddress")[0]
PANEL_PORT=public.readFile("/www/server/panel/data/port.pl")
PANEL_ADDRESS=HTTP_C+IP_ADDRES+":"+PANEL_PORT+"/login?tmp_token="+token
print(PANEL_ADDRESS)
# 将插件升级到6.0
def update_to6():
print("====================================================")
print("正在升级插件...")
print("====================================================")
download_address = public.get_url()
exlodes = ['gitlab', 'pm2', 'mongodb', 'deployment_jd', 'logs', 'docker', 'beta', 'btyw']
for pname in os.listdir('plugin/'):
if not os.path.isdir('plugin/' + pname): continue
if pname in exlodes: continue
print("|-正在升级【%s】..." % pname),
download_url = download_address + '/install/plugin/' + pname + '/install.sh'
to_file = '/tmp/%s.sh' % pname
public.downloadFile(download_url, to_file)
os.system('/bin/bash ' + to_file + ' install &> /tmp/plugin_update.log 2>&1')
print(" \033[32m[成功]\033[0m")
print("====================================================")
print("\033[32m所有插件已成功升级到最新!\033[0m")
print("====================================================")
# 2024/5/29 下午5:58 调用面板反向代理的模块创建
def create_reverse_proxy(get):
'''
@param get:
@name 调用面板反向代理的模块创建
@author wzz <2024/5/29 下午6:00>
@param "data":{"参数名":""} <数据类型> 参数描述
@return dict{"status":True/False,"msg":"提示信息"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
panel_port = public.readFile('data/port.pl')
try:
close_reverse_proxy()
__http = 'https' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http'
if __http != "https":
CreateSSL()
__http = "https"
args = public.to_dict_obj({
"proxy_pass": "{}://127.0.0.1:{}".format(__http, panel_port),
"proxy_type": "http",
"domains": get.siteName,
"proxy_host": "$http_host",
"remark": "宝塔面板的反代[请误操作,修改可能会导致面板无法访问]"
})
create_result = pMod.create(args)
if not create_result['status']:
return public.returnResult(False, create_result['msg'])
# args.site_name = get.siteName
# args.auth_path = "/"
# args.username = public.GetRandomString(8).lower()
# args.password = public.GetRandomString(8).lower()
# if os.path.exists("data/http_auth.pwd"):
# public.ExecShell("rm -rf /www/server/panel/data/http_auth.pwd")
#
# public.writeFile("data/http_auth.pwd", args.password)
# args.name = public.GetRandomString(5)
#
# auth_result = pMod.add_dir_auth(args)
# if not create_result['status']:
# return public.returnResult(False, auth_result['msg'])
return_data = {
# "username": args.username,
# "password": args.password,
"siteName": get.siteName,
"http": __http
}
return public.returnResult(True, '添加成功', data=return_data)
except Exception as e:
result = public.M('sites').where("name=?", (get.siteName,)).find()
if not isinstance(result, dict):
return public.returnResult(False, '添加失败,可能是Nginx配置文件错误,请先检查后再设置!错误详情:{}!'.format(str(e)))
args = public.to_dict_obj({
"id": result['id'],
"siteName": get.siteName,
"remove_path": 1,
})
pMod.delete(args)
return public.returnResult(False, '添加失败,可能是Nginx配置文件错误,请先检查后再设置!错误详情:{}!'.format(str(e)))
# 2024/5/30 上午10:38 设置指定代理的SSL
def set_reverse_proxy_ssl(get):
'''
@name 设置指定代理的SSL
@author wzz <2024/5/30 上午10:38>
@param "data":{"参数名":""} <数据类型> 参数描述
@return dict{"status":True/False,"msg":"提示信息"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
try:
key = public.readFile("ssl/privateKey.pem")
csr = public.readFile("ssl/certificate.pem")
get.key = key
get.csr = csr
result = pMod.set_ssl(get)
if not result['status']:
return public.returnResult(False, result['msg'])
return public.returnResult(True, '设置成功')
except Exception as e:
return public.returnMsg(False, '设置失败,错误{}!'.format(str(e)))
# 2024/5/29 下午6:21 关闭面板反向代理
def close_reverse_proxy():
'''
@name 关闭面板反向代理
@author wzz <2024/5/29 下午6:21>
@param "data":{"参数名":""} <数据类型> 参数描述
@return dict{"status":True/False,"msg":"提示信息"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
site_name = ""
sid = ""
try:
args = public.to_dict_obj({})
proxy_list = pMod.get_list(args)
if proxy_list["data"] is None:
return public.returnResult(True, '', data={"siteName": site_name})
panel_port = public.readFile('data/port.pl')
for proxy in proxy_list["data"]["data"]:
if panel_port == proxy["proxy_pass"].split(":")[-1]:
site_name = proxy["name"]
sid = proxy["id"]
break
args.id = sid
args.site_name = site_name
args.remove_path = 1
delete_result = pMod.delete(args)
if not delete_result['status']:
return public.returnResult(False, delete_result['msg'])
if os.path.exists("data/http_auth.pwd"):
public.ExecShell("rm -rf /www/server/panel/data/http_auth.pwd")
return public.returnResult(True, '删除成功', data={"siteName": site_name})
except Exception as e:
return public.returnMsg(False, '删除失败,错误{}!'.format(str(e)), data={"siteName": site_name})
# 2024/5/29 下午6:08 获取面板反代的信息
def get_reverse_proxy():
'''
@name
@author wzz <2024/5/29 下午6:08>
@param "data":{"参数名":""} <数据类型> 参数描述
@return dict{"status":True/False,"msg":"提示信息"}
'''
from mod.project.proxy.comMod import main as proxyMod
pMod = proxyMod()
site_name = ""
try:
args = public.to_dict_obj({})
proxy_list = pMod.get_list(args)
if proxy_list["data"] is None:
return public.returnResult(True, '', data={"siteName": site_name})
panel_port = str(public.get_panel_port())
for proxy in proxy_list["data"]["data"]:
if panel_port == proxy["proxy_pass"].split(":")[-1].strip():
site_name = proxy["name"]
break
# if site_name != "":
# args.site_name = site_name
# global_conf = pMod.get_global_conf(args)
#
# password = ""
# if os.path.exists("data/http_auth.pwd"):
# password = public.readFile("data/http_auth.pwd")
#
# if password == "":
# return public.returnResult(True, '', data={"siteName": site_name})
#
# return_result = {
# "username": global_conf["data"]["basic_auth"][0]["username"],
# "password": password,
# "siteName": site_name
# }
# return public.returnResult(True, '', data=return_result)
return public.returnResult(True, '', data={"siteName": site_name})
except Exception as e:
import traceback
print(traceback.format_exc())
return public.returnMsg(False, '获取失败,错误{}!'.format(str(e)), data={"siteName": site_name})
# 2025/2/17 14:29 检测如果磁盘剩余空间是否小于500M,是就返回False
def check_disk_space():
'''
@name 检测如果磁盘剩余空间是否小于50M,是就返回False,否则返回True
@return bool
'''
disk_info = public.get_disk_usage("/")
if disk_info.free < 50 * 1024 * 1024:
print("==========================================================================")
# 2025/2/17 14:34 输出格式化后的总磁盘空间和剩余空间
os.system("echo -e '\\e[31m紧急:根目录(\"/\")磁盘空间不足50M,请先清理磁盘空间后再执行命令!\\e[0m'")
print("可执行下面命令进行手动清理")
os.system("echo -e '\\e[33m命令:btpython /www/server/panel/script/btcli.py\\e[0m'")
print("总磁盘空间:{},剩余空间:{}".format(public.to_size(disk_info.total), public.to_size(disk_info.free)))
print("计算空间由字节转换,请以实际大小为准!")
print("")
stdout, stderr = public.ExecShell("df -Th")
print("系统 df -Th 输出详情如下:")
print(stdout)
print("==========================================================================")
return False
return True
# 命令行菜单
def bt_cli(u_input=0):
raw_tip = "==============================================="
raw_tip1 = "===================================================================================="
if not u_input:
print("==================================宝塔面板命令行====================================")
print("(1) 重启面板服务 (8) 改面板端口 |")
print("(2) 停止面板服务 (9) 清除面板缓存 |")
print("(3) 启动面板服务 (10) 清除登录限制 |")
print("(4) 重载面板服务 (11) 设置是否开启IP + User-Agent验证 |")
print("(5) 修改面板密码 (12) 取消域名绑定限制 |")
print("(6) 修改面板用户名 (13) 取消IP访问限制 |")
print("(7) 强制修改MySQL密码 (14) 查看面板默认信息 |")
print("(22) 显示面板错误日志 (15) 清理系统垃圾 |")
print("(23) 关闭BasicAuth认证 (16) 修复面板(安装当前版本的最新bug修复包) |")
print("(24) 关闭动态口令认证 (17) 设置日志切割是否压缩 |")
print("(25) 设置是否保存文件历史副本 (18) 设置是否自动备份面板 |")
print("(26) 关闭面板ssl (19) 关闭面板登录地区限制 |")
print("(28) 修改面板安全入口 (29) 取消访问设备验证 |")
print("(30) 取消访问UA验证 (32) 开启/关闭【80、443】端口访问面板 |")
print("(34) 更新面板(更新到最新版本) (35) btcli命令行管理工具 |")
print("(36) 磁盘空间清理工具 (99) Switch to English |")
print("(0) 取消 |")
print(raw_tip1)
try:
u_input = input("请输入命令编号:")
if sys.version_info[0] == 3: u_input = int(u_input)
except:
u_input = 0
try:
if u_input in ['log', 'logs', 'error', 'err', 'tail', 'debug', 'info']:
os.system("tail -f {}".format(public.get_panel_log_file()))
return
if u_input[:6] in ['install', 'update']:
print("提示:命令传参示例(编译安装php7.4):bt install/0/php/7.4")
print(sys.argv)
install_args = u_input.split('/')
if len(install_args) < 2:
try:
install_input = input("请选择安装方式(0 编译安装,1 极速安装,默认: 1):")
install_input = int(install_input)
except:
install_input = 1
else:
install_input = int(install_args[1])
print(raw_tip)
soft_list = 'nginx apache php mysql memcached redis pure-ftpd phpmyadmin pm2 docker openlitespeed mongodb'
soft_list_arr = soft_list.split(' ')
if len(install_args) < 3:
install_soft = ''
while not install_soft:
print("支持的软件:{}".format(soft_list))
print(raw_tip)
install_soft = input("请输入要安装的软件名称(如:nginx):")
if install_soft not in soft_list_arr:
print("不支命令行安装的持的软件")
install_soft = ''
else:
install_soft = install_args[2]
print(raw_tip)
if len(install_args) < 4:
install_version = ''
while not install_version:
print(raw_tip)
install_version = input("请输入要安装的版本号(如:1.18):")
else:
install_version = install_args[3]
print(raw_tip)
os.system(
"bash /www/server/panel/install/install_soft.sh {} {} {} {}".format(install_input, install_args[0],
install_soft, install_version))
exit()
print("不支持的指令")
exit()
except:
pass
nums = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 28, 29, 30, 31, 32, 33, 34, 35, 36, 99]
if not u_input in nums:
print(raw_tip)
print("已取消!")
exit()
print(raw_tip)
print("正在执行(%s)..." % u_input)
print(raw_tip)
if u_input == 32:
if public.get_webserver() != "nginx":
print("仅支持nginx!")
if not os.path.exists("/www/server/nginx/sbin/nginx"):
print("未安装nginx,无法设置免端口访问面板")
return
print(raw_tip)
print("此功能设置后可免端口访问宝塔面板")
print("例如,设置域名为:panel.bt.cn")
print("则可以通过 https://panel.bt.cn 访问面板")
print("如果您输入的域名未解析,设置hosts即可访问")
print("开启此功能后会启用http认证以及面板https,以确保面板使用安全,请勿关闭!")
print(raw_tip)
print()
reverse_data = get_reverse_proxy()
__http = 'https://' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http://'
admin_path = public.readFile('/www/server/panel/data/admin_path.pl').strip() if os.path.exists(
"/www/server/panel/data/admin_path.pl") else "/login"
if "siteName" in reverse_data['data'] and reverse_data['data']['siteName'] != "":
address = __http + reverse_data['data']['siteName'] + admin_path
print("已设置的免端口访问信息如下:")
print("面板地址: {}".format(address))
if "username" in reverse_data['data']:
print("http认证用户名: {}".format(reverse_data['data']['username']))
print("http认证密码: {}".format(reverse_data['data']['password']))
bt_username = public.M('users').where('id=?', (1,)).getField('username')
bt_password = public.readFile("/www/server/panel/default.pl")
if "siteName" in reverse_data['data'] and reverse_data['data']['siteName'] != "":
print("面板用户名: {}".format(bt_username))
print("面板密码: {}".format(bt_password))
print()
print("#### 如需关闭,请输入0 关闭免端口访问面板")
site_name = input("请输入访问面板的域名或IP:")
if site_name == "":
print("域名或ip不能为空,请重新输入,例如:192.168.100.100或panel.bt.cn")
return
if site_name.startswith("0 ") or site_name == "0":
close_reverse_proxy()
print()
print("面板反向代理关闭成功")
return
else:
if not public.check_ip(site_name) and not public.is_domain(site_name):
print("域名或ip格式错误,请重新输入,例如:panel.bt.cn")
return
get = public.to_dict_obj({
"siteName": site_name
})
public.set_module_logs('命令行设置免端口访问', 'create', 1)
result = create_reverse_proxy(get)
if not result["status"]:
print(result["msg"])
return
if result["data"]["http"] == 'https':
get.site_name = site_name
set_reverse_proxy_ssl(get)
__http = 'https://'
print()
print(raw_tip)
address = __http + result['data']['siteName'] + admin_path
print("面板地址: {}".format(address))
# print("http认证用户名: {}".format(result['data']['username']))
# print("http认证密码: {}".format(result['data']['password']))
print("面板用户名: {}".format(bt_username))
print("面板密码: {}".format(bt_password))
print(raw_tip)
public.restart_panel()
if u_input == 33:
close_reverse_proxy()
print("面板反向代理关闭成功")
if u_input == 31:
os.system('tail -50 /www/server/panel/data/login_err.log')
if u_input == 26:
os.system('btpython /www/server/panel/class/config.py SetPanelSSL')
os.system("/etc/init.d/bt reload")
if u_input == 28:
admin_path = input('请输入新的安全入口:')
msg = ''
from BTPanel import admin_path_checks
if len(admin_path) < 6: msg = '安全入口地址长度不能小于6位!'
if admin_path in admin_path_checks: msg = '该入口已被面板占用,请使用其它入口!'
if not public.path_safe_check(admin_path) or admin_path[-1] == '.': msg = '入口地址格式不正确,示例: /my_panel'
if admin_path[0] != '/':
admin_path = "/" + admin_path
admin_path_file = 'data/admin_path.pl'
admin_path1 = '/'
if os.path.exists(admin_path_file): admin_path1 = public.readFile(admin_path_file).strip()
if msg != '':
print('设置出错:{}'.format(msg))
return
public.writeFile(admin_path_file, admin_path)
public.restart_panel()
print('安全入口设置成功:{}'.format(admin_path))
if u_input == 30:
ua_file = 'data/limitua.conf'
if os.path.exists(ua_file):
with open(ua_file, 'r') as f:
data = json.load(f)
data['status'] = '0' # 将status的值设置为"0"
with open(ua_file, 'w') as f:
json.dump(data, f)
print("|-已关闭访问UA验证")
if u_input == 29:
os.system("rm -rf /www/server/panel/data/ssl_verify_data.pl")
os.system("/etc/init.d/bt restart")
print("|-已关闭访问设备验证")
if u_input == 1:
os.system("/etc/init.d/bt restart")
elif u_input == 2:
os.system("/etc/init.d/bt stop")
elif u_input == 3:
os.system("/etc/init.d/bt start")
elif u_input == 4:
os.system("/etc/init.d/bt reload")
elif u_input == 5:
if sys.version_info[0] == 2:
input_pwd = raw_input("请输入新的面板密码:")
else:
input_pwd = input("请输入新的面板密码:")
set_panel_pwd(input_pwd.strip(), True)
elif u_input == 6:
if sys.version_info[0] == 2:
input_user = raw_input("请输入新的面板用户名(≥3位):")
else:
input_user = input("请输入新的面板用户名(≥3位):")
set_panel_username(input_user.strip())
elif u_input == 7:
if sys.version_info[0] == 2:
input_mysql = raw_input("请输入新的MySQL密码:")
else:
input_mysql = input("请输入新的MySQL密码:")
if not input_mysql:
print("|-错误,不能设置空密码")
return
if len(input_mysql) < 8:
print("|-错误,长度不能少于8位")
return
import re
rep = r"^[\w@\._]+$"
if not re.match(rep, input_mysql):
print("|-错误,密码中不能包含特殊符号")
return
print(input_mysql)
set_mysql_root(input_mysql.strip())
elif u_input == 8:
input_port = input("请输入新的面板端口:")
if sys.version_info[0] == 3: input_port = int(input_port)
if not input_port:
print("|-错误,未输入任何有效端口")
return
if input_port in [80, 443, 21, 20, 22]:
print("|-错误,请不要使用常用端口作为面板端口")
return
try:
port_str = public.readFile('data/port.pl')
if port_str:
old_port = int(port_str)
else:
old_port = 0
except:
old_port = 0
if old_port == input_port:
print("|-错误,与面板当前端口一致,无需修改")
return
if input_port > 65535 or input_port < 1:
print("|-错误,可用端口范围在1-65535之间")
return
is_exists = public.ExecShell("lsof -i:%s|grep LISTEN|grep -v grep" % input_port)
if len(is_exists[0]) > 5:
print("|-错误,指定端口已被其它应用占用")
return
public.writeFile('data/port.pl', str(input_port))
if os.path.exists("/usr/bin/firewall-cmd"):
os.system("firewall-cmd --permanent --zone=public --add-port=%s/tcp" % input_port)
os.system("firewall-cmd --reload")
elif os.path.exists("/etc/sysconfig/iptables"):
os.system("iptables -I INPUT -p tcp -m state --state NEW -m tcp --dport %s -j ACCEPT" % input_port)
os.system("service iptables save")
else:
os.system("ufw allow %s" % input_port)
os.system("ufw reload")
os.system("/etc/init.d/bt reload")
print("|-已将面板端口修改为:%s" % input_port)
print(
"|-若您的服务器提供商是[阿里云][腾讯云][华为云]或其它开启了[安全组]的服务器,请在安全组放行[%s]端口才能访问面板" % input_port)
panelPath = '/www/server/panel/data/o.pl'
if not os.path.exists(panelPath): return False
o = public.readFile(panelPath).strip()
if 'tencent' == o:
print("|-若使用腾讯轻量云-Linux专享版面板,则不需要添加至安全组")
elif u_input == 9:
sess_file = '/www/server/panel/data/session'
if os.path.exists(sess_file):
os.system("rm -f {}/*".format(sess_file))
public.clear_sql_session()
os.system("/etc/init.d/bt reload")
elif u_input == 10:
os.system("/etc/init.d/bt reload")
elif u_input == 11:
not_tip = '{}/data/not_check_ip.pl'.format(public.get_panel_path())
if os.path.exists(not_tip):
os.remove(not_tip)
print("|-已开启IP + User-Agent检测")
print("|-此功能可以有效防止[重放攻击]")
else:
public.writeFile(not_tip, 'True')
print("|-已关闭IP + User-Agent检测")
print("|-注意:关闭此功能有被[重放攻击]的风险")
elif u_input == 12:
auth_file = 'data/domain.conf'
if os.path.exists(auth_file): os.remove(auth_file)
os.system("/etc/init.d/bt reload")
print("|-已取消域名访问限制")
elif u_input == 13:
auth_file = 'data/limitip.conf'
if os.path.exists(auth_file): os.remove(auth_file)
os.system("/etc/init.d/bt reload")
print("|-已取消IP访问限制")
elif u_input == 14:
try:
if os.path.exists('/www/server/panel/data/panel_generation.pl'):
conf = json.loads(public.readFile('/www/server/panel/data/panel_generation.pl'))
__http = 'https://' if os.path.exists("/www/server/panel/data/ssl.pl") else 'http://'
admin_path = public.readFile('/www/server/panel/data/admin_path.pl')
if admin_path:
if admin_path.strip() in ("", "/"):
admin_path = '/login'
else:
admin_path = admin_path.strip()
else:
admin_path = '/login'
address = __http + conf['domain'] + admin_path
public.writeFile('/www/server/panel/data/panel_site_address.pl', address)
else:
public.ExecShell("rm -rf /www/server/panel/data/panel_site_address.pl")
except:
pass
os.system("/etc/init.d/bt default")
elif u_input == 15:
ClearSystem()
elif u_input == 16:
sh_path = "{}/script/upgrade_panel_optimized.py".format(public.get_panel_path())
if not os.path.exists(sh_path):
print("|-未找到修复脚本")
os.system("bash {} repair_panel".format(sh_path))
elif u_input == 17:
l_path = '/www/server/panel/data/log_not_gzip.pl'
if os.path.exists(l_path):
print("|-检测到已关闭gzip压缩功能,正在开启...")
os.remove(l_path)
print("|-已开启gzip压缩")
else:
print("|-检测到已开启gzip压缩功能,正在关闭...")
public.writeFile(l_path, 'True')
print("|-已关闭gzip压缩")
elif u_input == 18:
l_path = '/www/server/panel/data/not_auto_backup.pl'
if os.path.exists(l_path):
print("|-检测到已关闭面板自动备份功能,正在开启...")
os.remove(l_path)
print("|-已开启面板自动备份功能")
else:
print("|-检测到已开启面板自动备份功能,正在关闭...")
public.writeFile(l_path, 'True')
print("|-已关闭面板自动备份功能")
elif u_input == 19:
empty_content = {
"limit_area": {
"city": [],
"province": [],
"country": []
},
"limit_area_status": "false",
"limit_type": "deny"
}
try:
limit_area_json = json.loads(public.readFile("/www/server/panel/data/limit_area.json"))
except json.decoder.JSONDecodeError:
limit_area_json = empty_content
except TypeError:
limit_area_json = empty_content
limit_area_json['limit_area_status'] = "false"
public.writeFile('data/limit_area.json', json.dumps(limit_area_json))
print("|-已关闭面板登录地区限制")
elif u_input == 20:
limit_info = public.get_limit_area()
if limit_info['limit_type'] == "allow":
rule_type = "仅允许"
else:
rule_type = "禁止"
city = []
province = []
country = []
for area in limit_info['limit_area']['city']:
city.append(area['name'])
for area in limit_info['limit_area']['province']:
province.append(area['name'])
for area in limit_info['limit_area']['country']:
country.append(area['name'])
if not country and not province and not city:
print("|-当前未配置面板登录限制规则!")
return
print("|-当前面板登录限制规则为:【{}】以下地区登录宝塔面板!\n".format(rule_type))
if country:
print("国家:{}".format(country))
if province:
print("省份:{}".format(province))
if city:
print("城市:{}".format(city))
elif u_input == 21:
backup_lists = public.get_default_db_backup_dates()
print("如需恢复面板数据库初始设置请输入:default\n")
if len(backup_lists) > 0:
print("|-可以恢复的面板数据库备份列表:")
for backup_list in backup_lists:
print("|-{}".format(backup_list))
else:
print("|-没有可以恢复的面板数据库备份列表!")
date = input("请输入可恢复的日期(格式:2020-01-01/20200101):")
if date == 'default':
print("|-警告:恢复面板数据库初始设置将会清空面板数据!")
confirm = input("请输入yes确认:")
if confirm != 'yes':
print("|-输入非yes,已退出!")
return
print("|-正在恢复面板数据库初始设置...")
public.recover_default_db(date)
os.system("/etc/init.d/bt restart")
return
if not date:
print("|-错误,未输入任何有效日期")
return
if len(date) == 8 or len(date) == 10:
if len(date) == 8:
date = date[:4] + '-' + date[4:6] + '-' + date[6:]
elif len(date) == 10:
date = date[:4] + '-' + date[5:7] + '-' + date[8:]
if date not in backup_lists:
print("|-错误,指定的日期不存在,请输入正确日期!")
return
print("|-正在恢复面板数据库...")
print("|-警告:恢复面板数据库将会覆盖当前面板数据!")
confirm = input("请输入yes确认:")
if confirm != 'yes':
print("|-输入非yes,已退出!")
return
result = public.recover_default_db(date)
if result is False: return
print("|-面板数据库恢复成功!")
print("|-正在重启面板...")
os.system("/etc/init.d/bt restart")
print("|-面板重启成功!")
print("|-已将面板恢复至{}的设置!".format(date))
elif u_input == 22:
os.system('tail -100 /www/server/panel/logs/error.log')
elif u_input == 23:
filename = '/www/server/panel/config/basic_auth.json'
if os.path.exists(filename): os.remove(filename)
os.system('bt reload')
print("|-已关闭BasicAuth认证")
elif u_input == 24:
filename = '/www/server/panel/data/two_step_auth.txt'
if os.path.exists(filename): os.remove(filename)
print("|-已关闭谷歌认证")
elif u_input == 25:
l_path = '/www/server/panel/data/not_file_history.pl'
if os.path.exists(l_path):
print("|-检测到已关闭文件副本功能,正在开启...")
os.remove(l_path)
print("|-已开启文件副本功能")
else:
print("|-检测到已开启文件副本功能,正在关闭...")
public.writeFile(l_path, 'True')
print("|-已关闭文件副本功能")
elif u_input == 34:
ver = sys.argv[3] if len(sys.argv) > 3 and sys.argv[2]=="34" else ""
sh_path = "{}/script/upgrade_panel_optimized.py".format(public.get_panel_path())
if not os.path.exists(sh_path):
print("|-未找到升级脚本")
ret_code = os.system("bash {} upgrade_panel {} --dry-run".format(sh_path, ver))
if ret_code != 0:
return
continue_tip = input("是否继续执行更新?(y/n):")
if continue_tip.strip().lower() in ('y', 'yes'):
os.system("bash {} upgrade_panel {}".format(sh_path, ver))
else:
print("已取消更新!")
elif u_input == 35:
public.ExecShell("chmod +x /www/server/panel/script/btcli.py")
os.system("/www/server/panel/script/btcli.py")
elif u_input == 36:
public.ExecShell("chmod +x /www/server/panel/script/btcli.py")
os.system("/www/server/panel/script/btcli.py")
elif u_input == 99:
os.system("btpython /www/server/panel/tools_en.py cli")
if __name__ == "__main__":
type = sys.argv[1]
if type == 'root':
set_mysql_root(sys.argv[2])
elif type == 'panel':
set_panel_pwd(sys.argv[2])
elif type == 'username':
if len(sys.argv) > 2:
set_panel_username(sys.argv[2])
else:
set_panel_username()
elif type == 'reusername':
set_panel_username(sys.argv[2])
elif type == 'o':
setup_idc()
elif type == 'mysql_dir':
set_mysql_dir(sys.argv[2])
elif type == 'package':
PackagePanel()
elif type == 'ssl':
CreateSSL()
elif type == 'clear':
ClearSystem()
elif type == 'closelog':
CloseLogs()
elif type == 'update_to6':
update_to6()
elif type == 'set_panel_port':
set_panel_port(sys.argv[2])
elif type == 'set_panel_path':
set_panel_path(sys.argv[2])
elif type == 'set_panel_ssl':
set_panel_ssl(sys.argv[2])
elif type == 'get_temp_login':
get_temp_login()
elif type == 'get_panel_version':
get_panel_version()
elif type == 'sync_tencent_ssl':
sync_tencent_ssl()
elif type == 'get_temp_login_ipv4':
get_temp_login_ipv4()
elif type == 'phpenv':
import jobs
jobs.set_php_cli_env()
elif type == "cli":
clinum = 0
try:
if len(sys.argv) > 2:
clinum = int(sys.argv[2]) if sys.argv[2][:6] not in ['instal', 'update'] else sys.argv[2]
except:
clinum = sys.argv[2]
if clinum != 14:
if not check_disk_space(): exit(1)
bt_cli(clinum)
elif type == 'check_db':
check_db() # 面板自动修复
else:
print('ERROR: Parameter error')