一、平台介绍
1、平台功能和特色
实现日志能实时收集线上系统的统一类型日志,通过map reduce的方式进行日志的统计分析,将汇总出来的结果存入数据库。
利用django的框架实现一套后台统计系统,提取出对应的数据进行日志(如:接口平均响应时间等)进行绘图分析。
2、技术手段
syslog-ng : 一个日志收集系统,相比rsyslog性能更优,支持tcp、udp协议日志收集,并且能将汇总日志定时切割。
python中Mrjob模块:是一个编写MapReduce任务的开源python 模块。可以轻松的编写一个MapReduce任务。
Django框架:一套开源python应用程序框架,基于此框架我们搭建一个日志分析平台,图表化的展现核心接口平均响应时间。
jquery:一个优秀的Javascript库
3、整体架构图示
二、环境搭建
1、syslog-ng搭建
1
2
3
4
5
|
#寻找对应版本的yum包安装吧,注意:如果系统有装rsyslog服务,安装时会有冲突,所以需要卸载掉(rpm -e rsyslog) rpm -ivh http: // www /ericlee/eventlog-0 .2.5-1.x86_64.rpm http: //10 .103.11.101 /www/ericlee/eventlog-devel-0 .2.5-1.x86_64.rpm rpm -ivh http: // www /ericlee/syslog-ng-2 .0.5-1.x86_64.rpm chkconfig syslog-ng on |
2、syslog-ng日志收集平台配置
分为了客户端配置、和服务端配置。客户端通过linux系统pipe传输方式将日志实时重写,追加写入到本地的文件中,
并且通过tcp的协议实时将日志传送到syslog-ng服务端。
1
2
3
4
5
6
7
|
# client syslog-ng configuration file. #log.pipe需为客户度日志目录,并且管道类型文件 source s_nginx_access { pipe ( "/opt/logs/nginx/access/log.pipe" );}; #将本地日志实时通过udp协议向syslong-ng服务端传输内容 destination d_log_access { udp( "your syslog server ipaddress" port(1234) ); }; #将日志备份保留一份到本地 destination d_local_access { file ( "/opt/logs/nginx/access/log" template(t_filetemplate));}; |
syslog-ng服务端:
统一监听本地端口,对客户端传送过来日志集中收集。
1
2
3
4
5
6
7
8
|
# server syslog-ng configuration file. #监听本地UDP协议端口1234 source s_nginx_access { udp(ip(0.0.0.0) port(1234)); }; #按照规定的时间目录层级进行日志的存储 destination d_nginx_access { file ( "/opt/data/syslog/$YEAR$MONTH$DAY/access_$HOUR" ); }; log { source (s_nginx_access); destination(d_nginx_access); }; |
三、利用 python的mrjob模块对日志进行分析
1、python需要安装mrjob模块,可以按照如下方式:
1
2
3
|
#download cd mrjob-0.4.3 /opt/python2 .7 /bin/python . /setup .py install |
2、编写脚本
对上一个小时段的日志进行分析,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
#!/opt/python2.7/bin/python __author__ = 'jeson' from mrjob.job import MRJob import re import logging import sys #全局变量,获取到对应的url接口参数 geturl = "" #定义单独mapreduce类,用于汇总计算出分钟段的总请求时间 class MRCounter(MRJob): def mapper( self ,key,line): i = 0 for flow in line.split(): res = r "^\"\/" + geturl if i = = 7 and re.match(res,flow): j = 0 #logging.error(line) for flow2 in line.split(): #logging.error(flow2) if j = = 5 and re.match(r "^\"\d{4}\-" ,flow2): #logging.error(flow2) timerow = flow2.split( ":" ) hm = timerow[ 0 ].lstrip( '"')+":"+timerow[1] if j==12 and re.match(r"\d{1,}\.\d{1,}",flow2): yield hm,float(flow2) j += 1 i += 1 def reducer(self,key,occurrences): yield key,sum(occurrences) #单独mapreduce类,用于汇总计算出分钟段的总请求数目 class MRCounter_number(MRJob): def mapper(self,key,line): i=0 for flow in line.split(): res=r"^\"\/"+geturl if i==7 and re.match(res,flow): j = 0 for flow2 in line.split(): if j==5 and re.match(r"^\"\d{4}",flow2): timerow=flow2.split(":") #logging.error("%s &&&&&&&&&&&&&&&&&&" % len(timerow)) #logging.error(flow2) #print "*********",len(timerow) #hm=timerow[1]+":"+timerow[2] hm=timerow[0].lstrip('"' ) + ":" + timerow[ 1 ] yield hm, 1 j + = 1 i + = 1 def reducer( self ,key,occurrences): yield key, sum (occurrences) #传入参数,Getrequesttime 分析出每一分钟段的对应接口的总请求时间。Getrequestnumber 分析出每一分钟段的对应接口的总请求数量 if __name__ = = '__main__' : if sys.argv[ 2 ] = = "Getrequesttime" : MRCounter.run() elif sys.argv[ 2 ] = = "Getrequestnumber" : MRCounter_number.run() else : print "Getrequesttime or Getrequestnumber" if sys.argv[ 3 ] = = "null" : print "Input url!" else : geturl = sys.argv[ 3 ] |
最终此脚本的执行方式:/opt/python2.7/bin/python ${allpath}/httpflow.py /opt/data/syslog/${tdate}/access_${ttime} Getrequesttime ${url} -o /opt/data/Getrequesttime
执行时脚本将从指定的文件段中获取内容,并生成到/opt/data/Getrequesttime /opt/data/Getrequestnumber 命名为part-00000文件。
接下来,我们需要对两个文件内容进行分析,并且入库:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
#!/usr/bin/env python #coding=utf-8 import MySQLdb import sys url = sys.argv[ 1 ] ip = 'mysql address' hostname = "hostname" #对serverlog_url_list url的信息列表插入数据url信息,获取一次原来时段的每个url对应的初始值 db = MySQLdb.connect( "mysql ip address" , "serverlog" , "serverlog" , "gameh5" ) cursor = db.cursor() cursor.execute( 'SELECT id from serverlog_url_list where url_value like "%s"' % url) data_id = cursor.fetchone() if data_id is None : cursor.execute( 'insert into serverlog_url_list (url_value,total_count) values (%s,0)' ,url) cursor.execute( 'SELECT id from serverlog_url_list where url_value like "%s"' % url) data_id = cursor.fetchone() cursor.execute( 'select total_count from serverlog_url_list where id="%s"' ,data_id) url_init_count = cursor.fetchall() tmpcount = url_init_count[ 0 ][ 0 ] urlid = int (data_id[ 0 ]) #对分析出的文件,进行比对,合并后输出统一的结果列表(包括 url、hostname、请求时间、平均响应时间、总时间等等) fp01 = open ( "/opt/data/Getrequestnumber/part-00000" , "r" ) a = [] for line01 in fp01: a.append(line01) fp02 = open ( "/opt/data/Getrequesttime/part-00000" , "r" ) fc02 = sorted (fp02, key = lambda x:x.split()[ 1 ]) for line02 in fc02: i = 0 while line02.split()[ 0 ]! = a[i].split()[ 0 ]: i + = 1 #print "%s\t%s"%(line02.split()[0],a[i].split()[0]) requestime = line02.split()[ 0 ].replace( "T" , " " ).strip( '"' ) #累加求和,加入一个时段的请求数,并且操作数据库更新url对应的请求个数字段total_count tmpcount = tmpcount + int (a[i].split()[ 1 ]) print "%s\t%s\t%s\t%s\t%s\t%s" % (ip,hostname,urlid,requestime,line02.split()[ 1 ],a[i].split()[ 1 ]) fp01.close() fp02.close() cursor.execute( 'update serverlog_url_list set total_count="%s" where url_value like "%s"' % (tmpcount,url)) db.close()
|