Implementing a Coroutine-Based Scanner in Python

After a month away, I'm back.

It's HW season lately — those who know, know. In my spare time I used Python coroutines to build a high-speed scanner for probing assets for sensitive files. It can of course scan for anything else, depending on how you modify it. The scanner is just a plugin for a crawler framework I wrote myself, and the code below is shared for reference and learning only.

```python
import os
import time
import asyncio
import aiohttp

LOG_PRINT_FLAG = 0  # whether to print scan progress to the console

async def fetch(session, url):
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Connection": "keep-alive",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36",
    }
    try:
        # A HEAD request is enough to learn the status code without downloading the body
        async with session.head(url, headers=headers, timeout=30) as res:
            return [url, res.status]
    except aiohttp.client_exceptions.ClientConnectorError:
        # print('Connection failed: {}'.format(url))
        pass
    except asyncio.exceptions.TimeoutError:
        # print('Request timed out: {}'.format(url))
        pass
    except Exception as e:
        print(e)

async def check_alive(urls):
    # Check whether the targets are reachable; keep only those that returned a response
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
        return [i[0] for i in list(filter(None, results))]  # drop the None entries

async def scan(urls, callback=None):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        if callback:
            for task in tasks:
                task.add_done_callback(callback)
        results = await asyncio.gather(*tasks)
        return [i for i in list(filter(None, results))]

def logger_print(url):
    # Console progress callback; it receives the finished Task, whose result is [url, status]
    if LOG_PRINT_FLAG:
        print('Scanning:', url.result())

def logger_write(log_dir, log_content):
    # Write the scan results to a timestamped log file
    log_filename = str(int(time.time())) + '.log'
    with open(os.path.join(log_dir, log_filename), 'w+') as f:
        f.write(log_content)

def get_urls_li(csv_path):
    # Read the list of target URLs from the csv (first column, header row skipped)
    urls_li = []
    with open(csv_path, 'r') as csv:
        for line in csv.readlines()[1:]:
            url = line.split(';')[0]
            urls_li.append(url)
    return urls_li

def get_dics_li(dic_path):
    # Load the wordlist
    with open(dic_path, 'r') as dic:
        return [x.strip('\n') for x in dic.readlines()]

async def main(csv_path, dic_path, log_dir):
    # Three stages: check which targets are alive, scan them in bulk, then write the log
    # Liveness check
    print('Checking whether the targets are alive...')
    urls = []
    urls_li = get_urls_li(csv_path)
    dics_li = get_dics_li(dic_path)
    urls_alive = await check_alive(urls_li)
    print('Found {} live targets'.format(len(urls_alive)))

    # Scan
    for url in urls_alive:
        for dic in dics_li:
            urls.append(os.path.join(url, dic))
    start = time.time()
    urls_scan = await scan(urls, logger_print)
    print('Scan finished in %.2f seconds' % (time.time() - start))

    # Write the log
    log_content = ''
    for url in urls_scan:
        log_content += '{0} {1}\n'.format(url[0], url[1])
    logger_write(log_dir, log_content)

def run(csv_path, dic_path, log_dir):
    # Entry point
    asyncio.run(main(csv_path, dic_path, log_dir))


run('./result/fofa.csv', dic_path='./dic/bak.txt', log_dir='./log/')
```
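
As a side note, `get_urls_li` skips the header row and takes everything before the first semicolon as the target URL, so the exported `./result/fofa.csv` is assumed to be semicolon-delimited with the URL in the first column — roughly like this made-up example (only the first column matters):

```
host;title;ip
http://example.com;Example;93.184.216.34
https://example.org:8080;Another;93.184.216.35
```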

The speed is decent: close to 2,000 requests took a little over 8 seconds, roughly 250 requests per second. Bolting multiprocessing on top would multiply that several times over, but I was too lazy to add it — maybe later when I have time.
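
For what it's worth, a minimal sketch of that multiprocessing variant might look like the snippet below. It assumes the `scan` coroutine and `logger_print` callback from the script above are importable at module level, and that the top-level `run(...)` call is moved under an `if __name__ == '__main__':` guard so worker processes don't re-execute it; the worker count of 4 is an arbitrary choice.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def scan_chunk(urls_chunk):
    # Each worker process runs its own event loop over its slice of the URL list
    return asyncio.run(scan(urls_chunk, logger_print))

def scan_multiprocess(urls, workers=4):
    # Split the URLs into one chunk per worker and merge the results afterwards
    chunks = [urls[i::workers] for i in range(workers)]
    results = []
    with ProcessPoolExecutor(max_workers=workers) as pool:
        for chunk_result in pool.map(scan_chunk, chunks):
            results.extend(chunk_result)
    return results
```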

(Screenshot: scan performance)

The first version I wrote had a bug: the scan silently skipped a lot of URLs. I hunted for it for several days without figuring out why — probably something wrong in my own code. Later I read 《【并发编程】深入异步IO框架:asyncio 中篇》, rewrote the scanner from scratch, and the whole design became much clearer. I recommend reading all three parts of that series; it's well written — honestly, clearer than Liao Xuefeng's tutorial :)

I also came across a post by a guy in Taiwan that genuinely made me laugh out loud — have a read yourself: aiohttp 的 connection pool. Medium is blocked in mainland China, so you'll need a proxy to open it.
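
The gist of that post, for anyone who can't open it: aiohttp's `ClientSession` sits on a connection pool (`TCPConnector`) that by default allows at most 100 concurrent connections, so a scan like this gets silently throttled once the task list grows. A rough sketch of a `scan` variant that widens the pool — the limits of 200 and 20 are arbitrary numbers, and `fetch` is the coroutine from the script above:

```python
import asyncio
import aiohttp

async def scan_with_bigger_pool(urls, callback=None):
    # The default TCPConnector caps a session at 100 concurrent connections;
    # limit raises that cap (limit=0 removes it entirely) and limit_per_host
    # bounds how many of them may hit a single target at once.
    connector = aiohttp.TCPConnector(limit=200, limit_per_host=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls]
        if callback:
            for task in tasks:
                task.add_done_callback(callback)
        results = await asyncio.gather(*tasks)
        return list(filter(None, results))
```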

bye~

