为什么使用 SqlmapAPI?
由于 SQLMAP 每检测一个站点都需要开启一个新的命令行窗口或者结束掉上一个检测任务。虽然 -m 参数可以批量扫描 URL,但是模式也是一个结束扫描后才开始另一个扫描任务。通过 api 接口,下发扫描任务就简单了,无需开启一个新的命令行窗口。
SqlmapAPI
在 sqlmap 安装目录下的 sqlmapapi.py 文件,这个就是 sqlmap api,sqlmapapi 分为服务端以及客户端。
C:\Users\yanghelong\Desktop\sqlmap>python sqlmapapi.py -h
Usage: sqlmapapi.py [options]
Options:
-h, --help 显示帮助信息并
-s, --server 作为Api服务端运行
-c, --client 作为Api客户端运行
-H HOST, --host=HOST 指定服务端IP地址(默认IP : "127.0.0.1")
-p PORT, --port=PORT 指定服务端端口 (默认端口 : 8775)
--adapter=ADAPTER 服务端标准接口(默认值 : "wsgiref")
--username=USERNAME 可选,设置用户名
--password=PASSWORD 可选,设置密码
启动服务端
python sqlmapapi.py -s 命令成功后,在命令行中会返回一些信息。大概的意思是 api 服务端在本地 8775 端口上运行,admin token 为 bca6e58ac03c9cb52f35a87aeaa28dcf,IPC 数据库的位置在 C:\Users\YANGHE~1\AppData\Local\Temp\sqlmapipc-9lqlm_4d,api 服务端已经和 IPC 数据库连接上了,正在使用 bottle 框架 wsgiref 标准接口。
C:\Users\yanghelong\Desktop\sqlmap> python sqlmapapi.py -s
[15:52:26] [INFO] Running REST-JSON API server at '127.0.0.1:8775'..
[15:52:26] [INFO] Admin (secret) token: bca6e58ac03c9cb52f35a87aeaa28dcf
[15:52:26] [DEBUG] IPC database: 'C:\Users\YANGHE~1\AppData\Local\Temp\sqlmapipc-9lqlm_4d'
[15:52:26] [DEBUG] REST-JSON API server connected to IPC database
[15:52:26] [DEBUG] Using adapter 'wsgiref' to run bottle
但是通过上面的这种方式开启 api 服务端有一个缺点,当服务端和客户端不在同一台主机时会连接不上,因此如果要解决这个问题,可以通过输入以下命令来开启 api 服务端:
python sqlmapapi.py -s -H "0.0.0.0" -p 8775
命令成功后,远程客户端就可以通过指定远程主机 IP 和端口来连接到 API 服务端。
固定 admin token
如果您有特殊的需求需要固定 admin token 的话,可以修改 sqlmap 目录下的/lib/utils/api.py 文件,以下是源代码:
DataStore.admin_token = encodeHex(os.urandom(16), binary=False)
接口模式
基于命令行的接口模式
C:\Users\yanghelong\Desktop\sqlmap> python sqlmapapi.py -c
[16:00:23] [DEBUG] Example client access from command line:
$ taskid=$(curl http://127.0.0.1:8775/task/new 2>1 | grep -o -I '[a-f0-9]\{16\}') && echo $taskid
$ curl -H "Content-Type: application/json" -X POST -d '{"url": "http://testphp.vulnweb.com/artists.php?artist=1"}' http://127.0.0.1:8775/scan/$taskid/start
$ curl http://127.0.0.1:8775/scan/$taskid/data
$ curl http://127.0.0.1:8775/scan/$taskid/log
[16:00:23] [INFO] Starting REST-JSON API client to 'http://127.0.0.1:8775'...
[16:00:23] [DEBUG] Calling 'http://127.0.0.1:8775'
[16:00:23] [INFO] Type 'help' or '?' for list of available commands
api> help
help 显示帮助信息
new ARGS 开启一个新的扫描任务
use TASKID 切换指定taskid任务
data 获取当前任务返回的数据
log 获取当前任务的扫描日志
status 获取当前任务的扫描状态
option OPTION 获取当前任务的选项
options 获取当前任务的所有配置信息
stop 停止当前任务
kill 杀死当前任务
list 显示所有任务列表
version Fetch server version
flush 清空任务(默认是所有任务)
exit 退出客户端
检测 GET 注入
new -u url
eg:
//现请求了/task/new创建一个新的taskid,后由发起/scan/cafeb23befb85bb6/start请求开始任务
api> new -u "http://192.168.64.130/dvwa/vulnerabilities/sqli/?id=2&Submit=Submit"
[16:20:11] [DEBUG] Calling 'http://127.0.0.1:8775/task/new'
[16:20:11] [INFO] New task ID is 'cafeb23befb85bb6'
[16:20:11] [DEBUG] Calling 'http://127.0.0.1:8775/scan/cafeb23befb85bb6/start'
[16:20:11] [INFO] Scanning started
api (cafeb23befb85bb6)>
//通过status 获取该任务的扫描结果,若返回内容中的status字段为terminated,说明扫描完成,若返回内容中的status字段为running,说明扫描还在进行中
api (cafeb23befb85bb6)> status
[16:21:29] [DEBUG] Calling 'http://127.0.0.1:8775/scan/cafeb23befb85bb6/status'
{
"success": true,
"status": "running",
"returncode": null
}
//输入 data 命令,来获取扫描完成后注入出来的信息,若返回的内容中data字段不为空就说明存在注入。
//不存在注入返回的内容,data字段为空
{
"success": true,
"data": [],
"error": []
}
检测 POST 注入
//输入以下命令,在data.txt中加入数据,指定注入的位置,来达到检测POST、cookie、UA等注入的目的:
new -r data.txt
api> new -r C:\Users\yanghelong\Desktop\sql\build1\test.txt
[16:29:30] [DEBUG] Calling 'http://127.0.0.1:8775/task/new'
[16:29:30] [INFO] New task ID is '75708f5e15824979'
[16:29:30] [DEBUG] Calling 'http://127.0.0.1:8775/scan/75708f5e15824979/start'
[16:29:30] [INFO] Scanning started
基于命令行的接口模式用起来还是比较方便的,我们只需要通过 new 命令就可以自动创建 taskid 并开始该任务.
基于 HTTP 协议的接口模式
sqlmapapi.py 的 h 基于 http 接口调用模式的主要函数,进入到 lib/utils/api.py 的 server 类,可以发现通过向 server 提交数据进行与服务的交互。
#辅助
@get('/error/401')
@get("/task/new")
@get("/task/<taskid>/delete")
#Admin 命令
@get("/admin/list")
@get("/admin/<token>/list")
@get("/admin/flush")
@get("/admin/<token>/flush")
#sqlmap 核心交互命令
@get("/option/<taskid>/list")
@post("/option/<taskid>/get")
@post("/option/<taskid>/set")
@post("/scan/<taskid>/start")
@get("/scan/<taskid>/stop")
@get("/scan/<taskid>/kill")
@get("/scan/<taskid>/status")
@get("/scan/<taskid>/data")
@get("/scan/<taskid>/log/<start>/<end>")
@get("/scan/<taskid>/log")
@get("/download/<taskid>/<target>/<filename:path>")
@get("/task/new")
新建任务,返回一个随机的 taskid
@get("/task/new")
def task_new():
"""
Create a new task
"""
taskid = encodeHex(os.urandom(8), binary=False)
remote_addr = request.remote_addr
DataStore.tasks[taskid] = Task(taskid, remote_addr)
logger.debug("Created new task: '%s'" % taskid)
return jsonize({"success": True, "taskid": taskid})
@get("/task//delete")
删除任务,在调用时指定 taskid,不指定 taskid 会报错。
@get("/task/<taskid>/delete")
def task_delete(taskid):
"""
Delete an existing task
"""
if taskid in DataStore.tasks:
DataStore.tasks.pop(taskid)
logger.debug("(%s) Deleted task" % taskid)
return jsonize({"success": True})
else:
response.status = 404
logger.warning("[%s] Non-existing task ID provided to task_delete()" % taskid)
return jsonize({"success": False, "message": "Non-existing task ID"})
@get("/admin/list")
获取所有 taskid。
def task_list(token=None):
"""
Pull task list
"""
tasks = {}
for key in DataStore.tasks:
if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
tasks[key] = dejsonize(scan_status(key))["status"]
logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})
@get("/option//list")
获取特定任务 ID 的列表选项,调用时请指定 taskid,不然会报错。具体代码如下:
def option_list(taskid):
"""
List options for a certain task ID
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to option_list()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
logger.debug("(%s) Listed task options" % taskid)
return jsonize({"success": True, "options": DataStore.tasks[taskid].get_options()})
@post("/option//get")
获取特定任务 ID 的选项值,调用时请指定 taskid,不然会报错。具体代码如下:
def option_get(taskid):
"""
Get value of option(s) for a certain task ID
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to option_get()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
options = request.json or []
results = {}
for option in options:
if option in DataStore.tasks[taskid].options:
results[option] = DataStore.tasks[taskid].options[option]
else:
logger.debug("(%s) Requested value for unknown option '%s'" % (taskid, option))
return jsonize({"success": False, "message": "Unknown option '%s'" % option})
logger.debug("(%s) Retrieved values for option(s) '%s'" % (taskid, ",".join(options)))
return jsonize({"success": True, "options": results})
@post("/option//set")
为特定任务 ID 设置选项值,调用时请指定 taskid,不然会报错。具体代码如下:
def option_set(taskid):
"""
Set value of option(s) for a certain task ID
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to option_set()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if request.json is None:
logger.warning("[%s] Invalid JSON options provided to option_set()" % taskid)
return jsonize({"success": False, "message": "Invalid JSON options"})
for option, value in request.json.items():
DataStore.tasks[taskid].set_option(option, value)
logger.debug("(%s) Requested to set options" % taskid)
return jsonize({"success": True})
@post("/scan//start")
开始扫描特定任务,调用时请指定 taskid,不然会报错。具体代码如下:
def scan_start(taskid):
"""
Launch a scan
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_start()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if request.json is None:
logger.warning("[%s] Invalid JSON options provided to scan_start()" % taskid)
return jsonize({"success": False, "message": "Invalid JSON options"})
# Initialize sqlmap engine's options with user's provided options, if any
for option, value in request.json.items():
DataStore.tasks[taskid].set_option(option, value)
# Launch sqlmap engine in a separate process
DataStore.tasks[taskid].engine_start()
logger.debug("(%s) Started scan" % taskid)
return jsonize({"success": True, "engineid": DataStore.tasks[taskid].engine_get_id()})
@get("/scan//stop")
停止扫描特定任务,调用时请指定 taskid,不然会出现问题。具体代码如下:
def scan_stop(taskid):
"""
Stop a scan
"""
if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()):
logger.warning("[%s] Invalid task ID provided to scan_stop()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
DataStore.tasks[taskid].engine_stop()
logger.debug("(%s) Stopped scan" % taskid)
return jsonize({"success": True})
@get("/scan//status")
该接口可查询扫描状态,调用时请指定 taskid,不然会出现问题。具体代码如下:
def scan_status(taskid):
"""
Returns status of a scan
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_status()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if DataStore.tasks[taskid].engine_process() is None:
status = "not running"
else:
status = "terminated" if DataStore.tasks[taskid].engine_has_terminated() is True else "running"
logger.debug("(%s) Retrieved scan status" % taskid)
return jsonize({
"success": True,
"status": status,
"returncode": DataStore.tasks[taskid].engine_get_returncode()
})
@get("/scan//data")
该接口可获得到扫描结果,调用时请指定 taskid,不然会出现问题。具体代码如下:
def scan_data(taskid):
"""
Retrieve the data of a scan
"""
json_data_message = list()
json_errors_message = list()
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
# Read all data from the IPC database for the taskid
for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)})
# Read all error messages from the IPC database
for error in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_errors_message.append(error)
logger.debug("(%s) Retrieved scan data and error messages" % taskid)
return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})
@get("/scan//log")
该接口可查询特定任务的扫描的日志,调用时请指定 taskid,不然会出现问题。具体代码如下:
def scan_log(taskid):
"""
Retrieve the log messages
"""
json_log_messages = list()
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_log()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
# Read all log messages from the IPC database
for time_, level, message in DataStore.current_db.execute("SELECT time, level, message FROM logs WHERE taskid = ? ORDER BY id ASC", (taskid,)):
json_log_messages.append({"time": time_, "level": level, "message": message})
logger.debug("(%s) Retrieved scan log messages" % taskid)
return jsonize({"success": True, "log": json_log_messages})
def scan_log_limited(taskid, start, end):
"""
Retrieve a subset of log messages
"""
json_log_messages = list()
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to scan_log_limited()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
if not start.isdigit() or not end.isdigit() or end < start:
logger.warning("[%s] Invalid start or end value provided to scan_log_limited()" % taskid)
return jsonize({"success": False, "message": "Invalid start or end value, must be digits"})
start = max(1, int(start))
end = max(1, int(end))
# Read a subset of log messages from the IPC database
for time_, level, message in DataStore.current_db.execute("SELECT time, level, message FROM logs WHERE taskid = ? AND id >= ? AND id <= ? ORDER BY id ASC", (taskid, start, end)):
json_log_messages.append({"time": time_, "level": level, "message": message})
logger.debug("(%s) Retrieved scan log messages subset" % taskid)
return jsonize({"success": True, "log": json_log_messages})
@get("/download////")
下载服务端指定任务的文件。具体代码如下:
def download(taskid, target, filename):
"""
Download a certain file from the file system
"""
if taskid not in DataStore.tasks:
logger.warning("[%s] Invalid task ID provided to download()" % taskid)
return jsonize({"success": False, "message": "Invalid task ID"})
path = os.path.abspath(os.path.join(paths.SQLMAP_OUTPUT_PATH, target, filename))
# Prevent file path traversal
if not path.startswith(paths.SQLMAP_OUTPUT_PATH):
logger.warning("[%s] Forbidden path (%s)" % (taskid, target))
return jsonize({"success": False, "message": "Forbidden path"})
if os.path.isfile(path):
logger.debug("(%s) Retrieved content of file %s" % (taskid, target))
with open(path, 'rb') as inf:
file_content = inf.read()
return jsonize({"success": True, "file": base64encode(file_content)})
else:
logger.warning("[%s] File does not exist %s" % (taskid, target))
return jsonize({"success": False, "message": "File does not exist"})
总结
基于 HTTP 的接口模式用起来可能比较繁琐,但是对于程序调用接口还是很友善的。总之该模式的流程是:
- 通过 GET 请求 http://ip:port/task/new 这个地址,创建一个新的扫描任务;
- 通过 POST 请求 http://ip:port/scan//start 地址,并通过 json 格式提交参数,开启一个扫描;
- 通过 GET 请求 http://ip:port/scan//status 地址,即可获取指定的 taskid 的扫描状态。这个返回值分为两种,一种是 run 状态(扫描未完成),一种是 terminated 状态(扫描完成);
- 扫描完成后获取扫描的结果。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于