先看下具体效果图
主要是对自己博客服务器的流量的监控
主要思路
读取日志-> 正则解析-> 写入 influxdb->grafana 获取数据渲染
首先我们需要先将 influxdb 和 grafana 安装部署好 以便于后面使用
centos7 安装 influxdb
wget https://dl.influxdata.com/influxdb/releases/influxdb-0.13.0.x86_64.rpm
sudo yum localinstall influxdb-0.13.0.x86_64.rpm
service influxdb restart
如果遇到influxdb 8083端口访问不到web管理页面则需要到/etc/influxdb/influxdb.conf 修改参数
vim 命令模式下:/admin搜索到对应部分去掉注释#[admin] #enabled=true
[admin]
# Determines whether the admin service is enabled.
enabled =true
# The default bind address used by the admin service.
bind-address = ":8083"
# Whether the admin service should use HTTPS.
# https-enabled = false
# The SSL certificate used when HTTPS is enabled.
# https-certificate = "/etc/ssl/influxdb.pem"
然后具体操作添加用户,建库什么均在百度,和mysql大同小异
centos7 下安装 grafana
在官方文档中有很多办法
我是通过配置 yum 然后 install 的
Add the following to a new file at /etc/yum.repos.d/grafana.repo
[grafana]
name=grafana
baseurl=https://packagecloud.io/grafana/stable/el/7/$basearch
repo_gpgcheck=1
enabled=1
gpgcheck=1
gpgkey=https://packagecloud.io/gpg.key https://grafanarel.s3.amazonaws.com/RPM-GPG-KEY-grafana
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
sudo yum install grafana
service grafana-server restart
默认跑在3000端口
账号密码为
admin
admin
代码实现
type Reader interface {
Read(rc chan []byte)
}
type Writer interface {
Write(rc chan *Message)
}
//存入db的基本数据
type Message struct {
Host string
TimeLocal time.Time
Method, Resource, Protocol string
Status string
BytesSent int
Scheme string
Url string
UpstreamTime, RequestTime float64
}
//扩展性实现接口的struct
type ReadFromFile struct {
path string
}
type WriteToInfluxDB struct {
influxDBDsn string
}
//日志解析中转
type LogProcess struct {
rc chan []byte
wc chan *Message
read Reader
write Writer
}
从文件中读取数据(注释中的数据为我 nginx 日志中的一条数据)
/*14.215.176.15 - - [23/APR/2018:21:43:39 +0800]
"GET /CONSOLE/DIST/LAYOUTS/DEFAULT.F5E6C608DE637CAC3F50.JS HTTP/1.1"
200 5665 "HTTPS://WWW.XHXBLOG.CN/?B3ID=H9OXZSYM"
"MOZILLA/5.0 (WINDOWS NT 6.1; WOW64; RV:43.0) GECKO/20100101 FIREFOX/43.0" "-"*/
func (r *ReadFromFile) Read(rc chan []byte) {
//读取数据
f, err := os.Open(r.path)
if err != nil {
panic(fmt.Sprintf("open file fail:%s", err.Error()))
}
//跳到文件末尾
f.Seek(0, 2)
rd := bufio.NewReader(f)
//循环读取数据将每行数据送入rc channel
for {
line, err := rd.ReadBytes('\n')
if err == io.EOF {
time.Sleep(500 * time.Millisecond)
continue
} else if err != nil {
panic(fmt.Sprintf("ReadBytes error: %s", err.Error()))
}
TypeMonitorChan <- TypeHandleLine
rc <- line[:len(line)-1]
}
}
正则测试模块没有单独写出来了,放在如下函数的注释中测试
其中 method
,resource
,protocol
等是通过正则解析后的字符串再次通过空格 split
分割获取的
func (l *LogProcess) Process() {
//解析数据
r := regexp.MustCompile(`([\d\.]+)\s+([^\[]+)\s+([^\[]+)\s+\[([^\]]+)\]\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"([^"]+)\"\s+`)
/**
测试flag
*/
//flag := 0
loc, _ := time.LoadLocation("Asia/Shanghai")
for v := range l.rc {
ret := r.FindStringSubmatch(string(v))
/*测试**/
/*
if flag != 2 {
sp := strings.Split(ret[5], " ")
fmt.Println(ret)
fmt.Println("Host:", ret[1])
fmt.Println("LocalTime:", ret[4])
fmt.Println("Method:", sp[0])
uu, _ := url.Parse(sp[1])
fmt.Println(uu.Path)
fmt.Println("Path:", sp[1])
fmt.Println("Protocol:", sp[2])
fmt.Println("Status:", ret[6])
fmt.Println("BytesSent:", ret[7])
fmt.Println("Scheme:", ret[9])
fmt.Println(ret[8])
flag++
}
*/
if len(ret) != 10 {
TypeMonitorChan <- TypeErrNum
log.Println("FindStringSubmatch fail:", string(v))
continue
}
t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0800", ret[4], loc)
if err != nil {
log.Println("ParseInLocation fail:", err.Error(), ret[4])
}
message := &Message{}
//14.215.176.15
message.Host = ret[1]
//23/APR/2018:21:43:39 +0800
message.TimeLocal = t
//MOZILLA/5.0 (WINDOWS NT 6.1; WOW64; RV:43.0) GECKO/20100101 FIREFOX/43.0
message.Scheme = ret[9]
//GET /CONSOLE/DIST/LAYOUTS/DEFAULT.F5E6C608DE637CAC3F50.JS HTTP/1.1
sp := strings.Split(ret[5], " ")
if len(sp) != 3 {
TypeMonitorChan <- TypeErrNum
log.Println("strings.Split fail:", ret[5])
continue
}
//请求方法
message.Method = sp[0]
//请求路径
u, err := url.Parse(sp[1])
if err != nil {
TypeMonitorChan <- TypeErrNum
log.Println("url parse fail:", err)
continue
}
message.Resource = u.Path
//请求协议
message.Protocol = sp[2]
//200
message.Status = ret[7]
//HTTPS://WWW.XHXBLOG.CN/?B3ID=H9OXZSYM
message.Url = ret[8]
//5665
message.BytesSent, _ = strconv.Atoi(ret[7])
l.wc <- message
}
}
写入 influxdb 客户端是用 golang 写的 influxdb gay 地址为:InfluxDB Client
func (w *WriteToInfluxDB) Write(wc chan *Message) {
sp := strings.Split(w.influxDBDsn, "@")
// Create a new HTTPClient
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: sp[0],
Username: sp[1],
Password: sp[2],
})
if err != nil {
log.Fatal(err)
}
defer c.Close()
for v := range wc {
// Create a new point batch
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: sp[3],
Precision: sp[4],
})
if err != nil {
log.Fatal(err)
}
// Create a point and add to batch
tags := map[string]string{"Path": v.Resource, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status, "Protocol": v.Protocol}
fields := map[string]interface{}{
"RequestTime": 2.0,
"BytesSent": v.BytesSent,
}
pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
if err != nil {
log.Fatal(err)
}
bp.AddPoint(pt)
// Write the batch
if err := c.Write(bp); err != nil {
log.Fatal(err)
}
// Close client resources
if err := c.Close(); err != nil {
log.Fatal(err)
}
log.Println("write success")
}
}
type SystemInfo struct {
HandleLine int `json:"handleLine"`
Tps float64 `json:"tps"`
ReadChanLen int `json:"readChanLen"`
WriteChanLen int `json:"writeChanLen"`
RunTime string `json:"runTime"`
ErrNum int `json:"errNum"`
}
type Monitor struct {
startTime time.Time
data SystemInfo
tpsSli []int
}
此处主要监控运行时间,channel 阻塞数量,处理条数监听一个端口在 8999 上
其次该监听将阻塞在 main 函数上
//专门接收内容的channel,错误数量和处理条数
var TypeMonitorChan = make(chan int, 200)
func (m *Monitor) start(lp *LogProcess) {
go func() {
for n := range TypeMonitorChan {
switch n {
case TypeErrNum:
m.data.ErrNum += 1
case TypeHandleLine:
m.data.HandleLine += 1
}
}
}()
ticker := time.NewTicker(time.Second * 5)
go func() {
<-ticker.C
m.tpsSli = append(m.tpsSli, m.data.HandleLine)
//目的是为了通过两次读取的行数除以单位时间就能得到大概的吞吐量
if len(m.tpsSli) > 2 {
m.tpsSli = m.tpsSli[1:]
}
}()
http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
m.data.RunTime = time.Now().Sub(m.startTime).String()
m.data.ReadChanLen = len(lp.rc)
m.data.WriteChanLen = len(lp.wc)
if len(m.tpsSli) >= 2 {
m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
}
ret, _ := json.MarshalIndent(m.data, "", "\t")
io.WriteString(writer, string(ret))
})
http.ListenAndServe(":8999", nil)
}
func main() {
//通过命令行模式输入参数简单化,不输则为默认值
var path, influxDsn string
flag.StringVar(&path, "path", "/var/log/nginx/access.log", "read file path")
flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@haoxiong@8080@nginx_log@s", "influx data source")
flag.Parse()
r := &ReadFromFile{
path: path,
}
w := &WriteToInfluxDB{
influxDBDsn: influxDsn,
}
lp := &LogProcess{
rc: make(chan []byte, 200),
wc: make(chan *Message),
read: r,
write: w,
}
go lp.read.Read(lp.rc)
for i := 0; i < 2; i++ {
go lp.Process()
}
for i := 0; i < 4; i++ {
go lp.write.Write(lp.wc)
}
m := &Monitor{
startTime: time.Now(),
data: SystemInfo{},
}
//监听一个端口阻塞main
m.start(lp)
}
具体代码
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于