原文链接:OpenResty + Mysql 实现日志实时存储
应用场景和日志文件解析
本配置主要解决 Nginx 向 mysql 中实时插入日志的问题,采用 OpenResty + Mysql 实现。
1. 刚开始的时候看了 Nginx 和 mysql 的连接模块。比如说 nginx-mysql-module,可以连接 mysql。但是插入日志时遇到问题,我们知道 nginx 的执行过程先是 location 解析并重写阶段,然后是访问权限控制阶段,接着是内容生成阶段,最后是日志记录阶段。mysql 访问阶段属于内容生成阶段,所以代理运行的时间和状态,mysql 都无法获取的到。因此,这种通过 nginx 直连 mysql 的方式无法达到我们的要求。
2. 通过 lua 脚本在日志生成阶段获取信息,然后将数据插入 mysql。nginx 有一个限制,无法在 log 阶段访问 socket 即无法访问 mysql,所以无法在 log 阶段直接将数据存入 mysql。但是可以通过运行包含 mysql 操作的 shell 脚本来解决这个问题。但是这个方法有两个弊端:
-
获取到 Nginx 代理的结果后,每次都要连接 mysql 并向其插入数据。当并发量大时,mysql 端会出现问题。
-
不向 mysql 插入数据,整个时间的消耗大约在 0.02-0.04s 之间。而向 mysql 插入数据后,整个时间消耗大约在 0.4-0.9 之间,消耗的时间是原来的 10 倍。
3. 通过 lua + ngx.time.at + lua_mysql + lua.share.dict 解决问题。整个过程如下所示:
-
在 nginx 启动阶段,ngx.time.at 启动一个延时任务。在任务中,每隔一段时间取出 nginx 内存共享区的 log 数据,将数据合并,存入 mysql,同时再一个相同的延时任务,递归调用。这样就与 crontab 命令相似。当定时器到期,定时器中的 Lua 代码是在一个“轻线程”中运行的,它与创造它的原始请求是完全分离的,因此不存在大量线程同时运行的情况。
-
在日志生成阶段,将数据封装并存入 nginx 的内存共享区。
Mysql 访问权限的问题
不但访问 Mysql 的 Mysql 用户需要有操作对应数据库的权限,还需要调用 Mysql 命令的用户具有访问 mysql 的权限。授权命令如下:
GRANT ALL PRIVILEGES ON *.* to root@xxx IDENTIFIED BY 'password';
Mysql 编码类型
总的来说,Mysql 的数据库对应三种编码。Mysql 客户端显示数据的编码,连接 Mysql 用的编码(即数据存入 mysql 时,数据的编码),Mysql 存储用的编码(字段,表,数据库三种格式可能不同)。不管 Mysql 存储用的编码是什么,只要 Mysql 客户端显示数据的编码和连接 Mysql 用的编码相同,数据就能通过 mysql 客户端正确显示。
配置文件
user root;
worker_processes 2;
events {
worker_connections 1024;
}
http{
lua_package_path "/home/oicq/guomm/nginx_lua/lua-resty-mysql-master/lib/?.lua;;"; --重要
lua_shared_dict logs 10m;
init_worker_by_lua_block {
local delay = 10
function put_log_into_mysql(premature)
local mysql = require "resty.mysql"
local db, err = mysql:new()
if not db then
ngx.log(ngx.ERR,"failed to instantiate mysql: ", err)
return
end
db:set_timeout(1000)
local ok, err, errcode, sqlstate = db:connect{
host = "xxx",
port = 3306,
database = "database_name",
user = "username",
password = "password",
charset = "utf8",
}
if not ok then
ngx.log(ngx.ERR,"failed to connect: ", err, ": ", errcode, " ", sqlstate)
return
end
-- get data from shared dict and put them into mysql
local key = "logs"
local vals = ""
local temp_val = ngx.shared.logs:lpop(key)
while (temp_val ~= nil)
do
vals = vals .. ",".. temp_val
temp_val = ngx.shared.logs:lpop(key)
end
if vals ~= "" then
vals = string.sub(vals, 2,-1)
local command = ("insert into es_visit_record(access_ip,server_ip,access_time,run_time,es_response_time,request_body_byte,run_state,url,post_data) values "..vals)
ngx.log(ngx.ERR,"command is ",command)
local res, err, errcode, sqlstate = db:query(command)
if not res then
ngx.log(ngx.ERR,"insert error: ", err, ": ", errcode, ": ", sqlstate, ".")
return
end
end
local ok, err = db:close()
if not ok then
ngx.log(ngx.ERR,"failed to close: ", err)
return
end
-- decycle call timer to run put_log_into_mysql method, just like crontab
local ok, err = ngx.timer.at(delay, put_log_into_mysql);
if not ok then
ngx.log(ngx.ERR, "failed to create timer: ", err)
return
end
end
local ok, err = ngx.timer.at(delay, put_log_into_mysql)
if not ok then
ngx.log(ngx.ERR, "failed to create timer: ", err)
return
end
}
upstream elasticsearch_servers {
server xxx max_fails=3 fail_timeout=30s;
server xxx max_fails=3 fail_timeout=30s;
server xx max_fails=3 fail_timeout=30s;
}
log_format porxy '$remote_addr,$upstream_addr,[$time_local],$request,$request_body,$status,$body_bytes_sent,$request_time,$upstream_response_time';
server {
listen 9202;
location / {
proxy_pass http://elasticsearch_servers;
log_by_lua_block{
local currentTime = os.date("%Y-%m-%d %H:%M:%S", os.time())
currentTime = "\"" .. currentTime .. "\""
local req_body = '-'
if ngx.var.request_body then
req_body = ngx.var.request_body
req_body = string.gsub(req_body,"\n","")
--req_body = string.gsub(req_body,"\t","")
end
req_body = "\"" .. req_body .. "\""
local req_status = 0
if ngx.var.status then
req_status = ngx.var.status
end
local req_time = 0
if ngx.var.request_time then
req_time = ngx.var.request_time
end
local req_req = "\"" .. ngx.var.request .. "\""
local remote_addr = "\"" .. ngx.var.http_x_forwarded_for .. "\""
local server_addr = "\"" .. ngx.var.upstream_addr .. "\""
local myparams = ("("..remote_addr..",".. server_addr..","..currentTime..","..ngx.var.request_time .. ",".. ngx.var.upstream_response_time..","..ngx.var.body_bytes_sent..","..ngx.var.status..","..req_req..","..req_body..")")
local key = "logs"
local len,err = ngx.shared.logs:rpush(key, myparams)
if err then
ngx.log(ngx.ERR,"failed to put log vals into shared dict")
return
end
}
}
access_log logs/es_access.log porxy;
}
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于