nginx日志写入mongodb

1.1 说明

环境

centos + openresty

nginx.conf

http:
log_format main '$remote_addr $request_method "$region" $status '
'"$scheme://$host$request_uri" $body_bytes_sent "$request_body" '
'"$http_referer" $hitratio "$http_user_agent"';
access_log /root/test/src/access.log main;
lua_shared_dict ipdb 45m;
init_by_lua_file '/root/test/src/test/init.lua';

server:
location /:
set $hitratio '';
set $region '';
rewrite_by_lua_file '/root/test/src/main.lua';

location /pureip:
internal;
content_by_lua_file '/root/test/src/pureip.lua';

ip库格式（每行一条记录：起始IP 结束IP 地区，以空格分隔）

1.0.0.0 1.0.0.255 澳大利亚
1.0.1.0 1.0.3.255 福建省
1.0.4.0 1.0.7.255 澳大利亚
1.0.8.0 1.0.15.255 广东省
1.0.16.0 1.0.31.255 日本
1.0.32.0 1.0.63.255 广东省
1.0.64.0 1.0.127.255 日本
1.0.128.0 1.0.255.255 泰国
......

1.2 加载ip到共享内存

-- init.lua
local cjson = require "cjson"
local sfind = string.find
local ssub = string.sub
local tinsert = table.insert
local ilines = io.lines
local tonumber = tonumber
local ipdb = ngx.shared.ipdb
--- Split a string on a literal separator.
-- The separator is matched as plain text, not as a Lua pattern, and may be
-- longer than one character. Empty fields are kept, so "a,,b" split on ","
-- yields {"a", "", "b"}.
-- @param source_str string to split
-- @param split_char literal separator; an empty separator returns the whole
--   input as a single field
-- @return number of fields
-- @return array (1-based) holding the fields in order
function string_split(source_str, split_char)
    local split_array = {}
    -- An empty separator matches at every position and would never advance.
    if split_char == "" then
        split_array[1] = source_str
        return 1, split_array
    end
    local num, start_index = 0, 1
    while true do
        -- Plain search (4th argument): "." or "%" must not act as patterns.
        local find_index, find_end = sfind(source_str, split_char, start_index, true)
        num = num + 1
        if not find_index then
            -- No further separator: the remainder is the last field.
            split_array[num] = ssub(source_str, start_index)
            break
        end
        split_array[num] = ssub(source_str, start_index, find_index - 1)
        -- Resume after the whole separator, not just its first character.
        start_index = find_end + 1
    end
    return num, split_array
end
-- Build one range list per leading IPv4 octet (0..255) from the IP library
-- file, then publish every list, JSON encoded, in the shared dict under the
-- key "group:<octet>".
local ipTab = {}
for i = 0, 255 do
    ipTab[i] = {}
end

-- Dots are escaped and the pattern is anchored so only dotted-quad text is
-- accepted.
local IPV4_PATTERN = "^(%d+)%.(%d+)%.(%d+)%.(%d+)$"

-- Convert dotted-quad text to its numeric value.
-- Returns the value and the leading octet, or nil when text is not IPv4.
local function ip_to_number(text)
    if not text then
        return nil
    end
    local _, _, a, b, c, d = sfind(text, IPV4_PATTERN)
    if not a then
        return nil
    end
    a, b, c, d = tonumber(a), tonumber(b), tonumber(c), tonumber(d)
    return a * 16777216 + b * 65536 + c * 256 + d, a
end

for item in ilines("/root/test/src/iplist") do
    local _, arr = string_split(item, " ")
    local startIpnum, startIpa = ip_to_number(arr[1])
    local endIpnum, endIpa = ip_to_number(arr[2])
    if startIpnum and endIpnum and arr[3]
        and startIpa <= 255 and endIpa <= 255 then
        local entry = {startIpnum, endIpnum, arr[3]}
        tinsert(ipTab[startIpa], entry)
        -- A range that crosses a first-octet boundary must be findable from
        -- every group it covers, because lookups only read the group of the
        -- queried address.
        for octet = startIpa + 1, endIpa do
            tinsert(ipTab[octet], entry)
        end
    elseif item ~= "" then
        -- Skip bad records instead of aborting the whole load.
        ngx.log(ngx.WARN, "skipping malformed iplist line: ", item)
    end
end

for i = 0, 255 do
    local setInfo = cjson.encode(ipTab[i])
    local ok, err, forcible = ipdb:set("group:" .. i, setInfo)
    if not ok then
        ngx.log(ngx.ERR, "failed to store group:", i, " in ipdb: ", err)
    elseif forcible then
        -- set() evicted other entries to make room, so an earlier group may
        -- now be missing from the dict.
        ngx.log(ngx.WARN, "ipdb is full; storing group:", i,
                " evicted existing entries")
    end
end

1.3 赋值nginx变量

-- main.lua
local cjson = require "cjson"
local redis = require "resty.redis"
redis.add_commands("expire", "exists", "del", "sadd", "srem", "smembers", "hmget")
-- Fill the $hitratio and $region nginx variables that the access log prints.
local unescape = ngx.unescape_uri
local host = ngx.var.host
local key_url = unescape(host .. ngx.var.request_uri)

-- Ask redis whether `key` exists.
-- Returns the EXISTS reply, or 0 when redis cannot be queried. Failures are
-- written to the nginx error log rather than to the client response, and the
-- numeric fallback keeps the access-log field a digit.
local function lookup_hit(key)
    local red = redis:new()
    local ok, err = red:connect("127.0.0.1", 6379)
    if not ok then
        ngx.log(ngx.ERR, "failed to connect: ", err)
        return 0
    end
    local res, exists_err = red:exists(key)
    if not res then
        ngx.log(ngx.ERR, "failed to run exists: ", exists_err)
        -- Do not pool a connection whose last command failed.
        red:close()
        return 0
    end
    -- Hand the connection back to the pool instead of dropping it.
    local pooled, pool_err = red:set_keepalive(10000, 100)
    if not pooled then
        ngx.log(ngx.WARN, "failed to set keepalive: ", pool_err)
    end
    return res
end

ngx.var.hitratio = lookup_hit(key_url)

-- Resolve the client region through the internal /pureip location.
local capture = ngx.location.capture('/pureip')
local region = ''
if capture.status == ngx.HTTP_OK and capture.body then
    -- Strip only a trailing line break, if any; unconditionally dropping the
    -- last byte would cut a multi-byte UTF-8 region name.
    region = string.gsub(capture.body, "[\r\n]+$", "")
end
ngx.var.region = region

1.4 二分查找地址

-- pureip.lua
local cjson = require "cjson"
local sfind = string.find
local tonumber = tonumber
local mfloor = math.floor
local ipdb = ngx.shared.ipdb
-- Turn the client address into its numeric form and load the range list of
-- its leading octet from the shared dict.
local ip = ngx.var.remote_addr
-- Dots are escaped and the pattern anchored so only dotted-quad IPv4 matches.
local _, _, ipa, ipb, ipc, ipd = sfind(ip, "^(%d+)%.(%d+)%.(%d+)%.(%d+)$")
if not ipa then
    -- Not an IPv4 address: finish with an empty body instead of failing on
    -- nil arithmetic below.
    return ngx.exit(ngx.HTTP_OK)
end
local ipnum = tonumber(ipa)*16777216 + tonumber(ipb)*65536
    + tonumber(ipc)*256 + tonumber(ipd)
local groupId = tonumber(ipa)
local ipGroup = {}
local encodedGroup = ipdb:get("group:"..groupId)
if encodedGroup then
    ipGroup = cjson.decode(encodedGroup)
end
-- When the key is missing, ipGroup stays empty and the lookup finds nothing.
--[[function search(tb, target)
    low, high = 1, #tb
    while low <= high do
        mid = mfloor((low+high)/2)
        if mid < target then
            low = mid
        elseif mid > target then
            high = mid
        else
            return mid
        end
    end
end --]]
--- Binary-search a list of numeric ranges for the one containing `target`.
-- @param tb array of entries {rangeStart, rangeEnd, ...}; the search assumes
--   the entries are sorted ascending and do not overlap
-- @param target number to locate
-- @return index of the entry with rangeStart <= target <= rangeEnd, or nil
--   when no entry contains target
function searchII(tb, target)
    local low, high = 1, #tb
    while low <= high do
        local mid = mfloor((low + high) / 2)
        local range = tb[mid]
        if range[2] < target then
            -- Exclude mid itself, otherwise the window never shrinks.
            low = mid + 1
        elseif range[1] > target then
            high = mid - 1
        else
            return mid
        end
    end
    return nil
end
-- Emit the region name (third field) of the range containing the client IP.
local mid = searchII(ipGroup, ipnum)
if mid then
    ngx.print(ipGroup[mid][3])
end
-- When no range contains the address nothing is printed, so the response
-- body is empty instead of the request failing on a nil index.

1.5 python写本地日志到mongodb

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import fileinput
import re
import urllib
import subprocess
import time
def uaparse(useragent):
    """Return the first browser keyword that occurs in ``useragent``.

    The keywords are Firefox, Safari, Chrome, MSIE and Opera. The result is
    the keyword found at the earliest position in the string, or None when
    none of them occurs.
    """
    browsers = ('Firefox', 'Safari', 'Chrome', 'MSIE', 'Opera')
    found = re.search('|'.join(browsers), useragent)
    if found is None:
        return None
    return found.group()
def getdomain(host):
    """Return the registrable domain of ``host``.

    When the second-to-last label is exactly one of the generic suffixes
    below (as in ``example.com.cn``), the last three labels are returned;
    otherwise the last two. Returns None when ``host`` is empty/None or has
    fewer than two labels.
    """
    if not host:
        return None
    labels = host.split(".")
    if len(labels) < 2:
        return None
    keyword = ('com', 'net', 'org', 'gov', 'edu', 'mil', 'biz', 'name', 'info', 'mobi')
    # Compare the whole label: a prefix match would treat "netflix" or
    # "company" as the suffix "net"/"com" and keep one label too many.
    if labels[-2] in keyword:
        return '.'.join(labels[-3:])
    return '.'.join(labels[-2:])
def sourceparse(refer):
    """Return the search-engine URL prefix found in ``refer``.

    Recognises Baidu, Sogou, Soso, Google and so.com result-page URLs and
    returns the matched text, e.g. ``www.baidu.com/s?``. Returns None when
    the referer is not one of these.
    """
    # Python regexes take no surrounding "/" delimiters; with them the
    # pattern demanded a literal "/" right after the "?", which real search
    # URLs never have. The Google entry also needs "/search": "\search" is
    # parsed as "\s" (whitespace) followed by "earch".
    keywordsour = [
        r'www\.baidu\.com/s\?',
        r'www\.sogou\.com/web\?',
        r'www\.soso\.com/q\?',
        r'www\.google\.com/search\?',
        r'www\.so\.com/s\?',
    ]
    found = re.search('|'.join(keywordsour), refer)
    if found is None:
        return None
    return found.group()
# Regex fragments, one per access-log field, in the order the fields appear
# on a log line. Each fragment is the inside of a named group; the enclosing
# parentheses are added when the full pattern is assembled below. The quoted
# fields keep their surrounding double quotes in the captured text.

# Unquoted fields.
ipP = r"?P<ip>[\d.]*"
methodP = r"?P<method>[a-zA-Z]*"
statusP = r"?P<status>[\d]+"
bodyBytesSentP = r"?P<bodyByteSent>\d+"
hitratioP = r"?P<hitratio>\d{1}"

# Double-quoted fields (written for re.VERBOSE, so layout whitespace is ignored).
regionP = r"""?P<region>\"
            [^\"]*?
            \"
            """
urlP = r"""?P<url>\"
            [^\"]*
            \"
            """
requestP = r"""?P<request>\"
            [^\"]*?
            \"
            """
referP = r"""?P<refer>\"
            [^\"]*?
            \"
            """
userAgentP = r"""?P<userAgent>\"
            [^\"]*
            \"
            """

# Wrap every fragment in a group and join the groups with an escaped space,
# because re.VERBOSE would otherwise ignore the separating blanks.
nginxLogPattern = re.compile(
    r"\ ".join(
        "(%s)" % fragment
        for fragment in (
            ipP, methodP, regionP, statusP, urlP,
            bodyBytesSentP, requestP, referP, hitratioP, userAgentP,
        )
    ),
    re.VERBOSE,
)

# Follow the nginx access log and insert one MongoDB document per parsed line.
import sys  # used only by this section, for stdout/stderr reporting

filename = "/root/test/src/access.log"
logfile = open(filename, "r")
# Start at the current end of the file: only lines appended from now on are
# shipped.
logfile.seek(0, os.SEEK_END)
while True:
    where = logfile.tell()
    line = logfile.readline()
    if not line or not line.endswith("\n"):
        # Nothing new yet, or the last line is still being written: rewind to
        # the start of that line and retry after a short pause.
        time.sleep(1)
        logfile.seek(where)
        continue
    matchs = nginxLogPattern.match(line)
    if matchs is None:
        # One unparsable line must not stop the shipper; report and move on.
        sys.stderr.write("skipping unparsable log line: %r\n" % line)
        continue
    allGroup = matchs.groups()
    ip = allGroup[0]
    method = allGroup[1]
    region = allGroup[2]
    status = int(allGroup[3])
    url = allGroup[4]
    proto, rest = urllib.splittype(url)
    host, rest = urllib.splithost(rest)
    domain = getdomain(host)
    bodyBytesSent = int(allGroup[5])
    request = allGroup[6]
    refer = allGroup[7]
    lycos = sourceparse(refer)
    hitratio = int(allGroup[8])
    userAgent = matchs.group("userAgent")
    browser = uaparse(userAgent)
    if browser is None:
        browser = "Other"
    # region, url and refer are captured together with their double quotes,
    # so they are inserted as ready-made JavaScript string literals.
    # NOTE(review): this relies on nginx's default log escaping of '"' and
    # '\' inside variables - confirm no escape=none log_format is in use.
    mongo_cmd = '''db.access_log.insert({date: new Date,ip:"%s",method:"%s",region:%s,status:%d,url:%s,domain:"%s",body_bytes_sent:%d,refer:%s,hit:%d,browser:"%s",lycos:"%s"})''' %(ip,method,region,status,url,domain,bodyBytesSent,refer,hitratio,browser,lycos)
    # Pass the script as a single argv element with no shell in between:
    # URL, referer and user agent come from clients, and a "'" in any of
    # them would otherwise end the quoted --eval argument and run shell code.
    mongo_argv = ["mongo", "1.1.1.1/ddb", "-uadmin", "-padmin", "--eval", mongo_cmd]
    ret = subprocess.Popen(mongo_argv, stdout = subprocess.PIPE, universal_newlines = True)
    # communicate() also waits for the child, so no zombie is left behind.
    output, _ = ret.communicate()
    sys.stdout.write(output + "\n")