top × psutil × systemctl で徹底監視!サーバー上の全プロセス情報をInfluxDBに一括記録する

InfluxDB

「今サーバーで動いているNGINXやPythonスクリプトは、どれくらいCPUを使っているのか?」「メモリを圧迫しているプロセスはどれか?」こうした疑問に答えるには、プロセスの状態を継続的に記録・可視化する仕組みが必要です。
本記事では、topコマンドpsutilsystemctl の3つを組み合わせ、サーバー上で動作している指定プロセスのCPU使用率・メモリ・systemdサービス情報などを5秒ごとにInfluxDB v2へ記録する方法を解説します。

なぜこの機能を作ったのか?

我が家のサーバーでは、InfluxDB・Grafana・NGINX・各種Pythonスクリプトなど、多くのプロセスが常時稼働しています。
「特定のPythonスクリプトがいつの間にか高CPU状態になっていた」「メモリを大量消費しているプロセスがあるが原因が不明」といった問題を事後ではなくリアルタイムで検知するために、プロセスの動作状況をInfluxDBに記録してGrafanaで可視化する仕組みを作りました。

どのプロセスがサーバーに負荷をかけているかをGrafanaで見えるようにしたい!

実現したいこと

  • 監視対象コマンド(python / nginx / influxd / grafana など)に絞ってプロセスを取得する
  • 各プロセスのCPU使用率・メモリ・仮想メモリ・累積CPU時間を取得する
  • プロセスの親子関係を判定し、子プロセスのリソースを親に合算する
  • systemdサービス名・ロード状態・アクティブ状態を取得する
  • Pythonスクリプトの場合はスクリプト名・説明・グループ情報もタグとして記録する
  • 取得した全プロセス情報を5秒ごとにInfluxDB v2へ一括保存する

この記事でわかること

  • topコマンド(バッチモード) の出力をPythonでパースしてプロセス情報を取得する方法
  • psutil を使ったプロセスの親子関係の判定と子プロセスのリソース合算方法
  • systemctl を使ったPIDからsystemdサービス情報を一括取得する方法
  • マルチコアCPUの使用率を「全体で最大100%」に正規化する方法
  • 複数プロセスのデータをInfluxDB v2へ一括(バッチ)書き込みする方法

必要な準備と用意するもの

ハードウェア
  • Linux搭載サーバー(Raspberry Pi / NUCBOX など)
ソフトウェア/サービス
  • Python 3
    • psutil
  • topコマンド(procpsパッケージ、通常はデフォルトインストール済み)
  • systemd(サービス情報の取得に使用)
  • InfluxDB v2(稼働済み)

完成イメージ

プログラムを起動すると5秒ごとに以下の処理が実行されます。

top -b -n 1 で現在のプロセス一覧を取得してパース
systemctl show --type=service でsystemdサービス情報を一括取得してPID→サービス情報の辞書を作成
③ 監視対象コマンド(python / nginx など)に絞ってループ
④ psutil でプロセスの親子関係を確認し、子プロセスのリソースを親に合算
⑤ systemdサービス情報・プログラムパスなどのメタ情報を付加
⑥ 全プロセスのタグ・フィールドをリストにまとめてInfluxDB v2へバッチ一括書き込み

システムの仕組み

なぜtopコマンドとpsutilを併用するのか?

psutil だけでもCPU使用率やメモリは取得できますが、top コマンドとの使い分けには理由があります。

topコマンドが得意なこと
バッチモード(-b -n 1)で実行すると、全プロセスのCPU使用率・メモリ使用率・仮想メモリ・常駐メモリ・累積CPU時間をスナップショットとして一括取得できる。TIME+(累積CPU時間)はpsutilのcpu_times()とは別の形式で取得でき、プロセスが起動してからどれだけCPUを消費したかの追跡に使いやすい。

psutilが得意なこと
プロセスの親子関係(children()ppid())・コマンドラインの取得(cmdline())・センサー情報など、topでは取りにくい詳細情報を取得できる。

この2つを組み合わせることで、「topで全プロセスのリソースを一括取得 → psutilで親子関係とメタ情報を補完」という効率的な収集が実現できます。

親子プロセスのリソース合算

NGINXやPythonは親プロセスが複数の子プロセス(ワーカー)を生成する場合があります。
親プロセスの CPU% だけを見ると実際の負荷を過小評価してしまうため、このプログラムでは 子プロセスのCPU・メモリ使用量を親プロセスに合算して記録します。

psutil の children(recursive=True) で全子プロセスを取得し、それぞれのPIDで top の辞書を引いてリソースを加算します。子プロセス自身のレコードは is_child フラグで区別されるため、Grafanaで「親プロセスの合算値のみ」を絞り込むことができます。

systemdサービス情報の一括取得

systemctl show --type=service を1回実行するだけで全サービスの情報を取得し、PID→サービス情報の辞書(service_map)を作成します。
各プロセスごとに systemctl status を呼び出すと処理が重くなるため、この「一括取得→辞書引き」の方式で効率化しています。

実装のポイント

topのTIME+をtimedeltaに変換するパーサー

topの TIME+ 列は複数のフォーマットが混在するため、専用のパーサー関数を実装しています。

  • 5:44.59 → 分:秒.ミリ秒(最も一般的な形式)
  • 1:05:44 → 時:分:秒(長時間稼働プロセス)
  • 6.17 → 小数秒
  • 123 → 整数秒

topのメモリ表記をバイトに変換するパーサー

topの VIRT/RES/SHR 列は 1.2g512m123k のように単位付きで表示されます。
単位なしの場合はKB扱いとなるtopの仕様に合わせて、すべてバイト単位の整数値に統一して保存します。

マルチコアCPU使用率の正規化

topのCPU使用率は「1コアあたり100%」の基準で表示されます(4コアなら最大400%)。
Grafanaで直感的に把握しやすいよう、cpu_percent = cpu_percent_raw / コア数 で全体を100%基準に正規化して保存します。

正規化前の生の値(cpu_percent_raw)も併せて保存しているため、コア単位での比較が必要な場合にも対応できます。

Pythonスクリプトの識別とcommonライブラリ

コマンドが python の場合、cmdline() でフルコマンドラインを取得し、実行しているスクリプトのパス・名前を特定します。
さらに common.get_program_metadata() を使って各Pythonスクリプトに定義された __description____group__ 変数を読み取り、タグとして付与することで、Grafanaでスクリプト単位の集計・フィルタリングができます。

common はプロジェクト固有の自作ライブラリです
common は標準ライブラリでも pip パッケージでもなく、このスマートホームプロジェクト内の自作ユーティリティライブラリです。そのまま流用することはできません。
get_program_metadata() の処理内容は「指定したPythonファイルを開いて __description____group__ の値を正規表現で抽出して返す」というものです。この機能が不要な場合は、この関数の呼び出しと関連するタグ(program_description / program_group)を削除しても動作します。

監視対象のPythonスクリプトに以下のように変数を定義しておくと、Grafanaでグループや説明で絞り込めるようになります。

# -*- coding: utf-8 -*-
__group__ = '情報記録'           # Grafanaでのグループ名
__description__ = 'センサーデータを記録'  # Grafanaでの説明

# ... 以降に通常の処理 ...

事前準備

psutilのインストール

pip install psutil

topコマンドの動作確認

top -b -n 1 | head -20

PID・USER・CPU%・MEM%・TIME+・COMMANDなどの列が表示されれば準備OKです。

監視対象コマンドの設定

プログラム冒頭の RECORD_TARGET_COMMANDS リストに、監視したいコマンド名を列挙します。
サーバーで稼働しているサービスに合わせてカスタマイズしてください。

RECORD_TARGET_COMMANDS = ['python', 'influxd', 'telegraf', 'grafana', 'nginx', 'dnsmasq']

実装方法

プログラム全体構成

  • parse_cputime():topのTIME+文字列をtimedeltaに変換(複数フォーマット対応)
  • parse_mem_value():topのVIRT/RES/SHR表記をバイト単位の整数に変換
  • parse_top_output()top -b -n 1 の出力をパースしてプロセスリストを作成
  • get_service_map():systemctlで全サービス情報を一括取得し、PID→サービス情報の辞書を作成
  • get_service_info_from_pid():service_mapにないPIDをフォールバックで個別取得
  • build_process_info():上記を組み合わせてタグ・フィールドのリストを構築
  • メインループ:5秒ごとに build_process_info() を呼び出してInfluxDBへバッチ書き込み

topのTIME+パーサーとメモリ単位変換

import datetime

def parse_cputime(s: str) -> datetime.timedelta:
    """topのTIME+をtimedeltaに変換(複数フォーマット対応)"""
    if s is None:
        raise ValueError("Empty time string")
    s = s.strip().replace(",", ".")
    if not s:
        raise ValueError("Empty time string")

    # H:MM:SS(2つのコロン)
    if s.count(":") == 2:
        h, m, sec = s.split(":")
        return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(float(sec)))

    # M:SS(.ms)(1つのコロン)
    if s.count(":") == 1:
        minutes, sec_part = s.split(":", 1)
        if "." in sec_part:
            seconds_str, frac = sec_part.split(".", 1)
            ms = int(round(float("0." + frac) * 1000))
            return datetime.timedelta(minutes=int(minutes), seconds=int(seconds_str), milliseconds=ms)
        else:
            return datetime.timedelta(minutes=int(minutes), seconds=int(sec_part))

    # 小数秒(例: "6.17")
    if "." in s:
        total_seconds = float(s)
        seconds = int(total_seconds)
        ms = int(round((total_seconds - seconds) * 1000))
        return datetime.timedelta(seconds=seconds, milliseconds=ms)

    # 整数秒(例: "123")
    if s.isdigit():
        return datetime.timedelta(seconds=int(s))

    raise ValueError(f"Unknown time format: {s}")


def parse_mem_value(value: str) -> int:
    """topのVIRT/RES/SHR表記をバイトに変換(単位なしはKB扱い)"""
    value = value.strip().lower()
    if value[-1].isdigit():
        return int(float(value) * 1024)    # 単位なし → KB → バイト
    num = float(value[:-1])
    unit = value[-1]
    if unit == 'g':
        return int(num * 1024 ** 3)
    elif unit == 'm':
        return int(num * 1024 ** 2)
    elif unit == 'k':
        return int(num * 1024)
    else:
        raise ValueError(f"Unknown memory unit: {value}")

topコマンド出力のパーサー

import subprocess
import re

def parse_top_output():
    """top の1回分の出力をパースしてプロセスリストを返す"""
    top_output = subprocess.check_output(["top", "-b", "-n", "1"]).decode()
    lines = top_output.splitlines()

    processes = []
    found = False
    for line in lines:
        if re.match(r"\s*PID\s+USER", line):
            found = True
            continue
        if not found:
            continue

        cols = re.split(r"\s+", line.strip(), maxsplit=11)
        if len(cols) < 12:
            continue

        pid = int(cols[0])
        processes.append({
            "pid": pid,
            "user": cols[1],
            "pr": cols[2],
            "ni": cols[3],
            "virt": parse_mem_value(cols[4]),
            "res": parse_mem_value(cols[5]),
            "shr": parse_mem_value(cols[6]),
            "state": cols[7],
            "cpu": float(cols[8]),
            "mem": float(cols[9]),
            "time": parse_cputime(cols[10]),
            "command": cols[11] if len(cols) > 11 else None
        })
    return processes

topのPIDヘッダー行でスキャン開始を判定
re.match(r"\s*PID\s+USER", line) でプロセス一覧の開始行(ヘッダー行)を検出し、その次の行からプロセスデータのパースを始めます。ヘッダー前のシステム概要行(CPU使用率・メモリ全体など)はスキップされます。

systemdサービス情報の一括取得

def get_service_map():
    """systemdのサービス情報を一括取得してPID→サービス情報の辞書を返す"""
    cmd = [
        "systemctl", "show", "--type=service",
        "-p", "Id", "-p", "MainPID", "-p", "LoadState",
        "-p", "ActiveState", "-p", "SubState", "-p", "Description"
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    lines = result.stdout.strip().splitlines()

    pid_info_map = {}
    current_info = {}
    pid = None

    for line in lines:
        if not line.strip() or "=" not in line:
            continue
        key, value = line.split("=", 1)
        current_info[key] = value

        if key == 'MainPID':
            pid = int(value)
        elif key == "SubState":
            if "Id" in current_info and pid is not None:
                info_copy = current_info.copy()
                info_copy["ServiceName"] = info_copy.pop("Id")
                pid_info_map[pid] = info_copy
            current_info = {}
            pid = None

    return pid_info_map

プロセス情報の構築(メイン処理)

import os
import psutil
import common
from typing import Optional

RECORD_TARGET_COMMANDS = ['python', 'influxd', 'telegraf', 'grafana', 'nginx', 'dnsmasq']

def build_process_info():
    top_list = parse_top_output()
    service_map = get_service_map()
    top_dict = {p["pid"]: p for p in top_list}

    tags_list = []
    fields_list = []

    for pid, info in top_dict.items():
        command = info["command"]
        if command not in RECORD_TARGET_COMMANDS:
            continue

        try:
            p = psutil.Process(pid)
        except:
            continue

        # 子プロセスを取得
        try:
            children = p.children(recursive=True)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            children = []

        # 親子判定
        try:
            ppid = p.ppid()
        except:
            ppid = None
        is_parent = (len(children) > 0)
        is_child  = (ppid != 1 and len(children) == 0)

        # 子プロセスのリソースを親に合算
        total_cpu  = info["cpu"]
        total_mem  = info["mem"]
        total_virt = info["virt"]
        total_res  = info["res"]
        total_shr  = info["shr"]
        child_pids = [c.pid for c in children]

        for child in children:
            try:
                cpid = child.pid
                if cpid in top_dict:
                    total_virt += top_dict[cpid]["virt"]
                    total_res  += top_dict[cpid]["res"]
                    total_shr  += top_dict[cpid]["shr"]
                    total_cpu  += top_dict[cpid]["cpu"]
                    total_mem  += top_dict[cpid]["mem"]
            except psutil.NoSuchProcess:
                continue

        # CPU正規化(コア数で割って全体100%基準に)
        cpu_cores = psutil.cpu_count()
        normalized_cpu = total_cpu / cpu_cores

        # コマンドライン・プログラムパスの取得(Pythonスクリプト判定)
        try:
            cmdline = p.cmdline()
            application_path = cmdline[0] if cmdline else None
            program_path = None
            if len(cmdline) > 2 and cmdline[1] == "-E":
                program_path = cmdline[2]
            elif len(cmdline) > 1:
                program_path = cmdline[1]
            program_name = os.path.basename(program_path) if program_path else None
            # プログラムのメタ情報取得(自作ライブラリのget_program_metadata関数)
            program_description, program_group = common.get_program_metadata(program_path)
        except:
            application_path = program_path = program_name = None
            program_description = program_group = None

        # Python以外はコマンド名をプログラム名として使用
        if 'python' not in command:
            program_name = command

        # systemdサービス情報の取得(service_mapで高速引き、なければ個別取得)
        if pid in service_map:
            svc = service_map[pid]
            service_name        = svc['ServiceName']
            service_loadstate   = svc['LoadState']
            service_activestate = svc['ActiveState']
            service_substate    = svc['SubState']
            service_description = svc['Description']
        else:
            service_info = get_service_info_from_pid(pid)
            if service_info:
                service_name        = service_info['ServiceName']
                service_loadstate   = service_info['LoadState']
                service_activestate = service_info['ActiveState']
                service_substate    = service_info['SubState']
                service_description = service_info['Description']
            else:
                service_name = service_loadstate = service_activestate = None
                service_substate = service_description = None

        if not program_description:
            program_description = service_name
            program_group       = service_name

        tags_list.append({
            "user":                info["user"],
            "command":             command,
            "program_name":        program_name,
            "program_description": program_description,
            "program_group":       program_group,
            "service_name":        service_name
        })

        fields_list.append({
            "pid":                  pid,
            "priority":             info["pr"],
            "nice_value":           info["ni"],
            "state":                info["state"],
            "children_pids":        ",".join(map(str, child_pids)),
            "parent_pid":           ppid,
            "is_parent":            is_parent,
            "is_child":             is_child,
            "application_path":     application_path,
            "program_path":         program_path,
            "service_loadstate":    service_loadstate,
            "service_activestate":  service_activestate,
            "service_substate":     service_substate,
            "service_description":  service_description,
            "virtual_memory":       total_virt,
            "resident_memory":      total_res,
            "shared_memory":        total_shr,
            "cpu_percent_raw":      total_cpu,
            "cpu_percent":          normalized_cpu,
            "memory_percent":       total_mem,
            "cpu_time_total":       info["time"].total_seconds()
        })

    return tags_list, fields_list

メインループとInfluxDBへのバッチ書き込み

import time
from influx.v2 import writer as influxwriter
import smarthomelog

influx = influxwriter.Writer()
success_log = smarthomelog.Log(__file__, smarthomelog.LogType.SUCCESS)
cpu_cores = psutil.cpu_count()

while True:
    tags, fields = build_process_info()
    influx.insert_process_status_log(tags, fields)
    success_log.write('処理が正常に終了しました。', __file__)
    time.sleep(5)

InfluxDB書き込みメソッド(insert_process_status_log)

複数プロセスのデータを1回のAPIコールで一括書き込みするため、write_dict_batch() を使用しています。プロセス数が多い場合でもInfluxDBへの接続オーバーヘッドを最小限に抑えられます。

def insert_process_status_log(self, tags: List[Dict], fields: List[Dict]):
    measurement = 'process_status_log'
    try:
        # タグリスト・フィールドリストを一括でInfluxDBに書き込む
        self.conn.write_dict_batch(measurement, tags, fields)
    except Exception as e:
        print(e)

コードの解説

topのヘッダー行でパース開始を判定する仕組み

if re.match(r"\s*PID\s+USER", line):
    found = True
    continue

topの出力は上部に全体負荷のサマリ行が続き、その後にプロセス一覧が始まります。PID USER で始まるヘッダー行を正規表現で検出し、そこからプロセスデータのパースを開始します。ロケール設定によって列の間隔が変わっても対応できるよう \s+ で柔軟にマッチさせています。

topの出力を最大12列でsplitする理由

cols = re.split(r"\s+", line.strip(), maxsplit=11)

COMMAND列(12列目)はコマンド名だけでなくスペースを含むフルパスになる場合があります。maxsplit=11 で最大11回分割(12列目以降は結合したまま)にすることで、コマンド名が途中で切れる問題を防いでいます。

serviceMapの「SubState」行で辞書登録をトリガーする仕組み

elif key == "SubState":
    if "Id" in current_info and pid is not None:
        info_copy = current_info.copy()
        info_copy["ServiceName"] = info_copy.pop("Id")
        pid_info_map[pid] = info_copy
    current_info = {}
    pid = None

systemctl show --type=service の出力は全サービスの情報が連続して出力されます。各サービスのブロックの最後に現れる SubState= 行をトリガーにして、その時点で蓄積した current_info を辞書に登録しリセットします。

取得データ一覧(InfluxDB保存内容)

  • measurement:process_status_log
  • タグ(tags)
    • user:プロセスを実行しているユーザー名
    • command:コマンド名(python / nginx / influxd など)
    • program_name:実行スクリプト名(Pythonの場合)
    • program_description:スクリプトの説明(__description__変数)
    • program_group:スクリプトのグループ(__group__変数)
    • service_name:systemdサービス名
  • フィールド(fields)
    • pid、priority、nice_value、state(プロセス状態)
    • children_pids(子プロセスID一覧)、parent_pid、is_parent、is_child
    • application_path(実行アプリパス)、program_path(スクリプトパス)
    • service_loadstate、service_activestate、service_substate、service_description
    • virtual_memory(仮想メモリ)、resident_memory(常駐メモリ)、shared_memory(共有メモリ)
    • cpu_percent_raw(生のCPU使用率)、cpu_percent(正規化後)、memory_percent
    • cpu_time_total(累積CPU時間・秒)

systemdによる自動起動設定

[Unit]
Description=Process Status Record to InfluxDB
After=network.target influxd.service

[Service]
ExecStart=/usr/bin/python3 /home/<username>/projects/smarthome/core/record/raspi/process_status_record.py
Restart=always
RestartSec=10
User=<username>

[Install]
WantedBy=multi-user.target
sudo cp process_status_record.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable process_status_record
sudo systemctl start process_status_record

動作確認

直接実行で確認

python3 process_status_record.py

InfluxDB Data Explorerで確認

from(bucket: "your-bucket")
  |> range(start: -1m)
  |> filter(fn: (r) => r._measurement == "process_status_log")
  |> filter(fn: (r) => r._field == "cpu_percent")

各プロセスの cpu_percent が5秒ごとに記録されていれば正常動作しています。
監視対象コマンドごとにデータが存在するか、タグ(program_name・service_nameなど)が正しく付与されているかも確認してください。

Grafanaでの活用例

  • service_name タグでフィルタリング → 特定サービスのCPU/メモリ推移グラフ
  • program_group タグでグループ化 → グループ別のリソース使用量の比較
  • is_parent = true に絞り込み → 子プロセス合算済みの正確なリソース使用量のみ表示
  • cpu_time_total の増分 → プロセスごとのCPU消費速度の比較

まとめ

本記事では、topコマンド・psutil・systemctlの3つを組み合わせて、サーバー上の指定プロセスのリソース情報・サービス情報を5秒ごとにInfluxDB v2へ一括記録する方法を解説しました。

  • top × psutil の組み合わせで「スナップショット的な全プロセス情報取得」と「親子関係・コマンドライン詳細」の両方を効率的に取得できる
  • systemctl –type=service の一括取得 でサービス情報のAPIコストを最小化し、PID→サービス情報の辞書引きで高速処理を実現
  • 子プロセスのリソース合算 により、マルチプロセス型サービス(nginx worker など)の本当の負荷を正確に把握できる
  • write_dict_batch によるバッチ書き込み で複数プロセスのデータを1回のAPIコールで効率的にInfluxDBへ保存できる

Grafanaと組み合わせることで、各サービス・各スクリプトのリソース使用傾向を時系列で可視化でき、異常なプロセスの早期検知やサーバーのチューニングに役立てることができます。

タイトルとURLをコピーしました