子域名扫描原理与subDomainsBrute源码解析

为什么要扫描子域名

有时候在渗透前,只给了一个主域名地址甚至只给了一家公司|机构的名字,如baidu.com,主域名的安全防护措施一般做的比较好,并且单从这一个主域名我们很难发现网站的漏洞。而子域名也属于目标的资产之一,为了扩大攻击面,我们可以对子域名上的网站进行渗透测试,从而挖掘出漏洞,拿下目标。

子域名扫描原理

不知道多少小伙伴跟我一样,从一开始就对子域名扫描原理存在误解。一开始学会目录扫描之后,就以为子域名扫描和目录扫描的原理是一样的。也是对目标轮询来得到结果。后来才发现其实不然,最简单的子域名扫描只要轮询DNS服务器就可以得到结果,而无需直接请求目标服务器。

那么先来说一下轮询DNS服务器获取子域名的具体步骤。

这里使用了aiodns这个库作为演示,这个库支持协程异步查询,所以查询速度会比较快。下面是一个aiodns查询A记录的例子,非常简洁明了,看过前面协程文章的就应该看得懂,首先创建一个事件流和一个dns解析器,然后将查询函数加入到事件流中,等到完成整个dns查询后再取回结果。

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio
import aiodns

loop = asyncio.get_event_loop()
resolver = aiodns.DNSResolver(loop=loop)

async def query(name, query_type):
return await resolver.query(name, query_type)

coro = query('baidu.com', 'A')
result = loop.run_until_complete(coro)
print(result)

运行结果:

1
[<ares_query_a_result> host=220.181.38.148, ttl=0, <ares_query_a_result> host=39.156.69.79, ttl=0]

subDomainBrute源码分析

在说到子域名扫描工具时就不得不提到一位国人写的工具,虽然获取子域名的方式比较单一,即轮询DNS服务器判断子域名是否存在,并没有采用搜索引擎分析,也没有采用分析证书域得到子域名,但也不失为值得学习的好工具。

作者为Lijiejie,目前我分析的工具版本是1.4版本,也就是当前的最新版

我们先来看一下subDomainsBrute这个工具的文件目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
.gitignore
│ README.md
│ screenshot.png
│ subDomainsBrute.py

├─dict
│ dns_servers.txt
│ next_sub.txt
│ next_sub_full.txt
│ subnames.txt
│ subnames_all_5_letters.txt
│ subnames_full.txt

└─lib
cmdline.py
common.py
common_py2.py
common_py3.py
consle_width.py
scanner_py2.py
scanner_py3.py
__init__.py

可以很清晰看到,主文件只有一个subDomainsBrute.py,dict文件夹存放的是字典文件,lib文件夹存放依赖文件。而我们使用的是Python3,所以只需分析主文件subDomainsBrute.py和lib中的cmdline.py、common.py、common_py3.py、consle_width.py和scanner_py3.py即可,Python2版本的文件也大同小异。

入口

首先我们从主文件subDomainsBrute.py入手。这个文件主要是做一些整体的函数调用。

如在主线程检测用户的中断:

1
2
3
4
5
def run_process(*params):
signal.signal(signal.SIGINT, user_abort) # 检测来自键盘的中断 Ctrl+C
s = SubNameBrute(*params)
s.run()

定义一些初始化变量:

1
2
3
4
5
6
7
8
9
10
# 加载可用的dns服务器
dns_servers = load_dns_servers()
# 加载子域名
next_subs = load_next_sub(options)
# 初始化扫描数
scan_count = multiprocessing.Value('i', 0)
# 初始化发现数
found_count = multiprocessing.Value('i', 0)
# 初始化进程队列长度,就是进程数
queue_size_array = multiprocessing.Array('i', options.process)

多进程实现程序并发扫描:

1
2
3
4
5
6
7
for process_num in range(options.process):
p = multiprocessing.Process(target=run_process,
args=(domain, options, process_num, dns_servers, next_subs,
scan_count, found_count, queue_size_array, tmp_dir)
)
all_process.append(p)
p.start()

以及扫描结果的输出与写入:

1
2
3
4
5
6
7
8
with open(out_file_name, 'w') as f:
for _file in glob.glob(tmp_dir + '/*.txt'):
with open(_file, 'r') as tmp_f:
for domain in tmp_f:
if domain not in all_domains:
domain_count += 1
all_domains.add(domain) # cname query can result in duplicated domains
f.write(domain)

公共函数

接下来我们分析一下公共函数模块common_py3.py中的函数

test_server_python3这个函数首先用正确的域名去测试一台DNS能不能发出正确的响应,其次用一个错误的域名去测试DNS,看能不能得到响应,如果不存在的域名也能被解析,则说明从这台DNS服务器得到的数据也是不可靠的。这点细节做的比较好,有些扫描器只检测DNS服务器能不能给出正确的响应,而不进行错误检测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
async def test_server_python3(server, dns_servers):
resolver = aiodns.DNSResolver()
try: # 测试dns服务器是否能得到正确的响应
resolver.nameservers = [server]
answers = await resolver.query('public-dns-a.baidu.com', 'A') # an existed domain
if answers[0].host != '180.76.76.76':
raise Exception('Incorrect DNS response')
try: # 测试服务器对错误域名的解析能力,如果不存在的域名也能解析成功,这样的DNS服务器不可靠,是无法使用的
await resolver.query('test.bad.dns.lijiejie.com', 'A') # non-existed domain
with open('bad_dns_servers.txt', 'a') as f:
f.write(server + '\n')
print_msg('[+] Bad DNS Server found %s' % server)
except Exception as e:
# 两个测试都通过就加入到可用dns服务器列表
dns_servers.append(server)
print_msg('[+] Server %s < OK > Found %s' % (server.ljust(16), len(dns_servers)))
except Exception as e:
print_msg('[+] Server %s <Fail> Found %s' % (server.ljust(16), len(dns_servers)))

加载dns服务器和字典文件就不说了,太简单了,直接读文件就可以

接下来是async_wildcard_test这个函数,用于泛解析域名的检测。我们知道,如果一个域名为泛解析域名,那么不管我们查询任何的子域名,都会被成功解析并正确响应。这里的识别办法是,通过查询一个几乎不可能存在的子域名lijiejie-not-existed-test,如果得到了响应,那就说明这个域名是做了泛解析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 异步泛解析检查,有的域名能解析任意域名
async def async_wildcard_test(domain, dns_servers, level=1):
try:
r = aiodns.DNSResolver()
r.nameservers = dns_servers
answers = await r.query('lijiejie-not-existed-test.%s' % domain, 'A')
ips = ', '.join(sorted([answer.host for answer in answers]))
if level == 1:
print('any-sub.%s\t%s' % (domain.ljust(30), ips))
await async_wildcard_test('any-sub.%s' % domain, dns_servers, 2)
elif level == 2:
print('\nUse -w to enable force scan wildcard domain')
sys.exit(0)
except Exception as e:
return domain

核心模块——扫描

接下来就是对这个项目中最核心的部分——扫描部分scanner_py3.py进行源码分析

整个文件其实就一个类SubNameBrute,里面有两个函数分别是load_sub_names()scan(),结构很清晰,前者从字典中加载子域名,后者负责轮询扫描

看得出来,先从子域名字典中读取子域名添加进变量normal_lines,然后对全量暴力破解(如无脑五位a-z而不是根据常用关键字如www、vpn)添加进变量wildcard_lines,当然中间还有一些去重之类的操作就不细说了。最后是手动为每个进程的队列分配的任务量,感觉其实duck不必,直接用进程池就可以搞定了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
async def load_sub_names(self):
normal_lines = []
wildcard_lines = []
wildcard_set = set()
regex_list = []
lines = set()
with open(self.options.file) as inFile:
for line in inFile.readlines():
sub = line.strip()
if not sub or sub in lines:
continue
lines.add(sub)

brace_count = sub.count('{')
if brace_count > 0:
wildcard_lines.append((brace_count, sub))
sub = sub.replace('{alphnum}', '[a-z0-9]')
sub = sub.replace('{alpha}', '[a-z]')
sub = sub.replace('{num}', '[0-9]')
if sub not in wildcard_set:
wildcard_set.add(sub)
regex_list.append('^' + sub + '$')
else:
normal_lines.append(sub)
self.normal_names_set.add(sub)

if regex_list:
pattern = '|'.join(regex_list)
_regex = re.compile(pattern)
for line in normal_lines:
if _regex.search(line):
normal_lines.remove(line)

for _ in normal_lines[self.process_num::self.options.process]:
await self.queue.put((0, _)) # priority set to 0
for _ in wildcard_lines[self.process_num::self.options.process]:
await self.queue.put(_)

最后就是scan()函数的分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def scan(self, j):
self.resolvers[j].nameservers = [self.dns_servers[j % self.dns_count]]
if self.dns_count > 1:
while True:
s = random.choice(self.resolvers)
if s != self.dns_servers[j % self.dns_count]:
self.resolvers[j].nameservers.append(s)
break
while True:
try:
if time.time() - self.count_time > 1.0:
async with self.lock:
self.scan_count.value += self.scan_count_local
self.scan_count_local = 0
self.queue_size_array[self.process_num] = self.queue.qsize()
if self.found_count_local:
self.found_count.value += self.found_count_local
self.found_count_local = 0
self.count_time = time.time()

try:
brace_count, sub = self.queue.get_nowait()
self.threads_status[j] = '1'
except asyncio.queues.QueueEmpty as e:
self.threads_status[j] = '0'
await asyncio.sleep(0.5)
if '1' not in self.threads_status:
break
else:
continue

if brace_count > 0:
brace_count -= 1
if sub.find('{next_sub}') >= 0:
for _ in self.next_subs:
await self.queue.put((0, sub.replace('{next_sub}', _)))
if sub.find('{alphnum}') >= 0:
for _ in 'abcdefghijklmnopqrstuvwxyz0123456789':
await self.queue.put((brace_count, sub.replace('{alphnum}', _, 1)))
elif sub.find('{alpha}') >= 0:
for _ in 'abcdefghijklmnopqrstuvwxyz':
await self.queue.put((brace_count, sub.replace('{alpha}', _, 1)))
elif sub.find('{num}') >= 0:
for _ in '0123456789':
await self.queue.put((brace_count, sub.replace('{num}', _, 1)))
continue
except Exception as e:
import traceback
print(traceback.format_exc())
break

try:

if sub in self.found_subs:
continue

self.scan_count_local += 1
cur_domain = sub + '.' + self.domain
# print('Query %s' % cur_domain)
answers = await self.resolvers[j].query(cur_domain, 'A')

if answers:
self.found_subs.add(sub)
ips = ', '.join(sorted([answer.host for answer in answers]))
if ips in ['1.1.1.1', '127.0.0.1', '0.0.0.0', '0.0.0.1']:
continue
if self.options.i and is_intranet(answers[0].host):
continue

try:
self.scan_count_local += 1
answers = await self.resolvers[j].query(cur_domain, 'CNAME')
cname = answers[0].target.to_unicode().rstrip('.')
if cname.endswith(self.domain) and cname not in self.found_subs:
cname_sub = cname[:len(cname) - len(self.domain) - 1] # new sub
if cname_sub not in self.normal_names_set:
self.found_subs.add(cname)
await self.queue.put((0, cname_sub))
except Exception as e:
pass

first_level_sub = sub.split('.')[-1]
max_found = 20

if self.options.w:
first_level_sub = ''
max_found = 3

if (first_level_sub, ips) not in self.ip_dict:
self.ip_dict[(first_level_sub, ips)] = 1
else:
self.ip_dict[(first_level_sub, ips)] += 1
if self.ip_dict[(first_level_sub, ips)] > max_found:
continue

self.found_count_local += 1

self.outfile.write(cur_domain.ljust(30) + '\t' + ips + '\n')
self.outfile.flush()
try:
self.scan_count_local += 1
await self.resolvers[j].query('lijiejie-test-not-existed.' + cur_domain, 'A')
except aiodns.error.DNSError as e:
if e.args[0] in [4]:
if self.queue.qsize() < 50000:
for _ in self.next_subs:
await self.queue.put((0, _ + '.' + sub))
else:
await self.queue.put((1, '{next_sub}.' + sub))
except Exception as e:
pass

except aiodns.error.DNSError as e:
if e.args[0] in [1, 4]:
pass
elif e.args[0] in [11, 12]: # 12 timeout # (11, 'Could not contact DNS servers')
# print('timed out sub %s' % sub)
self.timeout_subs[sub] = self.timeout_subs.get(sub, 0) + 1
if self.timeout_subs[sub] <= 1:
await self.queue.put((0, sub)) # Retry
else:
print(e)
except asyncio.TimeoutError as e:
pass
except Exception as e:
import traceback
traceback.print_exc()
with open('errors.log', 'a') as errFile:
errFile.write('[%s] %s\n' % (type(e), str(e)))

首先是随机选择了一个DNS解析器,然后的一大段代码都在处理a五位a-z的爆破。接下来就是正式的查询步骤

1
answers = await self.resolvers[j].query(cur_domain, 'A')

通过这条语句使用aiodns查询了域名的A记录,如果有结果,再查询该域名的CNAME记录,从CNAME中反查出子域名加入到结果中。

1
answers = await self.resolvers[j].query(cur_domain, 'CNAME')

关于各种DNS查询类型,可以自己百度,这里就不做更多展开了。接下来似乎是在处理泛解析的问题,很迷惑这里怎么又写了一遍。最后就是在处理aiodns抛出的异常的代码,如DNS中没有对应的子域名,超时等问题。

总结一下subDomainBrute的整个程序流程:

graph LR
a0(加载子域名字典)-->a1(DNS可靠性检测优选)-->a2(泛解析域名检测)-->a3(DNS轮询暴破子域名)-->a4(CNAME记录反查子域名)-->a5(输出结果)

总结

通过本次源码分析,学到了:

  • 子域名扫描的步骤和流程

  • DNS服务器可靠性检测判断

  • 如何检测一个域名是否为泛解析域名

  • 可以优化DNS的筛选步骤,如可以预先对可用DNS进行测速,选择最快的DNS服务器进行查询

  • 扫描方法过于单一,只支持暴力查询,可以加入如搜索引擎查询等其他方法使扫描结果更为全面

  • 代码存在冗余,优化以后可以更为简洁明了

总的来说这是一款不错的子域名扫描器,速度快也较为精准,但仍然存在可以优化的空间。


子域名扫描原理与subDomainsBrute源码解析
https://wanf3ng.github.io/2021/02/02/子域名扫描原理与subDomainsBrute源码解析/
作者
wanf3ng
发布于
2021年2月2日
许可协议