1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| import os import time import asyncio import aiohttp
async def fetch(session, url): headers = { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.72 Safari/537.36", } try: async with session.head(url, headers=headers, timeout=30) as res: return [url, res.status] except aiohttp.client_exceptions.ClientConnectorError: pass except asyncio.exceptions.TimeoutError: pass except Exception as e: print(e)
async def check_alive(urls): async with aiohttp.ClientSession() as session: tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls] results = await asyncio.gather(*tasks) return [i[0] for i in list(filter(None, results))]
async def scan(urls, callback=None): async with aiohttp.ClientSession() as session: tasks = [asyncio.ensure_future(fetch(session, url)) for url in urls] if callback: for task in tasks: task.add_done_callback(callback) results = await asyncio.gather(*tasks) return [i for i in list(filter(None, results))]
def logger_print(url): if LOG_PRINT_FLAG: print('正在扫描:',url.result())
def logger_write(log_dir, log_content): log_filename = str(int(time.time()))+'.log' with open(os.path.join(log_dir, log_filename), 'w+') as f: f.write(log_content)
def get_urls_li(csv_path): urls_li = [] with open(csv_path, 'r') as csv: for line in csv.readlines()[1:]: url = line.split(';')[0] urls_li.append(url) return urls_li
def get_dics_li(dic_path): with open(dic_path, 'r') as dic: return [x.strip('\n') for x in dic.readlines()]
async def main(csv_path, dic_path, log_dir): print('正在检测目标IP是否存活...') urls = [] urls_li = get_urls_li(csv_path) dics_li = get_dics_li(dic_path) urls_alive = await check_alive(urls_li) print("共发现{}个存活IP".format(len(urls_alive)))
for url in urls_alive: for dic in dics_li: urls.append(os.path.join(url, dic)) start = time.time() urls_scan = await scan(urls, logger_print) print("扫描完成,共耗时: %.2f 秒" % (time.time() - start))
log_content = '' for url in urls_scan: log_content += '{0} {1}\n'.format(url[0], url[1]) logger_write(log_dir,log_content)
def run(csv_path, dic_path, log_dir): asyncio.run(main(csv_path, dic_path, log_dir))