Profile picture

[Python] Nginx-Log parser 만들기

JaehyoJJAng2023년 03월 22일

✍️ Parser.py

  • nginx 웹 서버 로그에서 접속 기록을 추출하여 적절한 포맷으로 출력하는 스크립트를 작성
  • 적절한 포맷으로 출력된 데이터는 MySQL 데이터베이스에 적재

🔎 웹 서버 로그 확인하기

  • 파이썬을 이용하여 로그(access.log)로 부터 유의미한 데이터 추출하기
    • 각 필드를 구분 (split() 함수 응용)
  • MySQL 데이터베이스에 로그를 적재할 테이블 생성
  • 생성된 테이블에 정제된 데이터를 삽입하도록 함

🖇️ 요구사항

  • 웹 서버에 접속 이벤트를 발생시키고, 로깅이 되는지 테스트
  • parser.py 작성하기.
  • 데이터 웨어하우스에 로그를 적재할 공간 준비.
  • 수집기(collect.py)를 작성하여, 데이터 웨어하우스에 로그를 쌓기

📕 파서(parser.py) 작성

필수 라이브러리 설치하기 pip install django-environ pymysql


스크립트 - https://github.com/JaehyoJJAng/nginx-log-parser


💡 execute_tail 메소드

- ubprocess.Popen(['tail','-n','3','-F',self.log_file],stdout=subprocess.PIPE,stderr=subprocess.PIPE)

  • tail 명령어를 실행하고 ,stdoutstderr 를 파이프로 연결하여 tail 명령의 출력을 읽어들임

- line = stdout.readline().decode('utf-8')

  • tail 명령의 출력을 한 줄 씩 읽어들임
    • decode('utf-8') : 바이트 스트림을 문자열로 변환

🔊 파서 스크립트 실행결과

python3 nginx_parser.py

image

출력문

{
  "source_ip": "83.149.9.216",
  "method": "GET",
  "status_code": "200",
  "path": "/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png",
  "timestamp": "2015-05-17 19:05:43"
}

정상적으로 JSON의 형태로 변환되어 표준 출력으로 나오는걸 볼 수 있음

📜 데이터 웨어하우스 준비

  • JSON 형태로 정제된 데이터를 parse.parseTBL 테이블에 적재

✕ 단위 테스트 작성하기

test/test_nginx_parser.py

from __init__ import Parser,MySQL
import unittest

class ParserTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls)-> None:
        cls.parser : Parser = Parser()

    def test_set_data(self)-> None:
        test_line : str = """83.149.9.216 - - [17/May/2015:10:05:43 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" 200 171717 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
""".strip()
        
        data = self.parser.set_data(line=test_line)        
        # for key,value in data.items():
        #     if key == 'source_ip':
        #         self.assertEqual('83.149.9.216')
        
class MySQLTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls)-> None:
        cls.mysql : MySQL = MySQL()
    
    def test_connect_db(self)-> None:
        self.mysql.connect_db()
    
    def test_execute_query(self)-> None:
        data = {
            'source_ip': '83.149.9.216', 'method': 'GET', 'status_code': '200', 'path': '/presentations/logstash-monitorama-2013/images/kibana-dashboard3.png', 'timestamp': '2015-05-17 19:05:43'
            }
        self.mysql.execute_query(data=data)
    
    def test_create_table(self)-> None:
        self.mysql.create_table()
        
if __name__ == '__main__':
    unittest.main()

Loading script...