Nascan 是巡风中主要负责目标资产识别 (信息收集) 的模块.
nascan.py 文件位于 nascan/nascan.py
- # coding:utf-8
- # author:wolf@YSRC
- import thread
- from lib.common import *
- from lib.start import *
- if __name__ == "__main__":
- try:
- CONFIG_INI = get_config() # 读取配置
- log.write('info', None, 0, u'获取配置成功')
- STATISTICS = get_statistics() # 读取统计信息
- MASSCAN_AC = [0]
- NACHANGE = [0]
- thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE)) # 心跳线程
- thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程
- socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时
- ac_data = []
- while True:
- now_time = time.localtime()
- now_hour = now_time.tm_hour
- now_day = now_time.tm_mday
- now_date = str(now_time.tm_year) + str(now_time.tm_mon) + str(now_day)
- cy_day, ac_hour = CONFIG_INI['Cycle'].split('|')
- log.write('info', None, 0, u'扫描规则:' + str(CONFIG_INI['Cycle']))
- if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段
- ac_data.append(now_date)
- NACHANGE[0] = 0
- log.write('info', None, 0, u'开始扫描')
- s = start(CONFIG_INI)
- s.masscan_ac = MASSCAN_AC
- s.statistics = STATISTICS
- s.run()
- time.sleep(60)
- except Exception, e:
- print e
读取了配置, get_config() 跟进去
- nascan/lib/common.py
- def get_config():
- config = {}
- config_info = mongo.na_db.Config.find_one({"type": "nascan"})
- for name in config_info['config']:
- if name in ['Discern_cms', 'Discern_con', 'Discern_lang', 'Discern_server']:
- config[name] = format_config(name, config_info['config'][name]['value'])
- else:
- config[name] = config_info['config'][name]['value']
- return config
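就是读取了 mongodb 里 Config 表中 type 为 nascan 的配置; 其中 Discern_cms、Discern_con、Discern_lang、Discern_server 四项会先经过 format_config() 处理, 其余配置项直接取 value.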
回到 nascan.py
get_statistics() 则是读取当天的统计信息, 返回一个以当天日期为键的字典, 值为当天的统计数据 (当天还没有记录时初始化为 add/update/delete 均为 0).
也是位于 nascan/lib/common.py
- def get_statistics():
- date_ = datetime.datetime.now().strftime('%Y-%m-%d')
- now_stati = mongo.na_db.Statistics.find_one({"date": date_})
- if not now_stati:
- now_stati = {date_: {"add": 0, "update": 0, "delete": 0}}
- return now_stati
- else:
- return {date_: now_stati['info']}
MASSCAN_AC 用来标记 masscan 当前是否正在扫描: 为 1 表示 masscan 正在扫描, 扫描结束后恢复为 0.
NACHANGE 是用来看现在的扫描列表和开始的列表有没有变化, 有变化设为 1.
- thread.start_new_thread(monitor, (CONFIG_INI,STATISTICS,NACHANGE)) # 心跳线程
- thread.start_new_thread(cruise, (STATISTICS,MASSCAN_AC)) # 失效记录删除线程
- socket.setdefaulttimeout(int(CONFIG_INI['Timeout']) / 2) # 设置连接超时
进入 monitor 心跳线程
位于 nascan/lib/common.py
- def monitor(CONFIG_INI, STATISTICS, NACHANGE):
- while True:
- try:
- time_ = datetime.datetime.now()
- date_ = time_.strftime('%Y-%m-%d')
- mongo.na_db.Heartbeat.update({"name": "heartbeat"}, {"$set": {"up_time": time_}})
- if date_ not in STATISTICS: STATISTICS[date_] = {"add": 0, "update": 0, "delete": 0}
- mongo.na_db.Statistics.update({"date": date_}, {"$set": {"info": STATISTICS[date_]}}, upsert=True)
- new_config = get_config()
- if base64.b64encode(CONFIG_INI["Scan_list"]) != base64.b64encode(new_config["Scan_list"]):NACHANGE[0] = 1
- CONFIG_INI.clear()
- CONFIG_INI.update(new_config)
- except Exception, e:
- print e
- time.sleep(30)
monitor 每 30 秒循环一次: 先更新 Heartbeat 表中的心跳时间和当天的统计信息, 然后再次调用 get_config() 获取最新配置; 如果新旧配置中的 Scan_list 不一致 (代码中比较的是两者的 base64 编码值), 就把 NACHANGE[0] 置为 1, 表示需要重新扫描. 最后用新配置更新 CONFIG_INI, 并睡眠 30 秒.
返回 nascan.py
cruise() 函数, 位于 nascan/lib/common.py
- def cruise(STATISTICS,MASSCAN_AC):
- while True:
- now_str = datetime.datetime.now()
- week = int(now_str.weekday())
- hour = int(now_str.hour)
- if week>= 1 and week <= 5 and hour>= 9 and hour <= 18: # 非工作时间不删除
- try:
- data = mongo.NA_INFO.find().sort("time", 1)
- for history_info in data:
- while True:
- if MASSCAN_AC[0]: # 如果 masscan 正在扫描即不进行清理
- time.sleep(10)
- else:
- break
- ip = history_info['ip']
- port = history_info['port']
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.connect((ip, int(port)))
- sock.close()
- except Exception, e:
- time_ = datetime.datetime.now()
- date_ = time_.strftime('%Y-%m-%d')
- mongo.NA_INFO.remove({"ip": ip, "port": port})
- log.write('info', None, 0, '%s:%s delete' % (ip, port))
- STATISTICS[date_]['delete'] += 1
- del history_info["_id"]
- history_info['del_time'] = time_
- history_info['type'] = 'delete'
- mongo.NA_HISTORY.insert(history_info)
- except:
- pass
- time.sleep(3600)
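cruise 是失效记录删除线程: 满足时间条件 (代码中判断 weekday 在 1–5 之间且小时在 9–18 之间) 时, 按 time 升序遍历 INFO 表中的记录, 对每个目标 (ip:port) 进行 socket 连接; 如果连接不上, 就从 INFO 表中删除该 ip 和 port 的记录, 并把这条记录标记为 delete 写进 history 表里. masscan 正在扫描时会等待, 不进行清理; 每轮结束后睡眠 3600 秒.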
回到 nascan.py
- if (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data) or NACHANGE[0]: # 判断是否进入扫描时段
- (now_hour == int(ac_hour) and now_day % int(cy_day) == 0 and now_date not in ac_data)
是判断是否到达扫描的周期时间.
或者就是 NACHANGE[0]的值为 1, 任何一个成立都可以重新扫描.
进入 start 类
nascan/lib/start.py
在 start 类中,__init__初始化了传递过来的配置信息. 直接看 run(), 处理目标 IP 地址和使用 masscan 进行初步扫描等.
- def run(self):
- global AC_PORT_LIST
- all_ip_list = []
- for ip in self.scan_list:
- if "/" in ip: ip = cidr.CIDR(ip)
- if not ip:continue
- ip_list = self.get_ip_list(ip)
- for white_ip in self.white_list:
- if white_ip in ip_list:
- ip_list.remove(white_ip)
- if self.mode == 1:
- self.masscan_path = self.config_ini['Masscan'].split('|')[2]
- self.masscan_rate = self.config_ini['Masscan'].split('|')[1]
- ip_list = self.get_ac_ip(ip_list)
- self.masscan_ac[0] = 1
- AC_PORT_LIST = self.masscan(ip_list) # 如果安装了 Masscan 即使用 Masscan 进行全端口扫描
- if not AC_PORT_LIST: continue
- self.masscan_ac[0] = 0
- for ip_str in AC_PORT_LIST.keys(): self.queue.put(ip_str) # 加入队列
- self.scan_start() # 开始扫描
- else:
- all_ip_list.extend(ip_list)
- if self.mode == 0:
- if self.icmp: all_ip_list = self.get_ac_ip(all_ip_list)
- for ip_str in all_ip_list: self.queue.put(ip_str) # 加入队列
- self.scan_start() # TCP 探测模式开始扫描
if "/" in ip: ip = cidr.CIDR(ip) , 支持这样的格式: 127.0.0.1/24
if self.mode == 1 判断是否开启了 masscan 扫描模式, 如果开启就使用 Masscan 进行全端口扫描. 如果没有开启, 将 ip 添加到 all_ip_list 这个列表中.
跟进 masscan() 函数
- nascan/lib/start.py
- def masscan(self, ip):
- try:
- if len(ip) == 0: return
- sys.path.append(sys.path[0] + "/plugin")
- m_scan = __import__("masscan")
- result = m_scan.run(ip, self.masscan_path, self.masscan_rate)
- return result
- except Exception, e:
- print e
- print 'No masscan plugin detected'
调用了 /plugin/masscan.py
- def run(ip_list,path,rate):
- try:
- ip_file = open('target.log','w')
- ip_file.write("\n".join(ip_list))
- ip_file.close()
- path = str(path).translate(None, ';|&')
- rate = str(rate).translate(None, ';|&')
- if not os.path.exists(path):return
- os.system("%s -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=%s"%(path,rate))
- result_file = open('tmp.log', 'r')
- result_json = result_file.readlines()
- result_file.close()
- del result_json[0]
- del result_json[-1]
- open_list = {}
- for res in result_json:
- try:
- ip = res.split()[3]
- port = res.split()[2]
- if ip in open_list:
- open_list[ip].append(port)
- else:
- open_list[ip] = [port]
- except:pass
- os.remove('target.log')
- os.remove('tmp.log')
- return open_list
- except:
- pass
先过滤了;|& 三个特殊字符. 然后拼接到命令中
masscan -p1-65535 -iL target.log -oL tmp.log --randomize-hosts --rate=20000
用 masscan 扫描完成后, 把结果保存到 tmp.log 文件里, 然后读取结果.
不管开没开 masscan, 都会进入 scan_start().
跟进到 ThreadNum, 位于 /nascan/lib/start.py
- class ThreadNum(threading.Thread):
- def __init__(self, queue):
- threading.Thread.__init__(self)
- self.queue = queue
- def run(self):
- while True:
- try:
- task_host = self.queue.get(block=False)
- except:
- break
- try:
- if self.mode:
- port_list = AC_PORT_LIST[task_host]
- else:
- port_list = self.config_ini['Port_list'].split('|')[1].split('\n')
- _s = scan.scan(task_host, port_list)
- _s.config_ini = self.config_ini # 提供配置信息
- _s.statistics = self.statistics # 提供统计信息
- _s.run()
- except Exception, e:
- print e
- finally:
- self.queue.task_done()
run() 函数从队列中取出 IP 地址, 连同对应的端口号列表一起传给 scan 类 (scan.scan), 再调用它的 run().
位于 /nascan/lib/scan.py
- class scan:
- def __init__(self, task_host, port_list):
- self.ip = task_host
- self.port_list = port_list
- self.config_ini = {}
- def run(self):
- self.timeout = int(self.config_ini['Timeout'])
- for _port in self.port_list:
- self.server = '' self.banner =''
- self.port = int(_port)
- self.scan_port() # 端口扫描
- if not self.banner:continue
- self.server_discern() # 服务识别
- if self.server == '':
- web_info = self.try_web() # 尝试 web 访问
- if web_info:
- log.write('web', self.ip, self.port, web_info)
- time_ = datetime.datetime.now()
- mongo.NA_INFO.update({'ip': self.ip, 'port': self.port},
- {"$set": {'banner': self.banner, 'server': 'web', 'webinfo': web_info,
- 'time': time_}})
scan 类的 run 函数. 先进行了端口扫描, scan_port()函数
位于 /nascan/lib/scan.py
- def scan_port(self):
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.connect((self.ip, self.port))
- time.sleep(0.2)
- except Exception, e:
- return
- try:
- self.banner = sock.recv(1024)
- sock.close()
- if len(self.banner) <= 2:
- self.banner = 'NULL'
- except Exception, e:
- self.banner = 'NULL'
- log.write('portscan', self.ip, self.port, None)
- banner = ''
- hostname = self.ip2hostname(self.ip)
- time_ = datetime.datetime.now()
- date_ = time_.strftime('%Y-%m-%d')
- try:
- banner = unicode(self.banner, errors='replace')
- if self.banner == 'NULL': banner = '' mongo.NA_INFO.insert({"ip": self.ip,"port": self.port,"hostname": hostname,"banner": banner,"time": time_})
- self.statistics[date_]['add'] += 1
- except:
- if banner:
- history_info = mongo.NA_INFO.find_and_modify(
- query={"ip": self.ip, "port": self.port, "banner": {"$ne": banner}}, remove=True)
- if history_info:
- mongo.NA_INFO.insert(
- {"ip": self.ip, "port": self.port, "hostname": hostname, "banner": banner, "time": time_})
- self.statistics[date_]['update'] += 1
- del history_info["_id"]
- history_info['del_time'] = time_
- history_info['type'] = 'update'
- mongo.NA_HISTORY.insert(history_info)
通过 socket 连接, 获得端口服务返回的 banner 信息, 然后进入 server_discern()函数, 通过正则表达式, 依次比较, 获得服务类型.
server_discern()函数
位于 /nascan/lib/scan.py
- def server_discern(self):
- for mark_info in self.config_ini['Discern_server']: # 快速识别
- try:
- name, default_port, mode, reg = mark_info
- if mode == 'default':
- if int(default_port) == self.port:
- self.server = name
- elif mode == 'banner':
- matchObj = re.search(reg, self.banner, re.I | re.M)
- if matchObj:
- self.server = name
- if self.server:break
- except:
- continue
- if not self.server and self.port not in [80,443,8080]:
- for mark_info in self.config_ini['Discern_server']: # 发包识别
- try:
- name, default_port, mode, reg = mark_info
- if mode not in ['default','banner']:
- dis_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- dis_sock.connect((self.ip, self.port))
- mode = mode.decode('string_escape')
- reg = reg.decode('string_escape')
- dis_sock.send(mode)
- time.sleep(0.3)
- dis_recv = dis_sock.recv(1024)
- matchObj = re.search(reg, dis_recv, re.I | re.M)
- if matchObj:
- self.server = name
- break
- except:
- pass
- if self.server:
- log.write("server", self.ip, self.port, self.server)
- mongo.NA_INFO.update({"ip": self.ip, "port": self.port}, {"$set": {"server": self.server}})
对于快速识别阶段没有识别出来、且端口不是 80/443/8080 的情况, 会重新建立连接并发送特定的探测包, 再用正则匹配返回的数据, 用来识别那些只有收到特定请求才会返回应答的服务类型.
最后如果还没识别出来, 进入 try_web()函数
位于 /nascan/lib/scan.py
- def try_web(self):
- title_str, html = '',''
- try:
- if self.port == 443:
- info = urllib2.urlopen("https://%s:%s" % (self.ip, self.port), timeout=self.timeout)
- else:
- info = urllib2.urlopen("http://%s:%s" % (self.ip, self.port), timeout=self.timeout)
- html = info.read()
- header = info.headers
- except urllib2.HTTPError, e:
- html = e.read()
- header = e.headers
- except:
- return
- if not header: return
- if 'Content-Encoding' in header and 'gzip' in header['Content-Encoding']: # 解压 gzip
- html_data = StringIO.StringIO(html)
- gz = gzip.GzipFile(fileobj=html_data)
- html = gz.read()
- try:
- html_code = self.get_code(header, html).strip()
- if html_code and len(html_code) <12:
- html = html.decode(html_code).encode('utf-8')
- except: pass
- try:
- title = re.search(r'<title>(.*?)</title>', html, flags=re.I | re.M)
- if title: title_str = title.group(1)
- except: pass
- try:
- web_banner = str(header) + "\r\n\r\n" + html
- self.banner = web_banner
- history_info = mongo.NA_INFO.find_one({"ip": self.ip, "port": self.port})
- if 'server' not in history_info:
- tag = self.get_tag()
- web_info = {'title': title_str, 'tag': tag}
- return web_info
- else:
- if abs(len(history_info['banner'].encode('utf-8')) - len(web_banner))> len(web_banner) / 60:
- del history_info['_id']
- history_info['del_time'] = datetime.datetime.now()
- mongo.NA_HISTORY.insert(history_info)
- tag = self.get_tag()
- web_info = {'title': title_str, 'tag': tag}
- date_ = datetime.datetime.now().strftime('%Y-%m-%d')
- self.statistics[date_]['update'] += 1
- log.write('info', None, 0, '%s:%s update web info'%(self.ip, self.port))
- return web_info
- except:
- return
- def ip2hostname(self,ip):
- try:
- hostname = socket.gethostbyaddr(ip)[0]
- return hostname
- except:
- pass
- try:
- query_data = "\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x20\x43\x4b\x41\x41" + \
- "\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41" + \
- "\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x41\x00\x00\x21\x00\x01"
- dport = 137
- _s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- _s.settimeout(3)
- _s.sendto(query_data, (ip, dport))
- x = _s.recvfrom(1024)
- tmp = x[0][57:]
- hostname = tmp.split("\x00", 2)[0].strip()
- hostname = hostname.split()[0]
- return hostname
- except:
- pass
- def get_code(self, header, html):
- try:
- m = re.search(r'<meta.*?charset=(.*?)"(>| |/)', html, flags=re.I)
- if m: return m.group(1).replace('"','')
- except:
- pass
- try:
- if 'Content-Type' in header:
- Content_Type = header['Content-Type']
- m = re.search(r'.*?charset=(.*?)(;|$)', Content_Type, flags=re.I)
- if m: return m.group(1)
- except:
- pass
try_web() 函数就是尝试用 web 访问, 如果有结果的话就保存下来, 没有的话就不管了.
回到 nascan.
大概每隔一分钟探测是否要进行扫描.
参考文章:
- https://landgrey.me/xunfeng-nascan-analysis/
- https://www.cnblogs.com/yangxiaodi/p/8011563.html
来源: https://www.cnblogs.com/zhengjim/p/9436008.html