watchdogで即検知!NGINXアクセスログをPythonでリアルタイム監視してInfluxDBに記録する

InfluxDB

自宅のRaspberry PiでNGINXを稼働させていると、アクセスログやエラーログが蓄積されていきます。でも、テキストファイルのままでは「いつ・誰が・どのページに」アクセスしたのかをリアルタイムで把握するのは大変です。
本記事では、Pythonの watchdog ライブラリを使ってNGINXのログファイルを常時監視し、新しいログが書き込まれた瞬間にInfluxDBへ自動保存する仕組みの実装方法を解説します。Grafanaと組み合わせることで、アクセス状況をリアルタイムに可視化することも可能になります。

なぜこの機能を作ったのか?

我が家のRaspberry PiにはNGINXを立て、外部公開のAPIエンドポイントやLINEミニアプリ向けのWebサーバーとして活用しています。
セキュリティ上の観点から、「外部からの不審なアクセスがないか」「エラーが多発していないか」をある程度リアルタイムで監視したいと考えていました。
テキストログをtailで見るだけでは視認性が悪く、蓄積・集計もしにくいため、InfluxDBに記録してGrafanaで可視化する方針にしました。

不審なアクセスをリアルタイムで検知して家族のセキュリティを守りたい!

実現したいこと

  • NGINXのアクセスログ・エラーログをリアルタイムに監視する
  • 新しいログが書き込まれた瞬間にInfluxDBへ自動保存する
  • ログの種類(アクセスログ / エラーログ)をタグで区別して保存する
  • 既存のログ行は無視して、起動後に追記された行だけを取得する
  • 常駐プロセスとして動作させ、自動でログを収集し続ける

この記事でわかること

  • Python の watchdog ライブラリを使ったファイル変更監視の方法
  • ログファイルの「差分だけ」を取得するテクニック(ファイルポジションの管理)
  • InfluxDB v2 へ NGINX ログを保存する方法
  • NGINXの詳細ログフォーマット(detailed形式)の設定方法

必要な準備と用意するもの

ハードウェア
  • Raspberry Pi(例:Raspberry Pi 5)
ソフトウェア/サービス
  • Python 3
    • watchdog
  • NGINX(稼働済み)
  • InfluxDB v2(稼働済み)

完成イメージ

以下のような流れでNGINXのログがInfluxDBへリアルタイムに蓄積されます。

① ユーザーがWebにアクセスする
② NGINXがアクセスログ(access.log)やエラーログ(error.log)にログを書き込む
③ watchdogがファイルの変更を即座に検知する
④ Pythonプログラムが新しく追記された行だけを読み取る
⑤ InfluxDB v2 の nginx_log measurementに保存される
⑥ Grafanaでリアルタイムにログを可視化・集計できる

システムの仕組み

このプログラムの核となる仕組みは2つです。

① watchdogによるファイル監視

watchdog はPythonのファイルシステム監視ライブラリです。OS のファイル変更通知(inotify など)を利用するため、ポーリング(一定間隔でファイルをチェックする方式)と比べて非常に効率的で、ログが書き込まれた瞬間に反応することができます。

ポーリングより高効率
inotify等のOS機能を使うため、定期的なチェックと違いCPUを消費せずにファイル変更を即座に検知できます。

② ファイルポジション管理による差分取得

プログラム起動時に各ログファイルの末尾位置(バイト数)を記録しておき、watchdogがファイル変更を検知した際にはその位置以降の行だけを読み取ります。これにより、起動前に存在していた大量のログを誤って取り込むことを防ぎます。

Pythonのファイルオブジェクトの seek()tell() を組み合わせて読み取り位置を管理しています。これはログローテーション等でファイルが切り替わる場合は対応できないため、その場合は別途処理が必要になります。

実運用する際は、週次などで発生するNGINXのログローテーション(logrotate)に合わせて、このPythonスクリプトも systemctl restart で定期再起動する設定をcron等に入れておくのがおすすめです。

実装のポイント

NGINXのログフォーマット設定

デフォルトのNGINXログよりも詳細な情報を取得するため、NGINXの設定ファイルにカスタムログフォーマット detailed を定義します。
このフォーマットにはリモートIPアドレス、リクエスト内容、ステータスコード、レスポンスサイズ、リファラー、ユーザーエージェントなどが含まれます。

ログフォーマットの定義(log_formatディレクティブ)は http ブロック内でのみ有効です。serverブロックやlocationブロックには書けないので注意してください。通常は /etc/nginx/nginx.confhttp {} ブロック内に記述するか、インクルードされているconfファイルに記述します。

複数のログファイルを1つの監視で賄う

watchdog の Observer はディレクトリ単位で監視します。アクセスログとエラーログが同じディレクトリにある場合、1つの observer.schedule() でまとめて監視できます。
どのファイルが変更されたかは event.src_path で判定します。

事前準備

watchdogのインストール

pip install watchdog

NGINXの詳細ログフォーマット設定

NGINXの設定ファイルに以下のログフォーマットを追加します。
カスタムフォーマット名は detailed とし、各サーバーブロックの access_log ディレクティブでこのフォーマットを指定します。

# =========================================
# 共通ログフォーマット定義(http ブロック内に記述)
# =========================================
log_format detailed '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';

server {
    listen 443 ssl;
    server_name <yourdomain>;

    # ...(SSL設定など)...

    # detailed フォーマットでログを出力
    access_log  /srv/nginx/log/access.log detailed;
    error_log   /srv/nginx/log/error.log;

    # ...
}

設定変更後はNGINXをリロードします。

sudo nginx -t          # 設定ファイルの文法チェック
sudo systemctl reload nginx

InfluxDBの接続設定

InfluxDB v2 への接続設定は settings.json などの設定ファイルで管理します。
以下の情報を事前に準備してください。

  • InfluxDB のURL(例:http://localhost:8086)
  • Org名
  • Bucket名
  • APIトークン

実装方法

NGINXログ記録プログラム(nginx_log_record.py)

全体の構成は以下の通りです。

  • LogFileHandler:watchdog のイベントハンドラ。ファイルが変更されると on_modified が呼ばれる
  • read_new_lines():ファイルの読み取り位置を管理して新しい行だけを読む
  • メイン処理:起動時にファイル末尾位置を記録し、watchdog Observer を開始して常駐
# -*- coding: utf-8 -*-
import os
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from influx.v2 import writer as influxwriter
import smarthomelog

# Influxライターとログ出力インスタンスを初期化
influx = influxwriter.Writer()
success_log = smarthomelog.Log(__file__, smarthomelog.LogType.SUCCESS)

# ===== ログファイルのパス設定 =====
LOG_DIR = '/srv/nginx/log'
ACCESS_LOG = os.path.join(LOG_DIR, 'access.log')
ERROR_LOG  = os.path.join(LOG_DIR, 'error.log')

# ===== 各ファイルの読み取り位置を管理する辞書 =====
file_positions = {}

def read_new_lines(file_path, log_type):
    """ファイルの末尾から新しい行だけ読む"""
    pos = file_positions.get(file_path, 0)
    with open(file_path, 'r') as f:
        f.seek(pos)
        new_lines = f.readlines()
        file_positions[file_path] = f.tell()  # 読み取り位置を更新

    for line in new_lines:
        if line.strip():
            influx.insert_nginx_log(log_type, line)

    success_log.write('処理が正常に終了しました。', __file__)

class LogFileHandler(FileSystemEventHandler):
    """ログファイル変更イベントハンドラ"""
    def on_modified(self, event):
        if event.is_directory:
            return
        if event.src_path == ACCESS_LOG:
            read_new_lines(ACCESS_LOG, 'ACCESS')
        elif event.src_path == ERROR_LOG:
            read_new_lines(ERROR_LOG, 'ERROR')

if __name__ == "__main__":
    # 起動時に既存のログ末尾位置を記録(既存行はスキップ)
    for log_file in [ACCESS_LOG, ERROR_LOG]:
        if os.path.exists(log_file):
            with open(log_file, 'r') as f:
                f.seek(0, os.SEEK_END)
                file_positions[log_file] = f.tell()

    # watchdog 監視を開始
    event_handler = LogFileHandler()
    observer = Observer()
    observer.schedule(event_handler, LOG_DIR, recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

InfluxDB書き込みメソッド(insert_nginx_log)

InfluxDB v2 へのデータ保存は insert_nginx_log メソッドで行います。
ログの種別(アクセスログ / エラーログ)を log_type タグで区別して保存することで、Grafanaで種別ごとにフィルタリングできます。

def insert_nginx_log(self, log_type: str, message: str):
    measurement = 'nginx_log'
    tags = {
        'log_type': log_type   # 'ACCESS' または 'ERROR'
    }
    fields = {
        'message': message,
        'timestamp': datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f')
    }
    try:
        self.conn.write_dict(measurement, tags, fields)
    except Exception as e:
        print(e)

コードの解説

起動時のファイル末尾位置の記録

for log_file in [ACCESS_LOG, ERROR_LOG]:
    if os.path.exists(log_file):
        with open(log_file, 'r') as f:
            f.seek(0, os.SEEK_END)
            file_positions[log_file] = f.tell()

f.seek(0, os.SEEK_END) でファイルの末尾に移動し、f.tell() で現在のバイト位置を取得・保存します。
これにより、プログラム起動前に書き込まれた既存のログは読み飛ばし、起動後に追記された行だけを処理できます。

差分読み取りの仕組み

def read_new_lines(file_path, log_type):
    pos = file_positions.get(file_path, 0)
    with open(file_path, 'r') as f:
        f.seek(pos)           # 前回読み終えた位置から再開
        new_lines = f.readlines()
        file_positions[file_path] = f.tell()  # 読み取り位置を更新

前回読み取り終了時のバイト位置から seek() で読み始め、readlines() で末尾まで読み取ります。
読み取り後は tell() で次回の開始位置を更新します。この繰り返しで常に「新しく追記された行だけ」を取得できます。

watchdogのイベントハンドラ

class LogFileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if event.is_directory:
            return
        if event.src_path == ACCESS_LOG:
            read_new_lines(ACCESS_LOG, 'ACCESS')
        elif event.src_path == ERROR_LOG:
            read_new_lines(ERROR_LOG, 'ERROR')

FileSystemEventHandler を継承して on_modified をオーバーライドします。
ディレクトリ変更イベントは無視し、対象ファイルごとに適切な log_type を渡して read_new_lines() を呼び出します。

on_modified が複数回呼ばれる場合について
OSやファイルシステムによっては、1回の書き込みで on_modified が複数回呼ばれることがあります。その場合でも file_positions による位置管理が機能するため、同じ行を重複して処理することはありません。

InfluxDB保存データ構造

InfluxDB の nginx_log measurementに以下の構造でデータが保存されます。

  • measurement:nginx_log
  • タグ(tags)
    • log_type:’ACCESS’ または ‘ERROR’
  • フィールド(fields)
    • message:NGINXログの1行(アクセス情報 or エラー情報)
    • timestamp:InfluxDB書き込み時刻(YYYY/MM/DD HH:MM:SS.ffffff形式)

systemdによる自動起動設定

プログラムをシステム起動時に自動実行するために、systemdのサービスユニットファイルを作成します。

[Unit]
Description=NGINX Log Record to InfluxDB
After=network.target influxd.service nginx.service

[Service]
ExecStart=/usr/bin/python3 /home/<username>/projects/smarthome/core/record/raspi/nginx_log_record.py
Restart=always
RestartSec=10
User=<username>

[Install]
WantedBy=multi-user.target
sudo cp nginx_log_record.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable nginx_log_record
sudo systemctl start nginx_log_record

動作確認

プログラムを直接実行して確認

まずはコマンドラインから直接実行して、正しく動作するか確認します。

python3 nginx_log_record.py

プログラムが起動したら、ブラウザなどで自サーバーにアクセスしてNGINXのアクセスログが生成されることを確認します。
その後、InfluxDB の Data Explorer で nginx_log measurement にデータが保存されているか確認してください。

InfluxDB Data Explorerで確認

InfluxDB の管理画面(デフォルト:http://localhost:8086)にアクセスし、Data Explorer で以下のFluxクエリを実行します。

from(bucket: "your-bucket")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "nginx_log")
  |> filter(fn: (r) => r._field == "message")

最近のNGINXアクセスログが表示されれば、InfluxDBへの書き込みが正常に動作しています。

systemdサービスの状態確認

sudo systemctl status nginx_log_record

Active: active (running) と表示されていれば正常稼働中です。

まとめ

本記事では、PythonのwatchdogライブラリとInfluxDB v2を使って、NGINXのアクセスログ・エラーログをリアルタイムに取得・蓄積する方法を解説しました。

  • watchdogのFileSystemEventHandlerを継承してon_modifiedをオーバーライドするだけでファイル変更を即座に検知できる
  • seek() / tell() でファイルポジションを管理することで、差分(新規追記行)だけを効率よく取得できる
  • InfluxDBのタグでログ種別を区別することで、Grafanaでの絞り込み・集計が容易になる
  • systemdサービスとして登録することで、常駐・自動起動・クラッシュ時の自動再起動が実現できる

InfluxDBに蓄積したNGINXログはGrafanaで可視化することで、アクセス数の推移や不審なリクエストのパターンを視覚的に把握しやすくなります。スマートホームのセキュリティ監視基盤として、ぜひ活用してみてください。

タイトルとURLをコピーしました