一个简单Python脚本,实现轻量应用服务器防火墙自动添加客户端公网IP访问

2024-04-26 16:26:27 浏览数 (1)

当需要设置轻量级服务器的防火墙策略时,一般用户都是需要手动去设置开放指定公网IP访问轻量服务器,这样也是比较麻烦的。就此作者想了下,如何避免每次手动去修改公网IP,这样也是非常麻烦,如果腾讯云轻量服务器控制台能增加一个安全选项,实现浏览器获取当前用户公网IP并一键设置允许常用端口访问。这样大大的增加用户安全体验下。

如下实现了自动获取当前客户端网络环境的公网IP并允许当前公网IP访问轻量服务器指定端口:

用户首先需要获取腾讯云的访问密钥(SecretId和Key)。然后,用户可以将这些密钥代入Python脚本,并指定服务器所在的区域以及云服务器的ID信息。

这个Python脚本的作用是解决用户远程访问腾讯云服务器的安全问题。运行此Python脚本后,它会自动获取当前用户的公网IP地址,并设置腾讯云服务器只允许当前公网IP地址访问服务器的22和8888端口,从而确保服务器的22和8888端口不对所有人开放。此脚本对于移动办公和运维人员来说非常方便。

运行Python脚本后,默认情况下,回车键将会自动开启所有用户对80和443端口的访问权限,并且只允许当前公网IP访问服务器的22端口和8888端口。

代码语言:txt复制
import json
from tabulate import tabulate
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.lighthouse.v20200324 import lighthouse_client, models
import requests
import time

#腾讯云SecretId信息和秘钥
SecretId = "AKIDqilQ1DSjsy7BHcHqHZjtESf"
key = "PdibItIYlrsqmibKxW3"
#轻量服务器地域
TencentZone = "guangzhou"
#轻量服务器ID,如下示例
Serverid = "lhins-6y22h1kaip"
#为了防止误运行,设置了密码,底部可修改。默认 admin 密码 123456

cred = credential.Credential(SecretId, key)
def add_local(ip,open_port="22",status="ACCEPT"):
    try:
        httpProfile = HttpProfile()
        httpProfile.endpoint = "lighthouse.tencentcloudapi.com"

        # 实例化一个client选项,可选的,没有特殊需求可以跳过
        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        # 实例化要请求产品的client对象,clientProfile是可选的
        client = lighthouse_client.LighthouseClient(cred, TencentZone, clientProfile)

        # 实例化一个请求对象,每个接口都会对应一个request对象
        req = models.ModifyFirewallRulesRequest()
        params = {
            "InstanceId": Serverid,
            "FirewallRules": [
                {
                    "Protocol": "TCP",
                    "Port": open_port,
                    "CidrBlock": ip,
                    "Action": status,
                    "FirewallRuleDescription": "服务器远程访问"
                }
            ]
        }
        req.from_json_string(json.dumps(params))

        # 返回的resp是一个ModifyFirewallRulesResponse的实例,与请求对象对应
        resp = client.ModifyFirewallRules(req)
        # 输出json格式的字符串回包
        print(resp.to_json_string())
    except TencentCloudSDKException as err:
        print(err)
def add_https(ip="0.0.0.0/0",port ="443,80"):
    try:
        # 实例化一个http选项,可选的,没有特殊需求可以跳过
        httpProfile = HttpProfile()
        httpProfile.endpoint = "lighthouse.tencentcloudapi.com"

        # 实例化一个client选项,可选的,没有特殊需求可以跳过
        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        # 实例化要请求产品的client对象,clientProfile是可选的
        client = lighthouse_client.LighthouseClient(cred, TencentZone, clientProfile)

        # 实例化一个请求对象,每个接口都会对应一个request对象
        req = models.CreateFirewallRulesRequest()
        params = {
            "InstanceId": Serverid,
            "FirewallRules": [
                {
                    "Protocol": "TCP",
                    "Port": port,
                    "CidrBlock": ip,
                    "Action": "ACCEPT",
                    "FirewallRuleDescription": "网站访问"
                }
            ]
        }
        req.from_json_string(json.dumps(params))

        # 返回的resp是一个CreateFirewallRulesResponse的实例,与请求对象对应
        resp = client.CreateFirewallRules(req)
        # 输出json格式的字符串回包
        print(resp.to_json_string())

    except TencentCloudSDKException as err:
        print(err)

def add_new(ip,new_port):
    try:
        # 实例化一个http选项,可选的,没有特殊需求可以跳过
        httpProfile = HttpProfile()
        httpProfile.endpoint = "lighthouse.tencentcloudapi.com"

        # 实例化一个client选项,可选的,没有特殊需求可以跳过
        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        # 实例化要请求产品的client对象,clientProfile是可选的
        client = lighthouse_client.LighthouseClient(cred, TencentZone, clientProfile)

        # 实例化一个请求对象,每个接口都会对应一个request对象
        req = models.CreateFirewallRulesRequest()
        params = {
            "InstanceId": Serverid,
            "FirewallRules": [
                {
                    "Protocol": "TCP",
                    "Port": new_port,
                    "CidrBlock": ip,
                    "Action": "ACCEPT"
                }
            ]
        }
        req.from_json_string(json.dumps(params))

        # 返回的resp是一个CreateFirewallRulesResponse的实例,与请求对象对应
        resp = client.CreateFirewallRules(req)
        # 输出json格式的字符串回包
        print(resp.to_json_string())

    except TencentCloudSDKException as err:
        print(err)
def add_list():
    try:
        # 实例化一个http选项,可选的,没有特殊需求可以跳过
        httpProfile = HttpProfile()
        httpProfile.endpoint = "lighthouse.tencentcloudapi.com"

        # 实例化一个client选项,可选的,没有特殊需求可以跳过
        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        # 实例化要请求产品的client对象,clientProfile是可选的
        client = lighthouse_client.LighthouseClient(cred, TencentZone, clientProfile)

        # 实例化一个请求对象,每个接口都会对应一个request对象
        req = models.DescribeFirewallRulesRequest()
        params = {
            "InstanceId": Serverid
        }
        req.from_json_string(json.dumps(params))

        # 返回的resp是一个DescribeFirewallRulesResponse的实例,与请求对象对应
        resp = client.DescribeFirewallRules(req)
        # # 输出json格式的字符串回包
        # print(resp.to_json_string())
        # 将对象转换为JSON字符串
        # 将对象转换为JSON字符串
        resp_str = resp.to_json_string()

        # 解析JSON字符串为字典
        resp_dict = json.loads(resp_str)

        # 只取需要的字段
        selected_fields = [{"CidrBlock": rule["CidrBlock"], "Protocol": rule["Protocol"], "Port": rule.get("Port"),
                            "Action": rule["Action"]} for rule in resp_dict["FirewallRuleSet"]]
        # 格式化输出为表格
        table = tabulate(selected_fields, headers="keys", tablefmt="orgtbl")
        # 输出结果
        print(table)

    except TencentCloudSDKException as err:
        print(err)
def local_ip():
    url = "https://www.ipuu.net/ipuu/user/getIP"
    response = requests.get(url)
    data = response.json()
    return data["data"]
def caidan():
    print("""---------------------------------------
         欢迎进入网站维护程序
---------------------------------------
    1 允许管理登录测试网站服务器SSH
    2 服务器开放端口
    3 显示服务器端口
    4 退出程序
---------------------------------------""")
print(f"本地公网IP:{local_ip()}")
def run_start():
    while True:
        caidan()
        try:
            num = int(input("请输入你选择的功能序号:"))
            if  num == 1 :
                print(f"本地公网IP:{local_ip()}")
                ip = input("请输入你要添加访问的IP地址,回车默认本地公网IP地址:") or local_ip()
                open_port = input("请输入要开放的TCP端口,默认22:") or "22"
                add_local(ip,open_port)
                add_https()
                add_https(ip=ip,port="8888")
                # add_https(ip=ip, port="80")

            elif num == 2:
                print(f"本地公网IP:{local_ip()}")
                ip = input("请输入你要添加访问的IP地址,回车默认本地公网IP地址:") or local_ip()
                open_port = input("请输入要开放的TCP端口:")
                add_new(ip, open_port)
            elif num == 3:
                add_list()

            elif num == 4:
                exit()
            else:
                print("功能选项不存在,退出程序,若需再次调式,请重新运行")
                exit()
        except ValueError:
            print("输入错误,请重新运行程序")
def user_log(user, passwd):
    if user == "admin" and passwd == "123456":
        print("登录成功")
        return True
    else:
        print("登录失败,请重新登录")
        return False
def login():
    print("运行本程序前,需要进行登录")
    while True:
        user = input("请输入登录用户名:")
        passwd = input("请输入登录密码:")
        if user_log(user, passwd):
            run_start()
            break
if __name__ == '__main__':
    login()

交互式运行显示如下:

0 人点赞