为什么要扫描子域名 有时候在渗透前,只给了一个主域名地址甚至只给了一家公司|机构的名字,如baidu.com,主域名的安全防护措施一般做的比较好,并且单从这一个主域名我们很难发现网站的漏洞。而子域名也属于目标的资产之一,为了扩大攻击面,我们可以对子域名上的网站进行渗透测试,从而挖掘出漏洞,拿下目标。
子域名扫描原理 不知道多少小伙伴跟我一样,从一开始就对子域名扫描原理存在误解。一开始学会目录扫描之后,就以为子域名扫描和目录扫描的原理是一样的。也是对目标轮询来得到结果。后来才发现其实不然,最简单的子域名扫描只要轮询DNS服务器就可以得到结果,而无需直接请求目标服务器。
那么先来说一下轮询DNS服务器获取子域名的具体步骤。
这里使用了aiodns这个库作为演示,这个库支持协程异步查询,所以查询速度会比较快。下面是一个aiodns查询A记录的例子,非常简洁明了,看过前面协程文章的就应该看得懂,首先创建一个事件流和一个dns解析器,然后将查询函数加入到事件流中,等到完成整个dns查询后再取回结果。
1 2 3 4 5 6 7 8 9 10 11 12 import asyncioimport aiodns loop = asyncio.get_event_loop() resolver = aiodns.DNSResolver(loop=loop)async def query (name, query_type ): return await resolver.query(name, query_type) coro = query('baidu.com' , 'A' ) result = loop.run_until_complete(coro)print (result)
运行结果:
1 [<ares_query_a_result> host=220.181 .38 .148 , ttl=0 , <ares_query_a_result> host=39.156 .69 .79 , ttl=0 ]
subDomainBrute源码分析 在说到子域名扫描工具时就不得不提到一位国人写的工具,虽然获取子域名的方式比较单一,即轮询DNS服务器判断子域名是否存在,并没有采用搜索引擎分析,也没有采用分析证书域得到子域名,但也不失为值得学习的好工具。
作者为Lijiejie,目前我分析的工具版本是1.4版本,也就是当前的最新版
我们先来看一下subDomainsBrute这个工具的文件目录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 │ .gitignore │ README.md │ screenshot.png │ subDomainsBrute.py │ ├─dict │ dns_servers.txt │ next_sub.txt │ next_sub_full.txt │ subnames.txt │ subnames_all_5_letters.txt │ subnames_full.txt │ └─lib cmdline.py common.py common_py2.py common_py3.py consle_width.py scanner_py2.py scanner_py3.py __init__.py
可以很清晰看到,主文件只有一个subDomainsBrute.py,dict文件夹存放的是字典文件,lib文件夹存放依赖文件。而我们使用的是Python3,所以只需分析主文件subDomainsBrute.py和lib中的cmdline.py、common.py、common_py3.py、consle_width.py和scanner_py3.py即可,Python2版本的文件也大同小异。
入口 首先我们从主文件subDomainsBrute.py入手。这个文件主要是做一些整体的函数调用。
如在主线程检测用户的中断:
1 2 3 4 5 def run_process (*params ): signal.signal(signal.SIGINT, user_abort) s = SubNameBrute(*params) s.run()
定义一些初始化变量:
1 2 3 4 5 6 7 8 9 10 dns_servers = load_dns_servers() next_subs = load_next_sub(options) scan_count = multiprocessing.Value('i' , 0 ) found_count = multiprocessing.Value('i' , 0 ) queue_size_array = multiprocessing.Array('i' , options.process)
多进程实现程序并发扫描:
1 2 3 4 5 6 7 for process_num in range (options.process): p = multiprocessing.Process(target=run_process, args=(domain, options, process_num, dns_servers, next_subs, scan_count, found_count, queue_size_array, tmp_dir) ) all_process.append(p) p.start()
以及扫描结果的输出与写入:
1 2 3 4 5 6 7 8 with open (out_file_name, 'w' ) as f: for _file in glob.glob(tmp_dir + '/*.txt' ): with open (_file, 'r' ) as tmp_f: for domain in tmp_f: if domain not in all_domains: domain_count += 1 all_domains.add(domain) f.write(domain)
公共函数 接下来我们分析一下公共函数模块common_py3.py中的函数
test_server_python3这个函数首先用正确的域名去测试一台DNS能不能发出正确的响应,其次用一个错误的域名去测试DNS,看能不能得到响应,如果不存在的域名也能被解析,则说明从这台DNS服务器得到的数据也是不可靠的。这点细节做的比较好,有些扫描器只检测DNS服务器能不能给出正确的响应,而不进行错误检测。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 async def test_server_python3 (server, dns_servers ): resolver = aiodns.DNSResolver() try : resolver.nameservers = [server] answers = await resolver.query('public-dns-a.baidu.com' , 'A' ) if answers[0 ].host != '180.76.76.76' : raise Exception('Incorrect DNS response' ) try : await resolver.query('test.bad.dns.lijiejie.com' , 'A' ) with open ('bad_dns_servers.txt' , 'a' ) as f: f.write(server + '\n' ) print_msg('[+] Bad DNS Server found %s' % server) except Exception as e: dns_servers.append(server) print_msg('[+] Server %s < OK > Found %s' % (server.ljust(16 ), len (dns_servers))) except Exception as e: print_msg('[+] Server %s <Fail> Found %s' % (server.ljust(16 ), len (dns_servers)))
加载dns服务器和字典文件就不说了,太简单了,直接读文件就可以
接下来是async_wildcard_test这个函数,用于泛解析域名的检测。我们知道,如果一个域名为泛解析域名,那么不管我们查询任何的子域名,都会被成功解析并正确响应。这里的识别办法是,通过查询一个几乎不可能存在的子域名lijiejie-not-existed-test,如果得到了响应,那就说明这个域名是做了泛解析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 async def async_wildcard_test (domain, dns_servers, level=1 ): try : r = aiodns.DNSResolver() r.nameservers = dns_servers answers = await r.query('lijiejie-not-existed-test.%s' % domain, 'A' ) ips = ', ' .join(sorted ([answer.host for answer in answers])) if level == 1 : print ('any-sub.%s\t%s' % (domain.ljust(30 ), ips)) await async_wildcard_test('any-sub.%s' % domain, dns_servers, 2 ) elif level == 2 : print ('\nUse -w to enable force scan wildcard domain' ) sys.exit(0 ) except Exception as e: return domain
核心模块——扫描 接下来就是对这个项目中最核心的部分——扫描部分scanner_py3.py进行源码分析
整个文件其实就一个类SubNameBrute,里面有两个函数分别是load_sub_names()和scan(),结构很清晰,前者从字典中加载子域名,后者负责轮询扫描
看得出来,先从子域名字典中读取子域名添加进变量normal_lines,然后对全量暴力破解(如无脑五位a-z而不是根据常用关键字如www、vpn)添加进变量wildcard_lines,当然中间还有一些去重之类的操作就不细说了。最后是手动为每个进程的队列分配的任务量,感觉其实duck不必,直接用进程池就可以搞定了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 async def load_sub_names (self ): normal_lines = [] wildcard_lines = [] wildcard_set = set () regex_list = [] lines = set () with open (self.options.file) as inFile: for line in inFile.readlines(): sub = line.strip() if not sub or sub in lines: continue lines.add(sub) brace_count = sub.count('{' ) if brace_count > 0 : wildcard_lines.append((brace_count, sub)) sub = sub.replace('{alphnum}' , '[a-z0-9]' ) sub = sub.replace('{alpha}' , '[a-z]' ) sub = sub.replace('{num}' , '[0-9]' ) if sub not in wildcard_set: wildcard_set.add(sub) regex_list.append('^' + sub + '$' ) else : normal_lines.append(sub) self.normal_names_set.add(sub) if regex_list: pattern = '|' .join(regex_list) _regex = re.compile (pattern) for line in normal_lines: if _regex.search(line): normal_lines.remove(line) for _ in normal_lines[self.process_num::self.options.process]: await self.queue.put((0 , _)) for _ in wildcard_lines[self.process_num::self.options.process]: await self.queue.put(_)
最后就是scan()函数的分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 async def scan (self, j ): self.resolvers[j].nameservers = [self.dns_servers[j % self.dns_count]] if self.dns_count > 1 : while True : s = random.choice(self.resolvers) if s != self.dns_servers[j % self.dns_count]: self.resolvers[j].nameservers.append(s) break while True : try : if time.time() - self.count_time > 1.0 : async with self.lock: self.scan_count.value += self.scan_count_local self.scan_count_local = 0 self.queue_size_array[self.process_num] = self.queue.qsize() if self.found_count_local: self.found_count.value += self.found_count_local self.found_count_local = 0 self.count_time = time.time() try : brace_count, sub = self.queue.get_nowait() self.threads_status[j] = '1' except asyncio.queues.QueueEmpty as e: self.threads_status[j] = '0' await asyncio.sleep(0.5 ) if '1' not in self.threads_status: break else : continue if brace_count > 0 : brace_count -= 1 if sub.find('{next_sub}' ) >= 0 : for _ in self.next_subs: await self.queue.put((0 , sub.replace('{next_sub}' , _))) if sub.find('{alphnum}' ) >= 0 : for _ in 'abcdefghijklmnopqrstuvwxyz0123456789' : await self.queue.put((brace_count, sub.replace('{alphnum}' , _, 1 ))) elif sub.find('{alpha}' ) >= 0 : for _ in 'abcdefghijklmnopqrstuvwxyz' : await self.queue.put((brace_count, sub.replace('{alpha}' , _, 1 ))) elif sub.find('{num}' ) >= 0 : for _ in '0123456789' : await self.queue.put((brace_count, sub.replace('{num}' , _, 1 ))) continue except Exception as e: import traceback print (traceback.format_exc()) break try : if sub in self.found_subs: continue self.scan_count_local += 1 cur_domain = sub + '.' + self.domain answers = await self.resolvers[j].query(cur_domain, 'A' ) if answers: self.found_subs.add(sub) ips = ', ' .join(sorted ([answer.host for answer in answers])) if ips in ['1.1.1.1' , '127.0.0.1' , '0.0.0.0' , '0.0.0.1' ]: continue if self.options.i and is_intranet(answers[0 ].host): continue try : self.scan_count_local += 1 answers = await self.resolvers[j].query(cur_domain, 'CNAME' ) cname = answers[0 ].target.to_unicode().rstrip('.' ) if cname.endswith(self.domain) and cname not in self.found_subs: cname_sub = cname[:len (cname) - len (self.domain) - 1 ] if cname_sub not in self.normal_names_set: self.found_subs.add(cname) await self.queue.put((0 , cname_sub)) except Exception as e: pass first_level_sub = sub.split('.' )[-1 ] max_found = 20 if self.options.w: first_level_sub = '' max_found = 3 if (first_level_sub, ips) not in self.ip_dict: self.ip_dict[(first_level_sub, ips)] = 1 else : self.ip_dict[(first_level_sub, ips)] += 1 if self.ip_dict[(first_level_sub, ips)] > max_found: continue self.found_count_local += 1 self.outfile.write(cur_domain.ljust(30 ) + '\t' + ips + '\n' ) self.outfile.flush() try : self.scan_count_local += 1 await self.resolvers[j].query('lijiejie-test-not-existed.' + cur_domain, 'A' ) except aiodns.error.DNSError as e: if e.args[0 ] in [4 ]: if self.queue.qsize() < 50000 : for _ in self.next_subs: await self.queue.put((0 , _ + '.' + sub)) else : await self.queue.put((1 , '{next_sub}.' + sub)) except Exception as e: pass except aiodns.error.DNSError as e: if e.args[0 ] in [1 , 4 ]: pass elif e.args[0 ] in [11 , 12 ]: self.timeout_subs[sub] = self.timeout_subs.get(sub, 0 ) + 1 if self.timeout_subs[sub] <= 1 : await self.queue.put((0 , sub)) else : print (e) except asyncio.TimeoutError as e: pass except Exception as e: import traceback traceback.print_exc() with open ('errors.log' , 'a' ) as errFile: errFile.write('[%s] %s\n' % (type (e), str (e)))
首先是随机选择了一个DNS解析器,然后的一大段代码都在处理a五位a-z的爆破。接下来就是正式的查询步骤
1 answers = await self.resolvers[j].query(cur_domain, 'A' )
通过这条语句使用aiodns查询了域名的A记录,如果有结果,再查询该域名的CNAME记录,从CNAME中反查出子域名加入到结果中。
1 answers = await self.resolvers[j].query(cur_domain, 'CNAME' )
关于各种DNS查询类型,可以自己百度,这里就不做更多展开了。接下来似乎是在处理泛解析的问题,很迷惑这里怎么又写了一遍。最后就是在处理aiodns抛出的异常的代码,如DNS中没有对应的子域名,超时等问题。
总结一下subDomainBrute的整个程序流程:
graph LR
a0(加载子域名字典)-->a1(DNS可靠性检测优选)-->a2(泛解析域名检测)-->a3(DNS轮询暴破子域名)-->a4(CNAME记录反查子域名)-->a5(输出结果)
总结 通过本次源码分析,学到了:
总的来说这是一款不错的子域名扫描器,速度快也较为精准,但仍然存在可以优化的空间。