「今サーバーで動いているNGINXやPythonスクリプトは、どれくらいCPUを使っているのか?」「メモリを圧迫しているプロセスはどれか?」こうした疑問に答えるには、プロセスの状態を継続的に記録・可視化する仕組みが必要です。
本記事では、topコマンド・psutil・systemctl の3つを組み合わせ、サーバー上で動作している指定プロセスのCPU使用率・メモリ・systemdサービス情報などを5秒ごとにInfluxDB v2へ記録する方法を解説します。
なぜこの機能を作ったのか?
我が家のサーバーでは、InfluxDB・Grafana・NGINX・各種Pythonスクリプトなど、多くのプロセスが常時稼働しています。
「特定のPythonスクリプトがいつの間にか高CPU状態になっていた」「メモリを大量消費しているプロセスがあるが原因が不明」といった問題を事後ではなくリアルタイムで検知するために、プロセスの動作状況をInfluxDBに記録してGrafanaで可視化する仕組みを作りました。

どのプロセスがサーバーに負荷をかけているかをGrafanaで見えるようにしたい!
実現したいこと
- 監視対象コマンド(python / nginx / influxd / grafana など)に絞ってプロセスを取得する
- 各プロセスのCPU使用率・メモリ・仮想メモリ・累積CPU時間を取得する
- プロセスの親子関係を判定し、子プロセスのリソースを親に合算する
- systemdサービス名・ロード状態・アクティブ状態を取得する
- Pythonスクリプトの場合はスクリプト名・説明・グループ情報もタグとして記録する
- 取得した全プロセス情報を5秒ごとにInfluxDB v2へ一括保存する
この記事でわかること
- topコマンド(バッチモード) の出力をPythonでパースしてプロセス情報を取得する方法
- psutil を使ったプロセスの親子関係の判定と子プロセスのリソース合算方法
- systemctl を使ったPIDからsystemdサービス情報を一括取得する方法
- マルチコアCPUの使用率を「全体で最大100%」に正規化する方法
- 複数プロセスのデータをInfluxDB v2へ一括(バッチ)書き込みする方法
必要な準備と用意するもの
- Linux搭載サーバー(Raspberry Pi / NUCBOX など)
- Python 3
- psutil
- topコマンド(procpsパッケージ、通常はデフォルトインストール済み)
- systemd(サービス情報の取得に使用)
- InfluxDB v2(稼働済み)
完成イメージ
プログラムを起動すると5秒ごとに以下の処理が実行されます。
① top -b -n 1 で現在のプロセス一覧を取得してパース
② systemctl show --type=service でsystemdサービス情報を一括取得してPID→サービス情報の辞書を作成
③ 監視対象コマンド(python / nginx など)に絞ってループ
④ psutil でプロセスの親子関係を確認し、子プロセスのリソースを親に合算
⑤ systemdサービス情報・プログラムパスなどのメタ情報を付加
⑥ 全プロセスのタグ・フィールドをリストにまとめてInfluxDB v2へバッチ一括書き込み
システムの仕組み
なぜtopコマンドとpsutilを併用するのか?
psutil だけでもCPU使用率やメモリは取得できますが、top コマンドとの使い分けには理由があります。
topコマンドが得意なこと
バッチモード(-b -n 1)で実行すると、全プロセスのCPU使用率・メモリ使用率・仮想メモリ・常駐メモリ・累積CPU時間をスナップショットとして一括取得できる。TIME+(累積CPU時間)はpsutilのcpu_times()とは別の形式で取得でき、プロセスが起動してからどれだけCPUを消費したかの追跡に使いやすい。
psutilが得意なこと
プロセスの親子関係(children()・ppid())・コマンドラインの取得(cmdline())・センサー情報など、topでは取りにくい詳細情報を取得できる。
この2つを組み合わせることで、「topで全プロセスのリソースを一括取得 → psutilで親子関係とメタ情報を補完」という効率的な収集が実現できます。
親子プロセスのリソース合算
NGINXやPythonは親プロセスが複数の子プロセス(ワーカー)を生成する場合があります。
親プロセスの CPU% だけを見ると実際の負荷を過小評価してしまうため、このプログラムでは 子プロセスのCPU・メモリ使用量を親プロセスに合算して記録します。
psutil の children(recursive=True) で全子プロセスを取得し、それぞれのPIDで top の辞書を引いてリソースを加算します。子プロセス自身のレコードは is_child フラグで区別されるため、Grafanaで「親プロセスの合算値のみ」を絞り込むことができます。
systemdサービス情報の一括取得
systemctl show --type=service を1回実行するだけで全サービスの情報を取得し、PID→サービス情報の辞書(service_map)を作成します。
各プロセスごとに systemctl status を呼び出すと処理が重くなるため、この「一括取得→辞書引き」の方式で効率化しています。
実装のポイント
topのTIME+をtimedeltaに変換するパーサー
topの TIME+ 列は複数のフォーマットが混在するため、専用のパーサー関数を実装しています。
5:44.59→ 分:秒.ミリ秒(最も一般的な形式)1:05:44→ 時:分:秒(長時間稼働プロセス)6.17→ 小数秒123→ 整数秒
topのメモリ表記をバイトに変換するパーサー
topの VIRT/RES/SHR 列は 1.2g・512m・123k のように単位付きで表示されます。
単位なしの場合はKB扱いとなるtopの仕様に合わせて、すべてバイト単位の整数値に統一して保存します。
マルチコアCPU使用率の正規化
topのCPU使用率は「1コアあたり100%」の基準で表示されます(4コアなら最大400%)。
Grafanaで直感的に把握しやすいよう、cpu_percent = cpu_percent_raw / コア数 で全体を100%基準に正規化して保存します。
正規化前の生の値(cpu_percent_raw)も併せて保存しているため、コア単位での比較が必要な場合にも対応できます。
Pythonスクリプトの識別とcommonライブラリ
コマンドが python の場合、cmdline() でフルコマンドラインを取得し、実行しているスクリプトのパス・名前を特定します。
さらに common.get_program_metadata() を使って各Pythonスクリプトに定義された __description__・__group__ 変数を読み取り、タグとして付与することで、Grafanaでスクリプト単位の集計・フィルタリングができます。
common はプロジェクト固有の自作ライブラリです
common は標準ライブラリでも pip パッケージでもなく、このスマートホームプロジェクト内の自作ユーティリティライブラリです。そのまま流用することはできません。
get_program_metadata() の処理内容は「指定したPythonファイルを開いて __description__ と __group__ の値を正規表現で抽出して返す」というものです。この機能が不要な場合は、この関数の呼び出しと関連するタグ(program_description / program_group)を削除しても動作します。
監視対象のPythonスクリプトに以下のように変数を定義しておくと、Grafanaでグループや説明で絞り込めるようになります。
# -*- coding: utf-8 -*-
__group__ = '情報記録' # Grafanaでのグループ名
__description__ = 'センサーデータを記録' # Grafanaでの説明
# ... 以降に通常の処理 ...事前準備
psutilのインストール
pip install psutiltopコマンドの動作確認
top -b -n 1 | head -20PID・USER・CPU%・MEM%・TIME+・COMMANDなどの列が表示されれば準備OKです。
監視対象コマンドの設定
プログラム冒頭の RECORD_TARGET_COMMANDS リストに、監視したいコマンド名を列挙します。
サーバーで稼働しているサービスに合わせてカスタマイズしてください。
RECORD_TARGET_COMMANDS = ['python', 'influxd', 'telegraf', 'grafana', 'nginx', 'dnsmasq']実装方法
プログラム全体構成
- parse_cputime():topのTIME+文字列をtimedeltaに変換(複数フォーマット対応)
- parse_mem_value():topのVIRT/RES/SHR表記をバイト単位の整数に変換
- parse_top_output():
top -b -n 1の出力をパースしてプロセスリストを作成 - get_service_map():systemctlで全サービス情報を一括取得し、PID→サービス情報の辞書を作成
- get_service_info_from_pid():service_mapにないPIDをフォールバックで個別取得
- build_process_info():上記を組み合わせてタグ・フィールドのリストを構築
- メインループ:5秒ごとに build_process_info() を呼び出してInfluxDBへバッチ書き込み
topのTIME+パーサーとメモリ単位変換
import datetime
def parse_cputime(s: str) -> datetime.timedelta:
"""topのTIME+をtimedeltaに変換(複数フォーマット対応)"""
if s is None:
raise ValueError("Empty time string")
s = s.strip().replace(",", ".")
if not s:
raise ValueError("Empty time string")
# H:MM:SS(2つのコロン)
if s.count(":") == 2:
h, m, sec = s.split(":")
return datetime.timedelta(hours=int(h), minutes=int(m), seconds=int(float(sec)))
# M:SS(.ms)(1つのコロン)
if s.count(":") == 1:
minutes, sec_part = s.split(":", 1)
if "." in sec_part:
seconds_str, frac = sec_part.split(".", 1)
ms = int(round(float("0." + frac) * 1000))
return datetime.timedelta(minutes=int(minutes), seconds=int(seconds_str), milliseconds=ms)
else:
return datetime.timedelta(minutes=int(minutes), seconds=int(sec_part))
# 小数秒(例: "6.17")
if "." in s:
total_seconds = float(s)
seconds = int(total_seconds)
ms = int(round((total_seconds - seconds) * 1000))
return datetime.timedelta(seconds=seconds, milliseconds=ms)
# 整数秒(例: "123")
if s.isdigit():
return datetime.timedelta(seconds=int(s))
raise ValueError(f"Unknown time format: {s}")
def parse_mem_value(value: str) -> int:
"""topのVIRT/RES/SHR表記をバイトに変換(単位なしはKB扱い)"""
value = value.strip().lower()
if value[-1].isdigit():
return int(float(value) * 1024) # 単位なし → KB → バイト
num = float(value[:-1])
unit = value[-1]
if unit == 'g':
return int(num * 1024 ** 3)
elif unit == 'm':
return int(num * 1024 ** 2)
elif unit == 'k':
return int(num * 1024)
else:
raise ValueError(f"Unknown memory unit: {value}")topコマンド出力のパーサー
import subprocess
import re
def parse_top_output():
"""top の1回分の出力をパースしてプロセスリストを返す"""
top_output = subprocess.check_output(["top", "-b", "-n", "1"]).decode()
lines = top_output.splitlines()
processes = []
found = False
for line in lines:
if re.match(r"\s*PID\s+USER", line):
found = True
continue
if not found:
continue
cols = re.split(r"\s+", line.strip(), maxsplit=11)
if len(cols) < 12:
continue
pid = int(cols[0])
processes.append({
"pid": pid,
"user": cols[1],
"pr": cols[2],
"ni": cols[3],
"virt": parse_mem_value(cols[4]),
"res": parse_mem_value(cols[5]),
"shr": parse_mem_value(cols[6]),
"state": cols[7],
"cpu": float(cols[8]),
"mem": float(cols[9]),
"time": parse_cputime(cols[10]),
"command": cols[11] if len(cols) > 11 else None
})
return processestopのPIDヘッダー行でスキャン開始を判定
re.match(r"\s*PID\s+USER", line) でプロセス一覧の開始行(ヘッダー行)を検出し、その次の行からプロセスデータのパースを始めます。ヘッダー前のシステム概要行(CPU使用率・メモリ全体など)はスキップされます。
systemdサービス情報の一括取得
def get_service_map():
"""systemdのサービス情報を一括取得してPID→サービス情報の辞書を返す"""
cmd = [
"systemctl", "show", "--type=service",
"-p", "Id", "-p", "MainPID", "-p", "LoadState",
"-p", "ActiveState", "-p", "SubState", "-p", "Description"
]
result = subprocess.run(cmd, capture_output=True, text=True)
lines = result.stdout.strip().splitlines()
pid_info_map = {}
current_info = {}
pid = None
for line in lines:
if not line.strip() or "=" not in line:
continue
key, value = line.split("=", 1)
current_info[key] = value
if key == 'MainPID':
pid = int(value)
elif key == "SubState":
if "Id" in current_info and pid is not None:
info_copy = current_info.copy()
info_copy["ServiceName"] = info_copy.pop("Id")
pid_info_map[pid] = info_copy
current_info = {}
pid = None
return pid_info_mapプロセス情報の構築(メイン処理)
import os
import psutil
import common
from typing import Optional
RECORD_TARGET_COMMANDS = ['python', 'influxd', 'telegraf', 'grafana', 'nginx', 'dnsmasq']
def build_process_info():
top_list = parse_top_output()
service_map = get_service_map()
top_dict = {p["pid"]: p for p in top_list}
tags_list = []
fields_list = []
for pid, info in top_dict.items():
command = info["command"]
if command not in RECORD_TARGET_COMMANDS:
continue
try:
p = psutil.Process(pid)
except:
continue
# 子プロセスを取得
try:
children = p.children(recursive=True)
except (psutil.NoSuchProcess, psutil.AccessDenied):
children = []
# 親子判定
try:
ppid = p.ppid()
except:
ppid = None
is_parent = (len(children) > 0)
is_child = (ppid != 1 and len(children) == 0)
# 子プロセスのリソースを親に合算
total_cpu = info["cpu"]
total_mem = info["mem"]
total_virt = info["virt"]
total_res = info["res"]
total_shr = info["shr"]
child_pids = [c.pid for c in children]
for child in children:
try:
cpid = child.pid
if cpid in top_dict:
total_virt += top_dict[cpid]["virt"]
total_res += top_dict[cpid]["res"]
total_shr += top_dict[cpid]["shr"]
total_cpu += top_dict[cpid]["cpu"]
total_mem += top_dict[cpid]["mem"]
except psutil.NoSuchProcess:
continue
# CPU正規化(コア数で割って全体100%基準に)
cpu_cores = psutil.cpu_count()
normalized_cpu = total_cpu / cpu_cores
# コマンドライン・プログラムパスの取得(Pythonスクリプト判定)
try:
cmdline = p.cmdline()
application_path = cmdline[0] if cmdline else None
program_path = None
if len(cmdline) > 2 and cmdline[1] == "-E":
program_path = cmdline[2]
elif len(cmdline) > 1:
program_path = cmdline[1]
program_name = os.path.basename(program_path) if program_path else None
# プログラムのメタ情報取得(自作ライブラリのget_program_metadata関数)
program_description, program_group = common.get_program_metadata(program_path)
except:
application_path = program_path = program_name = None
program_description = program_group = None
# Python以外はコマンド名をプログラム名として使用
if 'python' not in command:
program_name = command
# systemdサービス情報の取得(service_mapで高速引き、なければ個別取得)
if pid in service_map:
svc = service_map[pid]
service_name = svc['ServiceName']
service_loadstate = svc['LoadState']
service_activestate = svc['ActiveState']
service_substate = svc['SubState']
service_description = svc['Description']
else:
service_info = get_service_info_from_pid(pid)
if service_info:
service_name = service_info['ServiceName']
service_loadstate = service_info['LoadState']
service_activestate = service_info['ActiveState']
service_substate = service_info['SubState']
service_description = service_info['Description']
else:
service_name = service_loadstate = service_activestate = None
service_substate = service_description = None
if not program_description:
program_description = service_name
program_group = service_name
tags_list.append({
"user": info["user"],
"command": command,
"program_name": program_name,
"program_description": program_description,
"program_group": program_group,
"service_name": service_name
})
fields_list.append({
"pid": pid,
"priority": info["pr"],
"nice_value": info["ni"],
"state": info["state"],
"children_pids": ",".join(map(str, child_pids)),
"parent_pid": ppid,
"is_parent": is_parent,
"is_child": is_child,
"application_path": application_path,
"program_path": program_path,
"service_loadstate": service_loadstate,
"service_activestate": service_activestate,
"service_substate": service_substate,
"service_description": service_description,
"virtual_memory": total_virt,
"resident_memory": total_res,
"shared_memory": total_shr,
"cpu_percent_raw": total_cpu,
"cpu_percent": normalized_cpu,
"memory_percent": total_mem,
"cpu_time_total": info["time"].total_seconds()
})
return tags_list, fields_listメインループとInfluxDBへのバッチ書き込み
import time
from influx.v2 import writer as influxwriter
import smarthomelog
influx = influxwriter.Writer()
success_log = smarthomelog.Log(__file__, smarthomelog.LogType.SUCCESS)
cpu_cores = psutil.cpu_count()
while True:
tags, fields = build_process_info()
influx.insert_process_status_log(tags, fields)
success_log.write('処理が正常に終了しました。', __file__)
time.sleep(5)InfluxDB書き込みメソッド(insert_process_status_log)
複数プロセスのデータを1回のAPIコールで一括書き込みするため、write_dict_batch() を使用しています。プロセス数が多い場合でもInfluxDBへの接続オーバーヘッドを最小限に抑えられます。
def insert_process_status_log(self, tags: List[Dict], fields: List[Dict]):
measurement = 'process_status_log'
try:
# タグリスト・フィールドリストを一括でInfluxDBに書き込む
self.conn.write_dict_batch(measurement, tags, fields)
except Exception as e:
print(e)コードの解説
topのヘッダー行でパース開始を判定する仕組み
if re.match(r"\s*PID\s+USER", line):
found = True
continuetopの出力は上部に全体負荷のサマリ行が続き、その後にプロセス一覧が始まります。PID USER で始まるヘッダー行を正規表現で検出し、そこからプロセスデータのパースを開始します。ロケール設定によって列の間隔が変わっても対応できるよう \s+ で柔軟にマッチさせています。
topの出力を最大12列でsplitする理由
cols = re.split(r"\s+", line.strip(), maxsplit=11)COMMAND列(12列目)はコマンド名だけでなくスペースを含むフルパスになる場合があります。maxsplit=11 で最大11回分割(12列目以降は結合したまま)にすることで、コマンド名が途中で切れる問題を防いでいます。
serviceMapの「SubState」行で辞書登録をトリガーする仕組み
elif key == "SubState":
if "Id" in current_info and pid is not None:
info_copy = current_info.copy()
info_copy["ServiceName"] = info_copy.pop("Id")
pid_info_map[pid] = info_copy
current_info = {}
pid = Nonesystemctl show --type=service の出力は全サービスの情報が連続して出力されます。各サービスのブロックの最後に現れる SubState= 行をトリガーにして、その時点で蓄積した current_info を辞書に登録しリセットします。
取得データ一覧(InfluxDB保存内容)
- measurement:process_status_log
- タグ(tags)
- user:プロセスを実行しているユーザー名
- command:コマンド名(python / nginx / influxd など)
- program_name:実行スクリプト名(Pythonの場合)
- program_description:スクリプトの説明(__description__変数)
- program_group:スクリプトのグループ(__group__変数)
- service_name:systemdサービス名
- フィールド(fields)
- pid、priority、nice_value、state(プロセス状態)
- children_pids(子プロセスID一覧)、parent_pid、is_parent、is_child
- application_path(実行アプリパス)、program_path(スクリプトパス)
- service_loadstate、service_activestate、service_substate、service_description
- virtual_memory(仮想メモリ)、resident_memory(常駐メモリ)、shared_memory(共有メモリ)
- cpu_percent_raw(生のCPU使用率)、cpu_percent(正規化後)、memory_percent
- cpu_time_total(累積CPU時間・秒)
systemdによる自動起動設定
[Unit]
Description=Process Status Record to InfluxDB
After=network.target influxd.service
[Service]
ExecStart=/usr/bin/python3 /home/<username>/projects/smarthome/core/record/raspi/process_status_record.py
Restart=always
RestartSec=10
User=<username>
[Install]
WantedBy=multi-user.targetsudo cp process_status_record.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable process_status_record
sudo systemctl start process_status_record動作確認
直接実行で確認
python3 process_status_record.pyInfluxDB Data Explorerで確認
from(bucket: "your-bucket")
|> range(start: -1m)
|> filter(fn: (r) => r._measurement == "process_status_log")
|> filter(fn: (r) => r._field == "cpu_percent")各プロセスの cpu_percent が5秒ごとに記録されていれば正常動作しています。
監視対象コマンドごとにデータが存在するか、タグ(program_name・service_nameなど)が正しく付与されているかも確認してください。
Grafanaでの活用例
service_nameタグでフィルタリング → 特定サービスのCPU/メモリ推移グラフprogram_groupタグでグループ化 → グループ別のリソース使用量の比較is_parent = trueに絞り込み → 子プロセス合算済みの正確なリソース使用量のみ表示cpu_time_totalの増分 → プロセスごとのCPU消費速度の比較
まとめ
本記事では、topコマンド・psutil・systemctlの3つを組み合わせて、サーバー上の指定プロセスのリソース情報・サービス情報を5秒ごとにInfluxDB v2へ一括記録する方法を解説しました。
- top × psutil の組み合わせで「スナップショット的な全プロセス情報取得」と「親子関係・コマンドライン詳細」の両方を効率的に取得できる
- systemctl –type=service の一括取得 でサービス情報のAPIコストを最小化し、PID→サービス情報の辞書引きで高速処理を実現
- 子プロセスのリソース合算 により、マルチプロセス型サービス(nginx worker など)の本当の負荷を正確に把握できる
- write_dict_batch によるバッチ書き込み で複数プロセスのデータを1回のAPIコールで効率的にInfluxDBへ保存できる
Grafanaと組み合わせることで、各サービス・各スクリプトのリソース使用傾向を時系列で可視化でき、異常なプロセスの早期検知やサーバーのチューニングに役立てることができます。

