python-concurrent.futures模块

2024-07-11 10:04:05 浏览数 (3)

concurrent.futures模块包含了threading和multiprocessing,及多线程和多进程功能。最近也是在写自己的信息收集和漏洞扫描程序。所以对concurrent.futures模块使用的比较多,这里也是想记录下。对于任务量不大的cpu和io运算,我一般只用threading来处理,而遇到大批量复杂的任务我会用多进程 多线程同时处理,比如在我最近开发的程序中要对大批量IP进行全端口扫描,就非常适合这种模式,运行效率很nice~

concurrent.futures多线程使用

多线程代码示例,不说废话,直接看代码

代码语言:javascript复制
import concurrent.futures
import time

def doTask(T):
	print(T)
	time.sleep(1)

def threadingTask():
	taskList = []
	with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
		for i in range(1,10):
			task = executor.submit(doTask,i)
			taskList.append(task)
		for res in concurrent.futures.as_completed(taskList):
			getResult = res.result()
			
if __name__ == "__main__":
    threadingTask()

代码非常简单,一眼就食之无味了~

concurrent.futures下的多进程 多线程使用

代码示列,这段代码是我本来想写到程序里面的端口扫描模块,十几秒的时间发送6万多个包误差较大,准确性也不如nmap,后来放弃了。无奈只能在程序里面调用nmap来进行端口扫描。真是无奈的选择……但是经过测试,这段多进程 多线程的的模式用来扫5000以内的端口量那速度是又快又准。大概1-3秒就可以出结果。配置是4进程 20000线程。

代码语言:javascript复制
import socket
import concurrent.futures

# 存活探测
def alive_scan(ip):
    port_list = [22, 23, 53, 80, 443, 3389, 10022]
    port_len = len(port_list)
    num = 0
    for port in port_list:
        try:
            sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            sock.settimeout(1)
            sock.connect((ip, port))
            sock.close()
            return True
        except socket.error:
            num  = 1
            if num == port_len:
                return False
            else:
                continue

# 全端口探测
# 利用socket tcp进行探测端口存活
def all_port_scan(ip,port,live_port):
    try:
        sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        sock_https = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        sock.connect((ip, port))
        sock_https.connect((ip,port))
        sock.close()
        live_port.append(port)
        return live_port
    except socket.error as error:
        # print(error)
        # print(port)
        return 1

# 线程调度模块
def threadMode(ip):
    thread_list = []
    live_port = []
    num = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=20000) as executor:
        print("目标:{},端口探测中".format(ip))
		# 这里选择探测端口的范围,这里也可以改成列表形式,自定义扫描端口。当然也可以改成(1,65536)进行全端口探测,只是发包太大,结果失真。
        for port in range(1,6000):
            # num =1
            # if num in [5000,10000,15000,20000,25000,30000,35000,40000,45000,50000,55000,60000]:
            #     time.sleep(3)
            thread_task = executor.submit(all_port_scan,ip,port,live_port)
            thread_list.append(thread_task)
        for res in concurrent.futures.as_completed(thread_list):
            result = res.result()
            if result != 0 and result != 1:
                # 返回存活端口列表
                return result
            else:
                return 1

# 进程,线程之间的调度逻辑中心模块
def power_control_mode(ip_list):
    process_list = []
	# 这里通过concurrent.futures创建四个进程,每个进程在分配到各自需要探测的IP后后会传入多线程模块,启动多线程~
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as process_executor:
        for ip in ip_list:
            process_task = process_executor.submit(threadMode,ip)
            process_list.append(process_task)
        for res in concurrent.futures.as_completed(process_list):
            port_live = res.result()
            if port_live != 1:
                print("存活端口列表")
                print(port_live)
            else:
                pass


if __name__ == "__main__":
    iplist = ["1.1.1.1"]
    live_ip = []
    for i in iplist:
        status = alive_scan(i)
        print("目标:{}:{}".format(i,status))
        live_ip.append(i)
        if status:
            power_control_mode(live_ip)

OVER~

0 人点赞