이번 포스트에서는 Python을 사용하여 시스템 로그를 모니터링하고, 웹 대시보드로 표시하며, Telegram으로 알림을 보내는 통합 시스템을 구현하는 방법을 알아보겠습니다.
1. 프로젝트 개요
이 시스템은 다음과 같은 주요 기능들을 제공합니다:
- 시스템 로그 실시간 모니터링
- SQLite 데이터베이스에 로그 저장
- Flask 기반 웹 대시보드
- Telegram을 통한 알림 전송
- 주기적인 로그 통계 보고
2. 필요한 라이브러리 및 초기 설정
import time
import os
import requests
import sqlite3
import threading
from flask import Flask, render_template_string, request, jsonify
TELEGRAM_TOKEN = “YOUR_TOKEN”CHAT_ID = “YOUR_CHAT_ID”
TELEGRAM_LEVELS = [‘경고’, ‘마이너’, ‘메이저’, ‘치명적’]
LOG_FILE_PATH = “/var/log/syslog”
주요 라이브러리들을 임포트하고 필요한 상수들을 정의합니다. Telegram 봇 토큰과 채팅 ID는 실제 값으로 교체해야 합니다.
3. 데이터베이스 설정
def create_db_connection():
conn = sqlite3.connect('logs.db', check_same_thread=False)
conn.row_factory = sqlite3.Row
return conn
conn = create_db_connection()cursor = conn.cursor()
cursor.execute(”’
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
hostname TEXT,
program TEXT,
level TEXT,
message TEXT
)
”’)
conn.commit()
SQLite 데이터베이스를 설정하고 로그를 저장할 테이블을 생성합니다. check_same_thread=False
는 다중 스레드 환경에서 데이터베이스 접근을 가능하게 합니다.
4. Telegram 알림 기능
def send_to_telegram(message):
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
data = {
'chat_id': CHAT_ID,
'text': message
}
try:
response = requests.post(url, data=data)
if not response.ok:
print(f"텔레그램 메시지 전송 실패: {response.text}")
except Exception as e:
print(f"텔레그램 메시지 전송 중 오류 발생: {e}")
Telegram API를 사용하여 메시지를 전송하는 함수입니다. 예외 처리를 통해 전송 실패 시 적절한 에러 메시지를 출력합니다.
5. 로그 파싱 및 처리
def parse_log_line(line):
try:
parts = line.split(' ')
if len(parts) < 5:
return None
timestamp = ‘ ‘.join(parts[:3])hostname = parts[3]
program_part = parts[4]
program = program_part.split(‘[‘)[0].strip(‘:’)
message = ‘ ‘.join(parts[5:])
level = “정보”
for l in TELEGRAM_LEVELS:
if f”[{l}]” in message:
level = l
message = message.replace(f”[{l}]”, “”).strip()
break
return {
‘timestamp’: timestamp,
‘hostname’: hostname,
‘program’: program,
‘level’: level,
‘message’: message
}
except Exception as e:
print(f”로그 파싱 중 예외 발생: {e}“)
return None
로그 라인을 파싱하여 구조화된 데이터로 변환하는 함수입니다. 시간, 호스트명, 프로그램명, 로그 레벨, 메시지를 추출합니다.
6. 실시간 로그 모니터링
def follow(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line:
yield line.strip()
else:
time.sleep(0.1)
def log_monitor():thread_conn = create_db_connection()
for line in follow(LOG_FILE_PATH):
parsed = parse_log_line(line)
if parsed:
save_to_db(
thread_conn,
parsed[‘timestamp’],
parsed[‘hostname’],
parsed[‘program’],
parsed[‘level’],
parsed[‘message’]
)
follow
함수는 파일의 새로운 라인을 지속적으로 모니터링하고, log_monitor
함수는 이를 파싱하여 데이터베이스에 저장합니다.
7. 웹 대시보드 구현
@app.route('/')
def index():
return render_template_string(TEMPLATE)
@app.route(‘/api/logs’)def get_logs():
cursor = conn.cursor()
cursor.execute(“SELECT timestamp, hostname, program, level, message FROM logs ORDER BY timestamp DESC LIMIT 20”)
logs = cursor.fetchall()
return jsonify([dict(log) for log in logs])
Flask를 사용하여 웹 대시보드를 구현했습니다. 메인 페이지와 로그를 제공하는 API 엔드포인트를 제공합니다.
8. 실행 코드
if __name__ == '__main__':
threading.Thread(target=log_monitor, daemon=True).start()
threading.Thread(target=send_log_count, daemon=True).start()
app.run(host='0.0.0.0', port=5000)
두 개의 백그라운드 스레드(로그 모니터링, 통계 전송)를 시작하고 Flask 애플리케이션을 실행합니다.
import time
import os
import requests
import sqlite3
import threading
from flask import Flask, render_template_string, request, jsonify
# 텔레그램 토큰 및 채팅 ID 하드코딩
TELEGRAM_TOKEN = “YOUR_TELEGRAM_TOKEN”
CHAT_ID = “YOUR_CHAT_ID”
# 텔레그램으로 전송할 로그 레벨
TELEGRAM_LEVELS = [‘경고’, ‘마이너’, ‘메이저’, ‘치명적’]
# 로그 파일 경로 설정
LOG_FILE_PATH = “/var/log/syslog”
# Flask 애플리케이션 설정
app = Flask(__name__)
# 데이터베이스 설정
def create_db_connection():
conn = sqlite3.connect(‘logs.db’, check_same_thread=False)
conn.row_factory = sqlite3.Row # 딕셔너리 형식으로 결과 반환
return conn
# 데이터베이스 초기화
conn = create_db_connection()
cursor = conn.cursor()
cursor.execute(”’
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
hostname TEXT,
program TEXT,
level TEXT,
message TEXT
)
”’)
conn.commit()
# 텔레그램 메시지 전송 함수
def send_to_telegram(message):
url = f”https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage”
data = {
‘chat_id’: CHAT_ID,
‘text’: message
}
try:
response = requests.post(url, data=data)
if not response.ok:
print(f”텔레그램 메시지 전송 실패: {response.text}”)
except Exception as e:
print(f”텔레그램 메시지 전송 중 오류 발생: {e}”)
# 데이터베이스에 로그 저장
def save_to_db(conn, timestamp, hostname, program, level, message):
try:
cursor = conn.cursor()
cursor.execute(”’
INSERT INTO logs (timestamp, hostname, program, level, message)
VALUES (?, ?, ?, ?, ?)
”’, (timestamp, hostname, program, level, message))
conn.commit()
except Exception as e:
print(f”데이터베이스 저장 중 오류 발생: {e}”)
# 로그 라인 파싱
def parse_log_line(line):
try:
parts = line.split(‘ ‘)
if len(parts) < 5:
return None
timestamp = ‘ ‘.join(parts[:3])
hostname = parts[3]
program_part = parts[4]
program = program_part.split(‘[‘)[0].strip(‘:’)
message = ‘ ‘.join(parts[5:])
level = “정보”
for l in TELEGRAM_LEVELS:
if f”[{l}]” in message:
level = l
message = message.replace(f”[{l}]”, “”).strip()
break
return {
‘timestamp’: timestamp,
‘hostname’: hostname,
‘program’: program,
‘level’: level,
‘message’: message
}
except Exception as e:
print(f”로그 파싱 중 예외 발생: {e}”)
return None
# 로그 파일 실시간 읽기
def follow(file_path):
with open(file_path, ‘r’, encoding=’utf-8′) as f:
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if line:
yield line.strip()
else:
time.sleep(0.1)
# 로그 모니터링 스레드
def log_monitor():
thread_conn = create_db_connection()
for line in follow(LOG_FILE_PATH):
parsed = parse_log_line(line)
if parsed:
save_to_db(
thread_conn,
parsed[‘timestamp’],
parsed[‘hostname’],
parsed[‘program’],
parsed[‘level’],
parsed[‘message’]
)
# 3분마다 로그 개수 텔레그램으로 전송
def send_log_count():
thread_conn = create_db_connection()
while True:
cursor = thread_conn.cursor()
cursor.execute(“SELECT COUNT(*) as count FROM logs”)
log_count = cursor.fetchone()[‘count’]
send_to_telegram(f”지난 3분 동안 데이터베이스에 쌓인 로그 개수: {log_count}”)
time.sleep(180)
# Flask 웹 라우트
@app.route(‘/’)
def index():
return render_template_string(TEMPLATE)
@app.route(‘/api/logs’)
def get_logs():
cursor = conn.cursor()
cursor.execute(“SELECT timestamp, hostname, program, level, message FROM logs ORDER BY timestamp DESC LIMIT 20”)
logs = cursor.fetchall()
return jsonify([dict(log) for log in logs])
# HTML 템플릿
TEMPLATE = ”’
<!DOCTYPE html>
<html lang=”ko”>
<head>
<meta charset=”UTF-8″>
<title>로그 대시보드</title>
<style>
table { width: 100%; border-collapse: collapse; }
th, td { border: 1px solid #ddd; padding: 8px; }
th { background-color: #f2f2f2; text-align: left; }
</style>
<script>
async function fetchLogs() {
const response = await fetch(‘/api/logs’);
const logs = await response.json();
const tableBody = document.getElementById(‘logs-body’);
tableBody.innerHTML = ”;
logs.forEach(log => {
const row = `<tr>
<td>${log.timestamp}</td>
<td>${log.hostname}</td>
<td>${log.program}</td>
<td>${log.level}</td>
<td>${log.message}</td>
</tr>`;
tableBody.innerHTML += row;
});
}
setInterval(fetchLogs, 5000); // 5초마다 갱신
window.onload = fetchLogs;
</script>
</head>
<body>
<h1>로그 대시보드</h1>
<table>
<thead>
<tr>
<th>시간</th>
<th>호스트명</th>
<th>프로그램</th>
<th>레벨</th>
<th>메시지</th>
</tr>
</thead>
<tbody id=”logs-body”></tbody>
</table>
</body>
</html>
”’
# Flask 앱 실행
if __name__ == ‘__main__’:
threading.Thread(target=log_monitor, daemon=True).start()
threading.Thread(target=send_log_count, daemon=True).start()
app.run(host=’0.0.0.0′, port=5000)

