nginx日志写入mongodb
1.1 说明
环境
centos + openresty
nginx.conf
http:
log_format main '$remote_addr $request_method "$region" $status '
'"$scheme://$host$request_uri" $body_bytes_sent "$request_body" '
'"$http_referer" $hitratio "$http_user_agent"';
access_log /root/test/src/access.log main;
lua_shared_dict ipdb 45m;
init_by_lua_file '/root/test/src/test/init.lua';
server:
location /:
set $hitratio '';
set $region '';
rewrite_by_lua_file '/root/test/src/main.lua';
location /pureip:
internal;
content_by_lua_file '/root/test/src/pureip.lua';
ip库格式
每行一条记录：起始IP 结束IP 地区（以空格分隔）
1.0.0.0 1.0.0.255 澳大利亚
1.0.1.0 1.0.3.255 福建省
1.0.4.0 1.0.7.255 澳大利亚
1.0.8.0 1.0.15.255 广东省
1.0.16.0 1.0.31.255 日本
1.0.32.0 1.0.63.255 广东省
1.0.64.0 1.0.127.255 日本
1.0.128.0 1.0.255.255 泰国
......
1.2 加载ip到共享内存
-- init.lua
local cjson = require "cjson"
local sfind = string.find
local ssub = string.sub
local tinsert = table.insert
local ilines = io.lines
local tonumber = tonumber
local ipdb = ngx.shared.ipdb
--- Split a string into pieces around a literal separator.
-- The separator is matched as plain text (not as a Lua pattern), so magic
-- characters such as "." or "%" are safe, and separators longer than one
-- character are skipped in full.
-- @param source_str string to split
-- @param split_char literal separator text
-- @return number of pieces produced (always >= 1)
-- @return 1-based array holding the pieces in order
function string_split(source_str, split_char)
    local split_array = {}
    local sep_len = #split_char
    -- An empty separator matches at every position and would never advance.
    if sep_len == 0 then
        split_array[1] = source_str
        return 1, split_array
    end

    local num, start_index = 0, 1
    while true do
        -- 4th argument `true` turns off pattern matching.
        local find_index = string.find(source_str, split_char, start_index, true)
        num = num + 1
        if not find_index then
            -- No more separators: the remainder is the last piece.
            split_array[num] = string.sub(source_str, start_index)
            break
        end
        split_array[num] = string.sub(source_str, start_index, find_index - 1)
        -- Scan forward by index instead of re-slicing the source string.
        start_index = find_index + sep_len
    end
    return num, split_array
end
-- One bucket per possible first octet (0..255); each bucket is a list of
-- {startIpnum, endIpnum, region} entries in file order.
local ipTab = {}
for i = 0, 255 do
    ipTab[i] = {}
end

-- Convert the first dotted-quad found in `text` to its 32-bit numeric value.
-- Returns the number and the first octet, or nil when `text` is missing,
-- contains no dotted quad, or has an octet above 255.
local function ip_to_number(text)
    if not text then
        return nil
    end
    -- Dots are escaped: a bare "." in a Lua pattern matches any character.
    local _, _, a, b, c, d = sfind(text, "(%d+)%.(%d+)%.(%d+)%.(%d+)")
    if not a then
        return nil
    end
    a, b, c, d = tonumber(a), tonumber(b), tonumber(c), tonumber(d)
    if a > 255 or b > 255 or c > 255 or d > 255 then
        return nil
    end
    return a * 16777216 + b * 65536 + c * 256 + d, a
end

for item in ilines("/root/test/src/iplist") do
    local _, arr = string_split(item, " ")
    local startIpnum, startIpa = ip_to_number(arr[1])
    local endIpnum, endIpa = ip_to_number(arr[2])
    if startIpnum and endIpnum and arr[3] then
        -- A range may cross a first-octet boundary; file it under every
        -- octet it covers so a lookup keyed by the client's first octet
        -- can still find it.
        local lastGroup = endIpa
        if lastGroup < startIpa then
            lastGroup = startIpa
        end
        for group = startIpa, lastGroup do
            tinsert(ipTab[group], {startIpnum, endIpnum, arr[3]})
        end
    else
        -- Blank or malformed lines are skipped instead of aborting init.
        ngx.log(ngx.WARN, "skipping malformed iplist line: ", item)
    end
end

-- Publish each bucket as JSON under "group:<first octet>".
for i = 0, 255 do
    local setInfo = cjson.encode(ipTab[i])
    local ok, err = ipdb:set("group:" .. i, setInfo)
    if not ok then
        -- Typically "no memory" when the shared dict is too small.
        ngx.log(ngx.ERR, "failed to store ip group ", i, " in shared dict: ", err)
    end
end
1.3 赋值nginx变量
-- main.lua
local cjson = require "cjson"
local redis = require "resty.redis"
redis.add_commands("expire", "exists", "del", "sadd", "srem", "smembers", "hmget")

local unescape = ngx.unescape_uri
local host = ngx.var.host
-- The redis key is the unescaped "host + request URI" of this request.
local key_url = unescape(host .. ngx.var.request_uri)

local red = redis:new()
local ok, err = red:connect("127.0.0.1", 6379)
if not ok then
    ngx.say("failed to connect: ", err)
    return
end

local exists, exists_err = red:exists(key_url)
if not exists then
    ngx.say("failed to run exists: ", exists_err)
    -- Stop here, like the connect failure above, rather than carrying on
    -- and assigning nil to $hitratio.
    return
end

-- Hand the connection back to the pool instead of leaving it to be dropped
-- when the request ends.
local pooled, pool_err = red:set_keepalive(10000, 100)
if not pooled then
    ngx.log(ngx.WARN, "failed to set redis keepalive: ", pool_err)
end

-- 1 when the key exists in redis, 0 otherwise.
ngx.var.hitratio = exists

-- Resolve the client's region through the internal /pureip location.
local capture = ngx.location.capture('/pureip')
if capture.status == ngx.HTTP_OK and capture.body then
    -- Strip only trailing CR/LF. Dropping the last byte unconditionally
    -- would cut a multi-byte UTF-8 character in half whenever the body has
    -- no trailing newline. The parentheses discard gsub's second result.
    ngx.var.region = (string.gsub(capture.body, "[\r\n]+$", ""))
end
1.4 二分查找地址
-- pureip.lua
local cjson = require "cjson"
local sfind = string.find
local tonumber = tonumber
local mfloor = math.floor
local ipdb = ngx.shared.ipdb
local ip = ngx.var.remote_addr
-- Dots are escaped and the pattern is anchored so only a plain dotted quad
-- is accepted.
local _, _, ipa, ipb, ipc, ipd = sfind(ip, "^(%d+)%.(%d+)%.(%d+)%.(%d+)$")
if not ipa then
    -- Not an IPv4 address (e.g. IPv6 or a unix-socket peer): answer with an
    -- empty body instead of failing on nil arithmetic.
    return
end
-- Same numeric encoding used when the ranges were loaded:
-- a*2^24 + b*2^16 + c*2^8 + d.
local ipnum = ipa*16777216 + ipb*65536 + ipc*256 + ipd
local groupId = tonumber(ipa)
local groupJson = ipdb:get("group:"..groupId)
if not groupJson then
    -- Nothing stored for this first octet (missing or evicted key).
    return
end
local ipGroup = cjson.decode(groupJson)
--[[function search(tb, target)
low, high = 1, #tb
while low <= high do
mid = mfloor((low+high)/2)
if mid < target then
low = mid
elseif mid > target then
high = mid
else
return mid
end
end
end --]]
--- Binary-search a list of ranges for the one that contains `target`.
-- @param tb array of entries whose [1] is the range start and [2] the range
--           end; must be sorted ascending and non-overlapping
-- @param target number to locate
-- @return index of the entry with tb[i][1] <= target <= tb[i][2], or nil
--         when no entry contains `target`
function searchII(tb, target)
    local low, high = 1, #tb
    while low <= high do
        local mid = math.floor((low + high) / 2)
        local range = tb[mid]
        if range[2] < target then
            -- Target lies above this range: drop mid and everything below.
            -- Without the +1 the window never shrinks and a miss loops forever.
            low = mid + 1
        elseif range[1] > target then
            high = mid - 1
        else
            return mid
        end
    end
    return nil
end
local mid = searchII(ipGroup, ipnum)
-- searchII returns nil when no range contains the address; emit an empty
-- body in that case instead of indexing ipGroup with nil.
if mid then
    ngx.print(ipGroup[mid][3])
end
1.5 python写本地日志到mongodb
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import fileinput
import re
import urllib
import subprocess
import time
def uaparse(useragent):
    """Return the first browser keyword that occurs in ``useragent``.

    The keywords are Firefox, Safari, Chrome, MSIE and Opera; the one that
    appears earliest in the string wins. Returns None when none is present.
    """
    browsers = ('Firefox', 'Safari', 'Chrome', 'MSIE', 'Opera')
    found = re.search('|'.join(browsers), useragent)
    if found is None:
        return None
    return found.group()
def getdomain(host):
    """Reduce a host name to its registrable-looking domain.

    When the second-to-last label is itself a generic TLD keyword (as in
    ``example.com.cn``) the last three labels are returned; otherwise the
    last two. The label must equal the keyword exactly, so ``netease`` is
    not mistaken for ``net``.

    Returns None when ``host`` is empty/None or has fewer than two labels.
    """
    if not host:
        return None
    labels = host.split(".")
    if len(labels) < 2:
        return None
    keyword = ('com', 'net', 'org', 'gov', 'edu', 'mil', 'biz', 'name', 'info', 'mobi')
    if labels[-2] in keyword:
        return '.'.join(labels[-3:])
    return '.'.join(labels[-2:])
def sourceparse(refer):
    """Return the search-engine URL prefix found in a referer, if any.

    Recognises Baidu, Sogou, Soso, Google and so.com search URLs. Returns
    the matched text (for example ``www.baidu.com/s?``) or None when the
    referer is not one of these.
    """
    # Raw strings with no surrounding "/" delimiters: Python's re takes the
    # bare expression, and a leading or trailing "/" would have to appear
    # literally in the referer.
    keywordsour = [
        r'www\.baidu\.com/s\?',
        r'www\.sogou\.com/web\?',
        r'www\.soso\.com/q\?',
        r'www\.google\.com/search\?',
        r'www\.so\.com/s\?',
    ]
    found = re.search('|'.join(keywordsour), refer)
    if found is None:
        return None
    return found.group()
# Regex fragments for one access-log line. Each fragment is the inside of a
# named group; the surrounding parentheses are added when the fragments are
# combined into nginxLogPattern below. The pattern is compiled with
# re.VERBOSE, so the newlines inside the triple-quoted fragments are ignored
# by the regex engine.

# Client address: any run of digits and dots.
ipP = r"?P<ip>[\d.]*";
# Request method: letters only.
methodP = r"?P<method>[a-zA-Z]*";
# Region: a double-quoted field; the quotes are part of the captured text.
regionP = r"""?P<region>\"
[^\"]*?
\"
"""
# Response status: one or more digits.
statusP = r"?P<status>[\d]+";
# Full URL: a double-quoted field; the quotes are part of the captured text.
urlP = r"""?P<url>\"
[^\"]*
\"
"""
# Response body size: one or more digits.
bodyBytesSentP = r"?P<bodyByteSent>\d+"
# Request body: a double-quoted field; the quotes are part of the captured text.
requestP = r"""?P<request>\"
[^\"]*?
\"
"""
# Referer: a double-quoted field; the quotes are part of the captured text.
referP = r"""?P<refer>\"
[^\"]*?
\"
"""
# Hit flag: exactly one digit.
hitratioP = r"?P<hitratio>\d{1}"
# User agent: a double-quoted field; the quotes are part of the captured text.
userAgentP = r"""?P<userAgent>\"
[^\"]*
\"
"""
# Ten groups separated by single spaces. The spaces are written as "\ "
# because re.VERBOSE would otherwise ignore them.
nginxLogPattern = re.compile(r"(%s)\ (%s)\ (%s)\ (%s)\ (%s)\ (%s)\ (%s)\ (%s)\ (%s)\ (%s)" %(ipP, methodP, regionP, statusP, urlP, bodyBytesSentP, requestP, referP, hitratioP, userAgentP), re.VERBOSE)
# Follow the access log like `tail -f` and insert one MongoDB document per
# complete line, using the `mongo` shell.
filename = "/root/test/src/access.log"
file = open(filename, "r")
# Start at the current end of the file so only new entries are processed.
st_size = os.stat(filename)[6]
file.seek(st_size)
while 1:
    where = file.tell()
    line = file.readline()
    # An empty read means no new data. A line without a trailing newline is
    # still being written; rewind and retry so it is parsed only when complete.
    if not line or not line.endswith("\n"):
        time.sleep(1)
        file.seek(where)
    else:
        matchs = nginxLogPattern.match(line)
        if matchs != None:
            allGroup = matchs.groups()
            ip = allGroup[0]
            method = allGroup[1]
            # region, url and refer keep their surrounding double quotes from
            # the log line, which is why the template below does not quote them.
            region = allGroup[2]
            status = int(allGroup[3])
            url = allGroup[4]
            proto, rest = urllib.splittype(url)
            host, rest = urllib.splithost(rest)
            # splithost yields None for the host when the URL has no "//" part.
            domain = getdomain(host) if host else None
            bodyBytesSent = int(allGroup[5])
            request = allGroup[6]
            refer = allGroup[7]
            lycos = sourceparse(refer)
            hitratio = int(allGroup[8])
            userAgent = matchs.group("userAgent")
            browser = uaparse(userAgent)
            if browser == None:
                browser = "Other"
            mongo_cmd = '''db.access_log.insert({date: new Date,ip:"%s",method:"%s",region:%s,status:%d,url:%s,domain:"%s",body_bytes_sent:%d,refer:%s,hit:%d,browser:"%s",lycos:"%s"})''' %(ip,method,region,status,url,domain,bodyBytesSent,refer,hitratio,browser,lycos)
            # Pass an argument list and no shell: URL, referer and user agent
            # come from clients, and a quote character in any of them could
            # otherwise break out of the --eval argument and run commands.
            shell_cmd = ["mongo", "1.1.1.1/ddb", "-uadmin", "-padmin", "--eval", mongo_cmd]
            ret = subprocess.Popen(shell_cmd, stdout = subprocess.PIPE)
            # communicate() reads all output and waits for the child, so no
            # zombie processes pile up.
            print(ret.communicate()[0])
        else:
            raise Exception("unparseable access log line: %r" % line)