Python安全工具源码分析:wydomain – 作者:c01d

工具介绍

wydomain是猪猪侠开发的一款子域名信息搜集工具,因其枚举速度快,结果准确,成为不少白帽居家旅行的必备神器。工具主要分为两个模块,dnsburte模块和wydomain模块,dnsburte模块通过用户自定义字典发送dns查询,最后筛选出符合条件的域名。而wydomain模块则是通过调用多个第三方网站的公开api获取子域名数据。

工具目录:

. # 目录由tree生成
├── captcha.py # 验证码识别
├── common.py # 发送http请求
├── config.py # 主配置文件
├── default.csv # 默认字典
├── dnsburte.py  # dnsburte模块通过用户自定义字典发送dns查询,最后筛选出符合条件的域名。
├── dnspod.csv # dnspod子域名字典
├── domains.log
├── ph_cookie.js
├── README.md
├── requirements.txt # 依赖库
├── result # 子域名枚举结果文件夹
  ├── aliyun.com
    ├── alexa.json
    ├── chaxunla.json
    ├── dnsburte.json
    ├── googlect_dnsnames.json
    ├── googlect_subject.json
    ├── ilinks.json
    ├── netcraft.json
    ├── sitedossier.json
    ├── threatcrowd.json
    └── threatminer.json
  └── weibo.com
      ├── alexa.json
      ├── chaxunla.json
      ├── dnsburte.json
      ├── googlect_dnsnames.json
      ├── googlect_subject.json
      ├── ilinks.json
      ├── netcraft.json
      ├── sitedossier.json
      ├── threatcrowd.json
      └── threatminer.json
├── tools
  ├── __init__.py
  └── skynet.py
├── upload.py
├── utils  # 第三方网站查询模块
  ├── alexa.py
  ├── chaxunla.py
  ├── fileutils.py
  ├── googlect.py
  ├── ilinks.py
  ├── __init__.py
  ├── netcraft.py
  ├── passivetotal.py
  ├── sitedossier.py
  ├── threatcrowd.py
  └── threatminer.py
├── weibo_domains.log
├── wydomain.csv # wydomain 字典
└── wydomain.py # dnsburte模块通过用户自定义字典发送dns查询,最后筛选出符合条件的域名。

5 directories, 47 files

项目地址:https://github.com/ring04h/wydomain

dnsburte模块

主运行流程

脚本直接运行 > parser_args命令行参数获取用户输入 > 传入主函数run > 根据传入的参数,生成DomainFuzzer类的实例 > 调用DomainFuzzer类实例的run方法 > 获取返回的数据写入到空列表subdomains里 > 调用save_result函数将列表中的内容写入到json文件。

在逐个拆解代码块分析之前,最好能够大致浏览一下脚本里都定义了哪些函数,哪些类,调用了哪些python标准模块,第三方模块,以及作者是否自己编写了模块。通过函数名称,类名称及作者的注释可以快速理解部分代码块的主要作用。

python标准模块:

import time
import re
import os
import sys
import json
import Queue
import random
import logging
import argparse
import threading

python第三方模块:

import dns.query
import dns.resolver
import dns.rdatatype

作者自定义模块:

from common import save_result
from utils.fileutils import FileUtils

自定义函数:

run()

自定义类:

Domain()
DomainFuzzer()
bruteWorker(threading.Thread)

全局变量:

logging.basicConfig( # logging模块基础配置文件
   level=logging.INFO,
   format='%(asctime)s [%(levelname)s] %(message)s',
)

nameservers = [  # 公共dns列表
   '119.29.29.29', '182.254.116.116',
   '8.8.8.8', '8.8.4.4',
   '180.76.76.76',
   '1.2.4.8', '210.2.4.8',
   '101.226.4.6', '218.30.118.6',
   '8.26.56.26', '8.20.247.20'
]

代码段分析

当dnsburte模块被用户直接运行时,作者通过使用argparse模块从用户终端获取命令行参数,最后将获取到的参数传入到主函数run()里。

if __name__ == '__main__':
   parser = argparse.ArgumentParser(description="wydomian v 2.0 to bruteforce subdomains of your target domain.")
   parser.add_argument("-t","--thread",metavar="",default=16, # 线程数
       help="thread count")
   parser.add_argument("-time","--timeout",metavar="",default=5, # 超时时间
       help="query timeout")
   parser.add_argument("-d","--domain",metavar="",  # 目标域名
       help="domain name")
   parser.add_argument("-f","--file",metavar="",default="wydomain.csv",  # 字典
       help="subdomains dict file name")
   parser.add_argument("-o","--out",metavar="",default="bruteforce.log", # 结果
       help="result out file")
   args = parser.parse_args() # 将命令行参数传递到run函数。
   try:
       run(args)
   except KeyboardInterrupt:
       logging.info("Ctrl C - Stopping Client")
       sys.exit(1)

调用流程:parser.parse_args() > run()

主函数run()获取到用户终端传入的命令行参数,首先判断了一下用户是否已经输入域名,如果没有输入域名则打印出帮助信息,程序退出。

def run(args):
   domain = args.domain # 域名
   thread_cnt = int(args.thread) # 线程数
   timeout = int(args.timeout) # 超时时间
   dict_file = args.file # 字典
   outfile = args.out # 枚举结果

   if not domain:
       print('usage: dnsburte.py -d aliyun.com') # 判断域名是否已经输入
       sys.exit(1)

该工具正常运行时,子域名枚举的结果存放在result/domain.com文件夹下。脚本首先获取当前文件运行的绝对路径,然后通过域名名称将文件夹创建到result目录下,如果文件夹不存在则直接新建一个文件夹。

    # 初始化缓存路径
   script_path = os.path.dirname(os.path.abspath(__file__)) # 当前文件运行的绝对路径
   _cache_path = os.path.join(script_path, 'result/{0}'.format(domain)) # 缓存的路径
   if not os.path.exists(_cache_path): # 新建文件夹
       os.makedirs(_cache_path, 0777) # 权限

初始化缓存路径以后,作者又新建一个空列表用于存放枚举出的子域名,然后将域名,字典传入到DomainFuzzer()类中,生成一个类的实例。调用实例的run()方法获取子域名列表,通过循环读取出子域名,最后存放到列表里。

调用流程:DomainFuzzer() > 类实例dnsfuzz > dnsfuzz.run() > subdomains_list

    subdomains = [] # 空列表
dnsfuzz = DomainFuzzer(target=domain, dict_file=dict_file, timeout=timeout) # 生成DomainFuzzer的实例

logging.info("starting bruteforce threading({0}) : {1}".format(thread_cnt, domain))
for subname in dnsfuzz.run(thread_cnt=thread_cnt): # 调用实例的run方法
  subdomains.append(subname) # 将循环读取出的子域存放到空列表里

_cache_path是域名枚举结果的文件夹,在上上个代码段,作者只是新建了文件夹用于存放结果,并没有将域名枚举的结果存放到文件夹里,os.path.join()将存放的文件夹与dnsfuzz.run() 获取到域名枚举的结果拼接成一个新的文件路径,最后调用作者自定义的函数save_result()将结果写入到指定文件。而outfile_file()则是用户从命令行传递过来指定保存结果的路径。

    _cache_file = os.path.join(_cache_path, 'dnsburte.json' ) # _cache_path是域名的文件夹,dnsburte.json是写入文件的名称
save_result(_cache_file, subdomains)

script_path = os.path.dirname(os.path.abspath(__file__)) # 当前文件运行的绝对路径
_outfile_file = os.path.join(script_path, outfile) # 拼接最后生成的文件路径
save_result(_outfile_file, subdomains)    

logging.info("dns bruteforce subdomains({0}) successfully...".format(len(subdomains)))
logging.info("result save in : {0}".format(_outfile_file))

主函数分析完毕以后,将目光聚焦到dnsfuzz.run() ,这是DomainFuzzer()类实例调用的方法,作者通过这个方法获取到了子域名枚举的结果,我们现在跟进这个类,找到这个类方法进行分析就能知道作者是通过什么样的方式枚举子域名。

DomainFuzzer类的初始化方法里定义了根域名,子域名爆破所用到的字典,及Domain(),Domain()类可以先不用着急去跟进分析,先通过变量名简单推断一下这个类究竟是用来干什么的。domain肯定是和域名处理有关,resolver是分解器,组合到一起就是域名分解器……(要不要这么机智 = = )。接下来继续看run()方法,run()有一个默认参数线程数量为16,类方法里定一了两个兄宝胎队列,长得一摸一样,不过两兄弟一个队列是用于存放读取字典的队列。而另一个队列则是域名爆破结束以后写入结果的队列。光有空队列不行哇,所以得通过for循环从字典中将子域名字符串提取出来,并且使用join()方法将子域名拼接完成,最后调用队列的put()方法将域名直接put到队列里。

class DomainFuzzer(object):
   """docstring for DomainFuzzer with brute force"""
   def __init__(self, target, dict_file='domain.csv', timeout=5):
       self.target = target # 根域名
       self.dict = FileUtils.getLines(dict_file) # 子域名爆破字典
       self.resolver = Domain(timeout=timeout) # 将超时时间传入到domain类中

   def run(self, thread_cnt=16):
       iqueue, oqueue = Queue.Queue(), Queue.Queue() # 两个队列iqueue是读取字典的队列,oqueue是写入结果的队列

       for line in self.dict:
           iqueue.put('.'.join([str(line),str(self.target)])) # 从字典里提取出子域拼接域名。
           # 将拼接好的域名直接put到队列里

到了这里,作者从字典读取子域名并拼接成功最后put到队列里,队列里有数据了,在后续执行dns查询的时候,只需要从队列里get数据就可以发送请求了哇~

作者又定义了一个空列表用于存放线程,而Domain.extensive()传入了目标的域名,并且返回到变量extensive,从函数的字面意思来看,这里是和泛解析有关的处理。

在run()函数的初始化中,作者定义了线程的数量为16,有线程数,但是并没有生成相应的线程。于是使用for循环生成相应的线程数并添加到线程空列表里,现在唯一值得困惑的就是extensive,不清楚extensive变量的具体类型是什么,是用于干什么的。现在不用过于纠结。也不用跟进分析。知道有这些疑问,自己做个简单的笔记就继续往下看。bruteWorker(),它是一个类。从类的名称来看,这个类的主要作用是用于枚举工作的类。bruteWorker()里传入进去作者定义的两个队列。

现在,threads线程列表里有相应的线程。直接使用for循环依次启动线程。最后两行代码作者判断了一下oqueue写入结果的队列是否不为空,如果不为空则直接get获取队列里的数据

        extensive, threads = self.resolver.extensive(self.target), [] # 定义了两个变量一个extensive 用于存放泛解析,另一个变量则是线程空列表
    
        for i in xrange(thread_cnt):
            '''根据线程数生成相应的线程并添加到线程列表里'''
            threads.append(self.bruteWorker(self, iqueue, oqueue, extensive)) # 将bruteWorker传入队列

        for t in threads: t.start() # 启动线程
        for t in threads: t.join()

        while not oqueue.empty():# 判断队列是否不为空
            yield oqueue.get()

调用流程:dnsfuzz.run() > DomainFuzzer().run() > bruteWorker()

bruteWorker()不是一个单独的类,而是在DomainFuzzer()类里的嵌套类。

该类继承于threading.Thread多线程类,类的初始化方法中定义了DomainFuzzer()类的实例及读取字典与写入结果的队列,extensive的默认参数为空列表。

class bruteWorker(threading.Thread):
    '''多线程模块'''
    """
    domain name brute force threading worker class
    @param dfuzzer      DomainFuzzer base class
    @param iqueue       Subdomain dict Queue()
    @param oqueue       Brutefoce result Queue()
    @param extensive    Doman extensive record sets
    """
    def __init__(self, dfuzzer, iqueue, oqueue, extensive=[]):
        threading.Thread.__init__(self)
        self.queue = iqueue  # 读取队列
        self.output = oqueue  # 写入队列
        self.dfuzzer = dfuzzer
        self.extensive = extensive

run()方法首先判断读取字典的队列是否不为空,如果不为空则从队列里获取数据,其次判断extensive的结果是否存在,如果不存在则直接调用DomainFuzzer()类实例的brute()方法。如果存在仍然调用DomainFuzzer()类实例的brute()方法,只是传入了一个ret的参数为True。从这里可以分析出DomainFuzzer()类实例的brute方法是根据extensive的状态来进行爆破。并且会将爆破的结果返回到rrset参数中。如果rrset参数的值不为空,则通过for循环将rrset中的数据遍历出来,最后将结果put到写入结果的队列。

    def run(self):
        try:
            while not self.queue.empty():
                sub = self.queue.get_nowait() # 从队列里获取数据
                if len(self.extensive) == 0: # not_extensive
                    if self.dfuzzer.resolver.brute(sub):
                        self.output.put(sub)
                else:
                    rrset = self.dfuzzer.resolver.brute(sub, ret=True)
                    if rrset is not None:
                        for answer in rrset['A']:
                            if answer not in self.extensive:
                                self.output.put(sub) # 写入结果
        except Exception, e:
            pass

调用流程:bruteWorker() > bruteWorker().run() > Domain.brute()

到目前为止,只剩下Domain.extensive(),和Domain.brute()方法还没有跟进分析。不过没关系,想要的结果都会在Domain()类中找到答案。

Domain()类的初始化方法中定义了一个recursion空字典及dns.resolver.Resolver()dns查询类。

class Domain(object):
   """docstring for Domain base class"""
   def __init__(self, timeout=5):
       self.recursion = {}
       self.resolver = dns.resolver.Resolver()
       if timeout:
           self.resolver.timeout = timeout
           self.resolver.lifetime = timeout

Domain()类的brute方法根据ret的状态执行不同的查询,ret条件为True时,使用原始dns函数resolver.query()进行查询,并且函数直接返回True,为False时则使用Domain.query()进行查询,最后将dns查询以后的结果返回。作者在这里使用异常处理,用于解决在执行dns查询中会遇到的一些error,不过大多都是捕获以后直接返回False,说句实话我现在也让这两个query()函数搞晕了,不过先静下心慢慢分析,把存有疑问的地方记录到纸上,继续往下分析代码。

    def brute(self, target, ret=False):
  """
  domain brute force fuzz
  @param target           burte force, domain name arg
  @param ret             return result flag
  """
  try:
      if not ret: # return_flag set false, using dns original query func
          if self.resolver.query(target, 'A'):
              return True
      else: # return_flag set true
          return self.query(target, 'A')
  except dns.resolver.NoAnswer:
      return False # catch the except, nothing to do
  except dns.resolver.NXDOMAIN:
      return False # catch the except, nothing to do
  except dns.resolver.Timeout:
      return self.burte(target) # timeout retry
  except Exception, e:
      logging.info(str(e))
      return False

要解决掉上一个代码段所存在的疑问,所以现在跟进Domain()类 > query()方法进行分析。

query()方法定义了两个变量,一个是目标地址,另一个则是rdtype,rdtype一开始不理解具体是用来干什么的可以不用过于纠结,只需要知道传入的是一个类型的变量。当函数获取到用户传递过来两个参数,调用random随机取公共dns列表里的ip地址对target进行查询。作者将执行dns查询的以后的answer对象传递给了Domain()类 > parser()方法。

    def query(self, target, rdtype):
  try:
      self.resolver.nameservers = [random.choice(nameservers),random.choice(nameservers)]
      answer = self.resolver.query(target, rdtype)
      return self.parser(answer)
  except dns.resolver.NoAnswer:
      return None # catch the except, nothing to do
  except dns.resolver.NXDOMAIN:
      return None # catch the except, nothing to do
  except dns.resolver.Timeout:
      # timeout retry
      print(target, rdtype, '<timeout>')
  except Exception, e:
      logging.info(str(e))

紧接着跟进parse()方法进行分析,这个函数有点东西哇。函数内定义了一个空字典,最后return的也是该变量,5个if条件都是和域名解析的类型有关,A类型,CNAME类型等等。这是粗略看了一下函数内部的部分代码块得到的信息,接下来详细开始分析。

两个for循环用于从dns查询的结果里获取解析的类型,ip地址等信息,item.rdtype变量里存放的是执行查询以后的子域名解析类型。变量的类型则是数字。比如id 1所对应的解析类型是A,id 5所对应的解析类型是CNAME。函数通过dns查询获取到子域名所对应的解析id,而接着的一个if条件是与get_type_id(‘A’)做的判断,由此可以推断出脚本的get_type_name()方法是通过id反查解析类型,而get_type_id()方法则是通过解析类型反查id。

推断结果正不正确不要急,反正也是推断,到最后我们在跟进函数进行分析,验证一下猜想。

根据作者的注释可以得知result字典里只存放了两种格式,域名和ip地址。字典的键为解析的类型,值为域名或者ip地址组成的列表。

    def parser(self, answer):
  """
  result relationship only two format
  @domain     domain name
  @address   ip address
  """

  result = {}
  for rrsets in answer.response.answer:
      for item in rrsets.items:
          rdtype = self.get_type_name(item.rdtype) # 获取子域名解析的类型
          if item.rdtype == self.get_type_id('A'):
              '''判断解析的类型'''
              if result.has_key(rdtype):
                  result[rdtype].append(item.address)
              else:
                  result[rdtype] = [item.address]

          if item.rdtype == self.get_type_id('CNAME'):
              '''判断解析的类型'''
              if result.has_key(rdtype):
                  result[rdtype].append(item.target.to_text())
              else:
                  result[rdtype] = [item.target.to_text()]

          if item.rdtype == self.get_type_id('MX'):
              '''判断解析的类型'''
              if result.has_key(rdtype):
                  result[rdtype].append(item.exchange.to_text())
              else:
                  result[rdtype] = [item.exchange.to_text()]
                       
          if item.rdtype == self.get_type_id('NS'):
                   '''判断解析的类型'''
              if result.has_key(rdtype):
                  result[rdtype].append(item.target.to_text())
              else:
                  result[rdtype] = [item.target.to_text()]
  return result

前几个解析类型都大同小异,但txt解析类型首先获取了item.to_text()的值,然后判断include字符串是否在item.to_text()结果里,如果存在使用正则提取字符串,然后判断该字符串是否是域名,如果是域名则继续执行query()查询域名解析。如果include字符串不在结果里,则正则匹配有关ip的字符串。

                if item.rdtype == self.get_type_id('TXT'):
  rd_result = item.to_text()
  if 'include' in rd_result:
      regex = re.compile(r'(?<=include:).*?(?= )')
      for record in regex.findall(rd_result):
          if self.is_domain(record):
              self.query(record, rdtype)
  else:
      regex = re.compile(r'(?<=ip4:).*?(?= |/)')
      qname = rrsets.name.to_text()
      self.recursion[qname] = []
      for record in regex.findall(rd_result):
          self.recursion[qname].append(record)
  result[rdtype] = self.recursion

调用流程:Domain() > Domain.brute() > Domain.query() > Domain.parser()

get_type_name()是通过dns查询结果中的id反查解析类型,而get_type_id()则是通过解析类型反查id。

is_domian()是一个类静态方法,类和实例均可以调用。根据正则匹配的结果返回True或者False。

is_ipv4()则是匹配ip地址。根据正则的匹配结果返回True或者False。

在这里也收获到两条正则,记录下来,以后遇到匹配域名或匹配ip地址的地方就可以直接借鉴了哇~

	def get_type_name(self, typeid):
    	return dns.rdatatype.to_text(typeid) # 根据类型的id返回解析文字

	def get_type_id(self, name):
    	return dns.rdatatype.from_text(name)  # 根据解析的文字类型返回id

	@staticmethod  # 静态方法,类和实例均可调用
	def is_domain(self, domain):
    '''正则匹配域名'''
   		domain_regex = re.compile(
        r'(?:[A-Z0-9_](?:[A-Z0-9-_]{0,247}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))\Z', 
        re.IGNORECASE)
    	return True if domain_regex.match(domain) else False

	def is_ipv4(self, address):
    	'''正则匹配ipv4地址'''
    	ipv4_regex = re.compile(
        r'(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)(?:\.(?:25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}',
        re.IGNORECASE)
    	return True if ipv4_regex.match(address) else False

调用流程:Domain.parser() > get_type_name() & get_type_id() & is_domain()

Domain类的最后,则是一个处理泛解析的函数。

泛域名解析是指,利用通配符* (星号)来做二级域名以实现所有的二级域名均指向同一IP地址。在域名前添加任何子域名,均可访问到所指向的web地址。

在这个函数中,作者定义了两个列表,第一个列表为空,第二个列表则是生成指定的三个子域名。然后循环进行dns查询,如果查询的结果不为空,则将esets列表中添加新的记录以后return。

	def extensive(self, target):
    	'''处理泛解析'''
    	(ehost, esets) = ['wyspider{0}.{1}'.format(i, target) for i in range(3)], []
    	for host in ehost:
        	try:
            	record = self.query(host, 'A')
            	if record is not None:
                	esets.extend(record['A'])
        	except Exception:
            	pass
    	return esets

调用流程:Domain() > extensive()

总结:

脚本直接运行 > parser_args命令行参数获取用户输入 > 传入主函数run > 根据传入的参数,生成DomainFuzzer类的实例 > 调用DomainFuzzer类实例的run方法 > 获取返回的数据写入到空列表subdomains里 > 调用save_result函数将列表中的内容写入到json文件。

wydomain模块

wydomain模块通过调用多个,第三方威胁情报搜集网站的公开api获取子域名数据。

主运行流程

脚本被直接运行时 > parser.parse_args()获取用户命令行输入 > 命令行获取的参数传入主函数run() > 根据用户传入的域名调用指定类的run()方法获取结果 > 最后调用save_result()将获取到的结果写入到指定文件。

python标准模块:

import os
import sys
import json
import argparse
import logging

作者自定义模块:

from common import save_result, read_json
from utils.fileutils import FileUtils

from utils.alexa import Alexa
from utils.threatminer import Threatminer
from utils.threatcrowd import Threatcrowd
from utils.sitedossier import Sitedossier
from utils.netcraft import Netcraft
from utils.ilinks import Ilinks
from utils.chaxunla import Chaxunla
from utils.googlect import TransparencyReport

自定义函数:

run()

全局变量:

logging.basicConfig(
    level=logging.INFO, # filename='/tmp/wyproxy.log',
    format='%(asctime)s [%(levelname)s] %(message)s',
)

代码块分析

作者使用args = parser.parse_args()获取用户从命令行传递过来的域名及写入子域名结果文件的路径。最后将两个参数传入到run()函数中。

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="wydomain v 2.0 to discover subdomains of your target domain.")
    parser.add_argument("-d","--domain",metavar="",
        help="domain name") # 域名
    parser.add_argument("-o","--out",metavar="",default="domains.log",
        help="result out file")  # 结果输出文件路径
    args = parser.parse_args()

    try:
        run(args)
    except KeyboardInterrupt:
        logging.info("Ctrl C - Stopping Client") # 异常记录到日志
        sys.exit(1)

调用流程:parser.parse_args() > run()

run()函数获取域名及写入子域名结果文件的路径。

首先判断用户是否已经输入域名,如果没有输入域名则脚本打印帮助信息,程序退出。

	def run(args):
    	domain = args.domain
    	outfile = args.out

    	if not domain:
        	print('usage: wydomain.py -d aliyun.com')
        	sys.exit(1)

初始化缓存文件路径。

该工具正常运行时,api获取的结果存放在result/domain.com文件夹下。脚本首先获取当前文件运行的绝对路径,然后通过域名名称将文件夹创建到result目录下,如果文件夹不存在则直接新建一个文件夹。

	# init _cache_path
	script_path = os.path.dirname(os.path.abspath(__file__))
	_cache_path = os.path.join(script_path, 'result/{0}'.format(domain))
	if not os.path.exists(_cache_path):
    	os.makedirs(_cache_path, 0777)

logging.info()在这里的作用其实就是相当于print打印了一条提示信息。有关logging的代码块可以直接忽略掉哇~

os.path.join()则是将缓存的域名路径与第三方网站获取到子域名的信息文件组合为一个完整的路径。方便作者后续写入文件的操作。在这个函数中,作者调用了多个类的run()方法,Alexa.run(),Threatminer.run(),Threatcrowd.run(),Sitedossier.run(),Netcraft.run(),Ilinks.run(),Chaxunla().run(),TransparencyReport().run()这些类分别对应了需要进行查询的网站,当执行run方法以后,函数通过发送请求获取数据进行处理,返回指定网站查询的数据存入到变量,最后通过save_result()方法将结果写入到用户所指定的路径。

    # alexa result json file
logging.info("starting alexa fetcher...")
_cache_file = os.path.join(_cache_path, 'alexa.json')
result = Alexa(domain=domain).run()
save_result(_cache_file, result)
logging.info("alexa fetcher subdomains({0}) successfully...".format(len(result)))

# threatminer result json file
logging.info("starting threatminer fetcher...")
_cache_file = os.path.join(_cache_path, 'threatminer.json')
result = Threatminer(domain=domain).run()
save_result(_cache_file, result)
logging.info("threatminer fetcher subdomains({0}) successfully...".format(len(result)))

# threatcrowd result json file
logging.info("starting threatcrowd fetcher...")
_cache_file = os.path.join(_cache_path, 'threatcrowd.json')
result = Threatcrowd(domain=domain).run()
save_result(_cache_file, result)
logging.info("threatcrowd fetcher subdomains({0}) successfully...".format(len(result)))

# sitedossier result json file
logging.info("starting sitedossier fetcher...")
_cache_file = os.path.join(_cache_path, 'sitedossier.json')
result = Sitedossier(domain=domain).run()
save_result(_cache_file, result)
logging.info("sitedossier fetcher subdomains({0}) successfully...".format(len(result)))

# netcraft result json file
logging.info("starting netcraft fetcher...")
_cache_file = os.path.join(_cache_path, 'netcraft.json')
result = Netcraft(domain=domain).run()
save_result(_cache_file, result)
logging.info("netcraft fetcher subdomains({0}) successfully...".format(len(result)))

# ilinks result json file
logging.info("starting ilinks fetcher...")
_cache_file = os.path.join(_cache_path, 'ilinks.json')
result = Ilinks(domain=domain).run()
save_result(_cache_file, result)
logging.info("ilinks fetcher subdomains({0}) successfully...".format(len(result)))

# chaxunla result json file
logging.info("starting chaxunla fetcher...")
_cache_file = os.path.join(_cache_path, 'chaxunla.json')
result = Chaxunla(domain=domain).run()
save_result(_cache_file, result)
logging.info("chaxunla fetcher subdomains({0}) successfully...".format(len(result)))

# google TransparencyReport result json file
logging.info("starting google TransparencyReport fetcher...")
result = TransparencyReport(domain=domain).run()
_cache_file = os.path.join(_cache_path, 'googlect_subject.json')
save_result(_cache_file, result.get('subjects'))
_cache_file = os.path.join(_cache_path, 'googlect_dnsnames.json')
save_result(_cache_file, result.get('dns_names'))
logging.info("google TransparencyReport fetcher subdomains({0}) successfully...".format(len(result.get('dns_names'))))

调用流程:run() > Alexa.run() &Threatminer.run() & Threatcrowd.run() & Sitedossier.run() & Netcraft.run() & Ilinks.run() > Chaxunla.run() > TransparencyReport.run() > save_result() 写入返回的结果

作者代码写的很清晰。不过在这里也可以借鉴sublist3r工具作者的实现思路。

以下代码块节选自国外子域名爆破工具sublist3r.py,该工具的作者首先定义了一个supported_engines字典用于存放类,字典的键为搜索引擎的名称,值则为搜索引擎所对应的类名。chosenEnums空字典则用于存放用户以选择的搜索引擎。如果用户没有指定搜索引擎则默认使用所有的搜索引擎,否则使用split函数进行分割分别获取用户指定的搜索引擎,然后调用for循环将用户以选择的搜索引擎类添加到列表。最后通过列表解析从选择好的搜索引擎列表中传入指定的域名进行子域名枚举。

supported_engines = {'baidu': BaiduEnum,
                    'yahoo': YahooEnum,
                    'google': GoogleEnum,
                    'bing': BingEnum,
                    'ask': AskEnum,
                    'netcraft': NetcraftEnum,
                    'dnsdumpster': DNSdumpster,
                    'virustotal': Virustotal,
                    'threatcrowd': ThreatCrowd,
                    'ssl': CrtSearch,
                    'passivedns': PassiveDNS
                    } # 存放搜索引擎类

chosenEnums = []

if engines is None:
   chosenEnums = [
       BaiduEnum, YahooEnum, GoogleEnum, BingEnum, AskEnum,
       NetcraftEnum, DNSdumpster, Virustotal, ThreatCrowd,
       CrtSearch, PassiveDNS
  ] # 默认使用所有的搜索引擎。
else:
   engines = engines.split(',')
   for engine in engines:
       if engine.lower() in supported_engines:
           chosenEnums.append(supported_engines[engine.lower()])

# Start the engines enumeration
enums = [enum(domain, [], q=subdomains_queue, silent=silent, verbose=verbose) for enum in chosenEnums]
for enum in enums:
   enum.start()
for enum in enums:
   enum.join()

继续回到run()函数,作者通过调用不同类的run()方法获取个个网站的api查询结果,最后将他们分别保存到了不同的文件。subdomains空列表里用于存放所有获取到的子域名地址,而run函数剩下的这一部分代码块则是作者将所有的查询结果文件进行处理,最后进行整合。

read_json(),save_result()这些函数都是作者自定义的函数,主要的作用是对于文件的读取,写入,删除等等操作。

    # Collection API Subdomains
sub_files = [
  'alexa.json',
  'chaxunla.json',
  'ilinks.json',
  'netcraft.json',
  'sitedossier.json',
  'threatcrowd.json',
  'threatminer.json']

# process all cache files
subdomains = []
for file in sub_files:
  _cache_file = os.path.join(_cache_path, file)
  json_data = read_json(_cache_file)
  if json_data:
      subdomains.extend(json_data)

# process openssl x509 dns_names
_cache_file = os.path.join(_cache_path, 'googlect_dnsnames.json')
json_data = read_json(_cache_file)
for sub in json_data:
  if sub.endswith(domain):
      subdomains.append(sub)

# collection burte force subdomains
_burte_file = os.path.join(_cache_path, 'dnsburte.json')
if FileUtils.exists(_burte_file):
  json_data = read_json(_burte_file)
  if json_data:
      subdomains.extend(json_data)

# save all subdomains to outfile
subdomains = list(set(subdomains))
_result_file = os.path.join(script_path, outfile)
save_result(_result_file, subdomains)
logging.info("{0} {1} subdomains save to {2}".format(
  domain, len(subdomains), _result_file))

调用流程:subdomains for循环读取> sub_files内容

只要和网站查询有关,就一定会涉及到发送请求及获取数据,最后根据需求筛选数据。接下来逐个跟进这些类进行分析,Alexa(),Threatminer(),Threatcrowd(),Sitedossier(),Netcraft(),Ilinks(),Chaxunla(),TransparencyReport()。

1.alexa模块

数据源:http://www.alexa.cn/

python标准模块:

import re
import time
import logging

作者自定义模块:

from common import http_request_get, http_request_post, is_domain

自定义类:

Alexa(object)

自定义类方法:

run()
fetch_chinaz()
fetch_alexa_cn()
get_sign_alexa_cn()

Alexa类的初始化方法中,定义了两个变量,一个是目标域名,而另一个则是空列表

class Alexa(object):
    """docstring for Alexa"""
    def __init__(self, domain):
        super(Alexa, self).__init__()
        self.domain = domain
        self.subset = []

run方法为Alexa类的主要运行函数。函数内部调用了fetch_chinaz()与fetch_alexa_cn()方法。从这两个方法的命名来看,通过调用这两个方法可直接取指定网站的返回结果,函数最后返回的则是一个去重以后的列表。

	def run(self):
    	try:
        	self.fetch_chinaz()
        	self.fetch_alexa_cn()
        	return list(set(self.subset))
    	except Exception as e:
        	logging.info(str(e))
        	return self.subset

现在跟进fetch_chinaz(),与fetch_alexa_cn()方法。作者在注释里也提示这两个方法,可以直接获取指定网站的子域名查询结果。既然要进行查询,肯定会发送http请求,作者使用http_request_get()发送api接口的查询请求。http的请求过程则让作者定义到函数里了,在发送请求的时候直接传入域名就可以发送请求了。发送请求以后,脚本获取到了http的返回信息,然后实用正则将html中的子域名提取出来,最后存放到subset空列表里。

fetch_alexa_cn()获取数据的方法与fetch_chinaz()大同小异,只是在发送查询请求之前调用了get_sign_alexa_cn(),从方法的命名来看,这里获取了签名的状态,当签名的状态不为空时,则获取签名中domain,sig,keyt的参数,没有这三个参数是无法发送http请求进行查询。根据获取到签名的这三个参数。作者构造post请求进行发送,最后通过各种字符串的处理提取出子域名,并将结果添加到subset列表里。

	def fetch_chinaz(self):
    	"""get subdomains from alexa.chinaz.com"""

    	url = 'http://alexa.chinaz.com/?domain={0}'.format(self.domain)
    	r = http_request_get(url).content
    	subs = re.compile(r'(?<="\>\r\n<li>).*?(?=</li>)') # 正则匹配<li>标签内的域名
    	result = subs.findall(r)
    	for sub in result:
        	if is_domain(sub):
            	self.subset.append(sub)

	def fetch_alexa_cn(self):
    	"""get subdomains from alexa.cn"""
    	sign = self.get_sign_alexa_cn()
    	if sign is None:
        	raise Exception("sign_fetch_is_failed")
    	else:
        	(domain,sig,keyt) = sign

    	pre_domain = self.domain.split('.')[0] # baidu.com > baidu == pre_domain

    	url = 'http://www.alexa.cn/api_150710.php'
    	payload = {
        	'url': domain,
        	'sig': sig,
        	'keyt': keyt,
        	}
    	r = http_request_post(url, payload=payload).text

    	for sub in r.split('*')[-1:][0].split('__'):
        	if sub.split(':')[0:1][0] == 'OTHER':
            	break
        	else:
            	sub_name = sub.split(':')[0:1][0]
            	sub_name = ''.join((sub_name.split(pre_domain)[0], domain))
            	if is_domain(sub_name):
                	self.subset.append(sub_name)

调用流程:Alexa() > run() > fetch_chinaz() & fetch_alexa_cn()

2.threatminer模块

数据源:https://www.threatminer.org

python标准模块:

import re
import logging

作者自定义模块:

from common import curl_get_content, http_request_get, is_domain

自定义类:

Threatminer()

自定义类方法:

run()

Threatminer类初始化方法中定义了三个参数,第一个参数为用户指定的根域名domain,第二个参数为subset空列表用于存放已经获取到的子域名结果,第三个参数website则是数据源地址。

run()方法中根据用户指定的域名拼接api接口请求,然后获取返回的html信息,使用正则进行筛选子域名。最后将获取到的结果添加到subset列表。

class Threatminer(object):
    """docstring for Threatminer"""
    def __init__(self, domain):
        super(Threatminer, self).__init__()
        self.domain = domain
        self.subset = []
        self.website = "https://www.threatminer.org"
    
    def run(self):
        try:
            url = "{0}/getData.php?e=subdomains_container&q={1}&t=0&rt=10&p=1".format(self.website, self.domain)
            # content = curl_get_content(url).get('resp')
            content = http_request_get(url).content

            _regex = re.compile(r'(?<=<a href\="domain.php\?q=).*?(?=">)')
            for sub in _regex.findall(content):
                if is_domain(sub):
                    self.subset.append(sub)

            return list(set(self.subset))
        except Exception as e:
            logging.info(str(e))
            return self.subset

调用流程:Threatminer() > run()

3.threatcrowd模块

数据源:

https://www.threatcrowd.org

python标准模块

import re
import json
import logging

作者自定义模块:

from common import curl_get_content, http_request_get, is_domain

自定义类:

Threatcrowd()

自定义类方法:

run()

Threatcrowd()类初始化方法中定义了三个参数,第一个参数为用户指定的根域名domain,第二个参数为subset空列表用于存放已经获取到的子域名结果,第三个参数website则是数据源地址。

run()方法中根据用户指定的域名拼接api接口请求,然后获取返回的json信息,使用json.loads()进行加载筛选子域名。最后将获取到的结果添加到subset列表。

class Threatcrowd(object):
   """docstring for Threatcrowd"""
   def __init__(self, domain):
       super(Threatcrowd, self).__init__()
       self.domain = domain
       self.subset = []
       self.website = "https://www.threatcrowd.org"

   def run(self):
       try:
           url = "{0}/searchApi/v2/domain/report/?domain={1}".format(self.website, self.domain)
           # content = curl_get_content(url).get('resp')
           content = http_request_get(url).content

           for sub in json.loads(content).get('subdomains'):
               if is_domain(sub):
                   self.subset.append(sub)

           return list(set(self.subset))
       except Exception as e:
           logging.info(str(e))
           return self.subset

调用流程:Threatcrowd() > run()

4.sitedossier模块

数据源:http://www.sitedossier.com

python标准模块:

import re
import time
import logging

作者自定义模块:

from captcha import Captcha
from common import http_request_get, http_request_post, is_domain

自定义类:

Sitedossier()

自定类方法:

run()
get_content()
parser()
get_subdomain()
human_act()
audit()
get_audit_img()
__str__()

类初始化方法中定义了域名参数domain,Captcha()验证码类,subset空列表用于存放获取到的子域名结果。

class Sitedossier(object):
    """docstring for Sitedossier"""
    def __init__(self, domain):
        super(Sitedossier, self).__init__()
        self.domain = domain
        self.captcha = Captcha()
        self.subset = []

run()方法根据用户传递的domain参数调用get_content()方法进行api查询,获取查询的结果。parser()方法用于从返回的结果里筛选特定的数据。set函数用于列表去重。

    def run(self):
  try:
      url = 'http://www.sitedossier.com/parentdomain/{0}'.format(self.domain)
      r = self.get_content(url)
      self.parser(r)
      return list(set(self.subset))
  except Exception, e:
      logging.info(str(e))
      return self.subset

run() > get_content() 方法中调用http_request_get()发送http请求,发送请求以后首先判断网站是否存在验证码验证,如果存在,则调用human_act进行验证码处理。否则调用get_content()获取网页数据。

    def get_content(self, url):
  logging.info('request: {0}'.format(url))
  r = http_request_get(url).text
  if self.human_act(r) is True:
      return r
  else:
      self.get_content(url)

跟进get_content() > human_act()方法进行分析,human_act()首先获取发送请求以后的数据,if条件判断auditimage与blacklisted是否在返回的页面里,如果存在,则调用get_audit_img()获取图片验证码,如果图片的地址不为空,则调用captcha.verification()进行验证码验证。再次进行判断验证码验证以后的结果中是否存在Result键,如果存在则直接调用audit()方法。

    def human_act(self, response):
  if 'auditimage' in response or 'blacklisted' in response:
      imgurl = self.get_audit_img(response)
      if imgurl is not None:
          ret = self.captcha.verification(imgurl)
          if ret.has_key('Result'):
              self.audit(ret['Result'])
              return True
          else:
              raise Exception("captcha_verification_is_empty")
      else:
          raise Exception("audit_img_is_empty")
  else:
      return True

跟进human_act() > get_audit_img()方法,作者使用正则表达式筛选出图片验证码的url地址,最后拼接完整的url地址进行返回。

	def get_audit_img(self, response):
    	auditimg = re.compile(r'(?<=<img src\=\"/auditimage/).*?(?=\?" alt="Please)')
    	imgurl = auditimg.findall(response)[0:]
    	if len(imgurl) >= 1:
        	imgurl = 'http://www.sitedossier.com/auditimage/{0}'.format(imgurl[0])
        	return imgurl
    	else:
        	return None

跟进human_act() > audit()方法,方法内部根据用户传入的code参数发送http请求。

	def audit(self, code):
    	payload = {'w':code}
    	url = 'http://www.sitedossier.com/audit'
    	r = http_request_post(url, payload=payload)

human_act() > captcha.verification方法暂时不跟进,只需要知道这里作者调用了第三方的验证码识别接口,通过获取到验证码图片的地址以后,传递给该函数,该函数将识别出的验证码结果返回。后续详细分析Captcha()类。

接下来跟进run() > parser()函数进行分析。作者通过调用正则表达式从response中筛选页面的page,根据page调用get_subdomain()方法获取子域名信息。最后将结果添加到subset列表。

    def parser(self, response):
  npage = re.search('<a href="/parentdomain/(.*?)"><b>Show', response)
  if npage:
      for sub in self.get_subdomain(response):
          self.subset.append(sub)
      nurl = 'http://www.sitedossier.com/parentdomain/{0}'.format(npage.group(1))
      response = self.get_content(nurl)
      self.parser(response)
  else:
      for sub in self.get_subdomain(response):
          self.subset.append(sub)

跟进parser() > get_subdomain()方法,函数内部通过正则表达式提取出域名。

    def get_subdomain(self, response):
  domain = re.compile(r'(?<=<a href\=\"/site/).*?(?=\">)')
  for sub in domain.findall(response):
      yield sub

跟进最后一个类方法Sitedossier() > str

    def __str__(self):
  handler = lambda e: str(e)
  return json.dumps(self, indent=2, default=handler)

调用方法:Sitedossier() > run() > get_content() & parser()

get_content() > human_act() > get_audit_img() & audit()

parser() > get_subdomain()

5.netcraft模块

数据源:http://searchdns.netcraft.com

python标准模块:

import re
import time
import logging
import subprocess

作者自定义模块:

from common import http_request_get, http_request_post, is_domain

自定义类:

Netcraft()

自定义类函数类:

run()
parser()
get_subdomains()
get_cookie()

Netcraft类初始化方法中定义了cookie,subset空列表,用户指定的域名地址,及数据源地址site参数。

class Netcraft(object):
    """docstring for Netcraft"""
    def __init__(self, domain):
        super(Netcraft, self).__init__()
        self.cookie = ''
        self.subset = []
        self.domain = domain
        self.site = 'http://searchdns.netcraft.com'

主运行方法run(),首先通过get_cookie()方法获取到用户cookie信息,调用http_request_get()发送http请求之前会将cookie信息传递到请求体里,最后通过parser()方法处理http的返回信息,并且将获取到的子域名信息添加到subset列表。

	def run(self):
    	try:
        	self.cookie = self.get_cookie().get('cookie')
        	url = '{0}/?restriction=site+contains&position=limited&host=.{1}'.format(
            	self.site, self.domain)
        	r = http_request_get(url, custom_cookie=self.cookie)
        	self.parser(r.text)
        	return list(set(self.subset))
    	except Exception, e:
        	logging.info(str(e))
        	return self.subset

跟进run() > get_cookie()与run() > parser()方法进行详细分析。

get_cookie() 方法里通过subprocess.Popen()调用操作系统命令执行phantomjs ph_cookie.js,将运行的结果返回到stdoutput与erroutput参数中,最后组成cookie字典return。如果cmdline字符串用户可以从外部控制,则这里会存在远程命令漏洞。

	def get_cookie(self):
    	try:
        	cmdline = 'phantomjs ph_cookie.js'
        	run_proc = subprocess.Popen(cmdline,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
        	(stdoutput,erroutput) = run_proc.communicate()
        	response = {
            	'cookie': stdoutput.rstrip(),
            	'error': erroutput.rstrip(),
        	}
        	return response
    	except Exception, e:
        	logging.info(str(e))
        	return {'cookie':'', 'error': str(e)}

parser()方法使用正则表达式提取出页面的page id,然后通过parser() > get_subdomains()获取子域名信息。 parser() > is_domain()方法判断获取到的数据是否为域名格式。最后将获取到的数据添加到subset列表。

	def parser(self, response):
    	npage = re.search('<A href="(.*?)"><b>Next page</b></a>', response)
    	if npage:
        	for item in self.get_subdomains(response):
            	if is_domain(item):
                	self.subset.append(item)
        	nurl = '{0}{1}'.format(self.site, npage.group(1))
        	r = http_request_get(nurl, custom_cookie=self.cookie)
        	time.sleep(3)
        	self.parser(r.text)
    	else:
        	for item in self.get_subdomains(response):
            	if is_domain(item):
                	self.subset.append(item)

跟进最后一个方法parser() > get_subdomains(),作者使用正则匹配a标签内的域名地址。

	def get_subdomains(self, response):
    	_regex = re.compile(r'(?<=<a href\="http://).*?(?=/" rel\="nofollow")')
    	domains = _regex.findall(response)
    	for sub in domains:
        	yield sub

调用流程:Netcraft() > run() > get_cookie() & http_request_get() & parser()

parser() > get_subdomains()

6.Ilinks模块:

数据源:http://i.links.cn/subdomain/

python标准模块:

import re

作者自定义模块:

from common import http_request_get, http_request_post, is_domain

自定义类:

Ilinks()

自定义类方法:

run()

Ilinks()类初始化方法中定义了,用户传递的域名domain参数,及数据源的api地址,subset列表用于存放执行查询以后的子域名结果。

run()方法根据用户指定的domain地址,传递到post请求体内,发送请求获取返回的数据,最后使用正则表达式将查询的结果筛选出来存放到列表 subset。

class Ilinks(object):
    """docstring for Ilinks"""
    def __init__(self, domain):
        super(Ilinks, self).__init__()
        self.domain = domain
        self.url = 'http://i.links.cn/subdomain/'
        self.subset = []

    def run(self):
        try:
            payload = {
                'b2': 1,
                'b3': 1,
                'b4': 1,
                'domain': self.domain
            }
            r = http_request_post(self.url,payload=payload).text
            subs = re.compile(r'(?<=value\=\"http://).*?(?=\"><input)')
            for item in subs.findall(r):
                if is_domain(item):
                    self.subset.append(item)

            return list(set(self.subset))
        except Exception as e:
            logging.info(str(e))
            return self.subset

调用流程:Ilinks() > run()

7.chaxunla模块:

数据源:http://api.chaxun.la/toolsAPI/getDomain

python标准模块:

import re
import time
import json
import logging

python第三方模块:

import requests

作者自定义模块:

from captcha import Captcha
from common import is_domain

自定义类:

Chaxunla()

自定义类方法:

run()
download()
verify_code()

全局变量:

req = requests.Session()

Chaxunla的初始化方法中定义了用户传递的domain参数,数据源的api地址url参数,子域名枚举的结果列表subset,还有一个用于验证的空字符串。

class Chaxunla(object):
    """docstring for Chaxunla"""
    def __init__(self, domain):
        super(Chaxunla, self).__init__()
        self.domain = domain
        self.url = 'http://api.chaxun.la/toolsAPI/getDomain/'
        self.subset = []
        self.verify = ""

run()方法中首先拼接url地址发送查询的请求,通过if条件判断返回的字符串status是否等于1,如果等于1则执行正常数据查询,最后将获取到的数据添加到subset列表中。

    def run(self):
  try:
      timestemp = time.time()
      url = "{0}?0.{1}&callback=&k={2}&page=1&order=default&sort=desc&action=moreson&_={3}&verify={4}".format(
           self.url, timestemp, self.domain, timestemp, self.verify)
      result = json.loads(req.get(url).content) # 读取json格式数据
      if result.get('status') == '1':
          for item in result.get('data'):
              if is_domain(item.get('domain')):
                  self.subset.append(item.get('domain'))
      elif result.get('status') == 3:
          logging.info("chaxun.la api block you ip...")
          logging.info("input you verify_code in http://subdomain.chaxun.la/wuyun.org/")
          # print('get verify_code():', self.verify)
          # self.verify_code()
          # self.run()
      return list(set(self.subset))
  except Exception as e:
      logging.info(str(e))
      return self.subset

跟进verify_code()方法,首先生成了时间戳,然后通过时间戳构造图片url地址,调用download()方法下载图片,最后调用captcha.verification()方法识别验证码,返回识别以后的结果。

    def verify_code(self):
  timestemp = time.time() # 时间戳
  imgurl = 'http://api.chaxun.la/api/seccode/?0.{0}'.format(timestemp)
  if self.download(imgurl):
      captcha = Captcha()
      code_result = captcha.verification(filename='captcha.gif')
      self.verify = code_result.get('Result')

跟进download方法,根据url发送请求获取图片信息,然后保存到本地。

    def download(self, url):
  try:
      r = req.get(url)
      with open("captcha.gif", "wb") as image:    
          image.write(r.content)
      return True
  except Exception, e:
      return False

调用流程:Chaxunla() > run() > verify_code() & download()

8.googlect模块

数据源:https://www.google.com/transparencyreport/jsonp/ct

python标准模块:

import time
import json
import logging

from random import Random,uniform
from urllib import quote

作者自定义模块:

from common import http_request_get

自定义函数:

random_sleep()
random_str()

自定义类:

TransparencyReport()

TransparencyReport()类初始化方法中定义了用户指定的域名domain参数,token空字符串,dns_names空列表用于存放dns,subjects空列表,hashs空列表,默认num_result参数,website参数为api接口地址。

class TransparencyReport(object):
   """docstring for TransparencyReport"""
   def __init__(self, domain):
       self.domain = domain
       self.token = ""
       self.dns_names = []
       self.subjects = []
       self.hashs = []
       self.num_result = 0
       self.website = 'https://www.google.com/transparencyreport/jsonp/ct'

run()方法中主要调用了parser_subject(),与parser_dnsname()这这两个方法,run()方法最后的返回值是一个字典,字典的值为空列表。所以推断通过运行上面的两个方法获取某些数据,添加到已经定义好的空列表里,最后return组成好的字典变量。

    def run(self):
  self.parser_subject()
  self.hashs = list(set(self.hashs)) # unique sort hash
  self.parser_dnsname()
  self.dns_names = list(set(self.dns_names))
  self.subjects = list(set(self.subjects))
  return {'subjects': self.subjects, 'dns_names': self.dns_names}

主要跟进run() > parser_subject()与run() > parser_dnsname()这两个方法进行分析。

run() > parser_subject() > 调用random_str()获取执行以后的结果。获取callback参数然后拼接url地址,通过http_request_get()方法发送http请求并获取返回的content,json.loads读取并处理返回的json格式数据。result通过get()函数获取读取下一页数据的token,最后 for循环根据条件读取出域名数据,将其添加到dns_names列表中。

    def parser_subject(self):
  try:
      callback = random_str()
      url = '{0}/search?domain={1}&incl_exp=true&incl_sub=true&token={2}&c={3}'.format(self.website, self.domain, quote(self.token), callback)
      content = http_request_get(url).content
      result = json.loads(content[27:-3])
      self.token = result.get('nextPageToken')
      for subject in result.get('results'):
          if subject.get('subject'):
              self.dns_names.append(subject.get('subject'))
          if subject.get('hash'):
              self.hashs.append(subject.get('hash'))
  except Exception as e:
      logging.info(str(e))

  if self.token:
      self.parser_subject()

run > parser_dnsname()方法作者主要用于处理调用谷歌搜索的部分加密数据。如果没有特定的hash字符串api的请求是无法完成的。

def parser_dnsname(self):
   for hashstr in self.hashs:
       try:
           callback = random_str()
           url = '{0}/cert?hash={1}&c={2}'.format(
                   self.website, quote(hashstr), callback)
           content = http_request_get(url).content
           result = json.loads(content[27:-3])
           if result.get('result').get('subject'):
               self.subjects.append(result.get('result').get('subject'))
           if result.get('result').get('dnsNames'):
               self.dns_names.extend(result.get('result').get('dnsNames'))
       except Exception as e:
           logging.info(str(e))
       random_sleep()

很多搜索引擎的查询结果都使用自身的加密算法对搜索的结果进行加密,比如雅虎搜索引擎,所以在编写这种类似爬虫脚本的时候,一定要根据网站执行查询请求时发送的数据与返回的数据进行相应的处理。

以下代码块节选自EngineCrawler项目 lib/engines/yahoo.py 文件

def _extract_url(self, resp):
   '''
  雅虎搜索返回的url结果:
  yahoo_encryption_url = '
  http://r.search.yahoo.com/_ylt=AwrgDaPBKfdaMjAAwOBXNyoA;_ylu=X3oDMTByb2lvbXVuBGNvbG8DZ3ExBHBvcwMxBHZ0aWQDBHNlYwNzcg--/RV=2/RE=1526176322/RO=10/RU=
  http://berkeleyrecycling.org/page.php?id=1
  /RK=2/RS=6rTzLqNgZrFS8Kb4ivPrFbBBuFs-'
  '''
   soup = BeautifulSoup(resp.text, "lxml")
   try:
       a_tags = soup.find_all("a", " ac-algo fz-l ac-21th lh-24")
       for a_tag in a_tags:
           yahoo_encryption_url = a_tag['href']
           yahoo_decrypt_url = unquote(yahoo_encryption_url)   # 解码
           split_url = yahoo_decrypt_url.split('http://')
           if len(split_url) == 1:
               result_https_url = 'https://' + split_url[0].split('https://')[1].split('/R')[0] # 获取返回https协议的url
               self.urls.append(result_https_url)
               print '[-]Yahoo: ' + result_https_url
           else:
               result_http_url = 'http://' + split_url[2].split('/R')[0] # 获取返回http协议的url
               print '[-]Yahoo: ' + result_http_url
               self.urls.append(result_http_url)
   except Exception:
       pass

调用流程:

TransparencyReport() > run() > parser_subject() & parser_dnsname() & parser_dnsname()

9.passivetotal模块

数据源地址:https://www.passivetotal.org

python标准模块:

import re
import json
import logging

python第三方模块:

import requests

作者自定义模块:

from common import curl_get_content, http_request_get, is_domain,http_request_post

自定义类:

PassiveTotal()

自定义类方法:

run()

PassiveTotal()类初始化方法中定义了用户指定的domain参数,用于存放子域名的subset空列表,website则是数据源地址。

run()方法中首先根据账号,key进行认证,然后构造http请求将需要查询的网站地址传递到post请求体里,根据返回的信息使用json.loads进行读取处理。最后使用for循环将子域名结果添加到列表内。

class PassiveTotal(object):
   """docstring for PassiveTotal"""
   def __init__(self, domain):
       super(PassiveTotal, self).__init__()
       self.domain = domain
       self.subset = []
       self.website = "https://www.passivetotal.org"

   def run(self):
       try:
           # 此auth 需要自行申请,每个auth 每天有查询次数限制
           auth=("[email protected]","d160262241ccf53222d42edc6883c129")
           payload={"query":"*.%s" % self.domain}
           url = "https://api.passivetotal.org/v2/enrichment/subdomains"
           response = requests.get(url,auth=auth,params=payload)

           for sub in json.loads(response.content)['subdomains']:
               sub="%s.%s" %(sub,self.domain)
               if is_domain(sub):
                   self.subset.append(sub)

           return list(set(self.subset))
       except Exception as e:
           logging.info(str(e))
           return self.subset

调用流程:PassiveTotal() > run()

*本文作者:c01d,转载请注明来自FreeBuf.COM

来源:freebuf.com 2019-06-11 15:00:17 by: c01d

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论