ラズパイAIカメラ(IMX500)で玄関の人物検知サーバーを自作

Raspberry Pi

「誰かが玄関に来たとき、すぐに気づきたい」——そんな要望をスマートホームで実現したくて、ラズパイ AI カメラ(IMX500)を使った人物検知サーバーを自作しました。
IMX500 はカメラチップ自体に AI 推論プロセッサが内蔵されており、Raspberry Pi 本体の CPU をほとんど使わずにリアルタイムで人物検知が行えます。
本記事では Flask でライブ映像ストリームと検知画像を同時に配信するサーバーをゼロから構築する手順を解説します。

なぜこの機能を作ったのか?

家族の帰宅や宅配便の来訪など、玄関への人の出入りをリアルタイムで把握したいシーンは意外と多いです。
市販の防犯カメラやスマートドアベルも存在しますが、クラウド依存・月額費用・カスタマイズの難しさという課題があります。
そこで Raspberry Pi AI カメラ(IMX500)を活用し、完全ローカルで動作する人物検知サーバーを自作することにしました。
検知結果はほかのスマートホームシステムに API で連携できるため、Google Home でのアナウンスや LINE 通知など様々な応用が可能です。

玄関に誰か来たらすぐ気づける仕組みを、クラウド不要・完全ローカルで作りたい!

実現したいこと

  • IMX500 の AI 推論機能を使い、玄関の人物をリアルタイムで検知する
  • Flask サーバーを立て、ブラウザからライブ映像をストリーミング表示できる
  • 人物を検知したときに画像を自動保存し、最新の検知画像もブラウザで確認できる
  • 検知結果(ステータス・画像ファイル名・画像データ)を外部 API に POST して連携できる
  • ライブ映像の通信量(Mbps)をリアルタイムでモニタリングできる
  • Raspberry Pi 起動時から systemd サービスとして自動稼働する

この記事でわかること

  • Raspberry Pi AI カメラ(IMX500)の初期化方法と推論結果の取得方法
  • picamera2 と IMX500 を組み合わせた人物検知の実装方法
  • Flask を使ったライブ映像ストリーミングサーバーの実装方法
  • マルチスレッドで「フレーム取得」「AI 推論」「ストリーム配信」を並列動作させる方法
  • 人物検知時に画像を保存して最新検知画像をブラウザで表示する方法
  • systemd サービスとして登録しラズパイ起動時から自動稼働させる方法

必要な準備と用意するもの

ハードウェア
  • Raspberry Pi 5
  • Raspberry Pi AI カメラ(IMX500 搭載)
  • カメラ接続用フラットケーブル(FPC)
ソフトウェア/サービス
  • Raspberry Pi OS (64-bit 推奨)
  • Python
    • picamera2
    • libcamera
    • opencv-python(cv2)
    • Flask
    • numpy
    • requests
  • imx500-models(推論モデルパッケージ)
    • imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk
  • systemd(常駐サービス化)

完成イメージ

ブラウザで Raspberry Pi のIPアドレス(ポート5000)にアクセスすると、下記のような画面が表示されます。

  • 左側:玄関カメラのライブ映像(リアルタイムストリーミング)と現在の通信量(Mbps)
  • 右側:直近で人物を検知したときに保存した静止画像と、検知からの経過時間(「xx分前」などの表示)

人物を検知すると、自動的に画像ファイルが保存され、検知結果が外部 API にも POST されます。
ブラウザを開いたまま放置しておくだけで、最新の玄関の状況がひと目でわかります。

システムの仕組み

プログラムは4つのスレッドを並列で動かすことで、映像取得・AI 推論・ストリーム配信・帯域モニタリングを同時に処理しています。

  1. update_frame スレッド:Picamera2 からフレームを約 30FPS で取得してグローバル変数に格納する
  2. detect_and_post スレッド:0.5秒ごとに IMX500 の推論結果を取得し、人物(クラス0・スコア 0.57 以上)を検知したら画像保存と API 送信を行う
  3. monitor_bandwidth スレッド:1秒ごとにストリームの通信量(Mbps)を集計してグローバル変数に保存する
  4. Flask サーバー/video_feed(ライブストリーム)・/stats(通信量)・/latest_detected_info(最終検知時刻)・/(Web UI)の各エンドポイントを提供する

実装のポイント

IMX500 によるオンデバイス AI 推論

CPU 負荷がほぼゼロ
推論処理は IMX500 チップ内部で完結するため、Raspberry Pi 5 の CPU 使用率はほとんど上がらない。映像ストリーミングや API 送信と並行して動かしても安定して動作する

クラウド不要・完全ローカル動作
推論モデルはカメラチップに内蔵して動作するため、外部サービスへのデータ送信が不要でプライバシーが守られる

推論モデルの差し替えに手間がかかる
IMX500 用の推論モデルは専用の .rpk 形式にコンパイルする必要がある。独自モデルを使いたい場合は Sony の IMX500 SDK でのコンパイル作業が必要になる

マルチスレッドによる並列処理

フレーム取得・AI 推論・ストリーム配信をシングルスレッドで書くと、推論待ちでストリームがカクつく問題が発生します。
そのため、各処理を別スレッドに分離し、グローバル変数+ロック(threading.Lock())で安全にフレームデータを共有しています。

スムーズなストリーミング
推論処理(0.5秒ごと)とストリーム配信(約30FPS)が独立して動くため、推論の重さに引きずられずに映像が流れる

古い検知画像の自動削除

プログラム起動時に前日以前の画像を自動削除することで、ストレージを圧迫しないようにしています。

事前準備

imx500-models パッケージのインストール

Raspberry Pi AI カメラ用の推論モデルパッケージをインストールします。

sudo apt update
sudo apt install imx500-all

インストール後、下記パスに推論モデルファイルが配置されていることを確認します。

ls /usr/share/imx500-models/imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk

Python ライブラリのインストール

pip install flask opencv-python numpy requests

picamera2 は Raspberry Pi OS に標準搭載されていますが、仮想環境(venv)を使う場合は --system-site-packages オプションをつけて作成するか、pip install picamera2 を実行してください。

画像保存ディレクトリの作成

mkdir -p /home/<username>/projects/smarthome/ai_camera/images
mkdir -p /home/<username>/projects/smarthome/ai_camera/static

実装方法

プログラムは1ファイルで完結しています。下記の構成で配置します。

smarthome/
└── ai_camera/
    ├── human_detection_record_and_server.py  # メインプログラム
    ├── images/                               # 検知画像の保存先
    └── static/
        └── latest_detected.jpg              # 最新検知画像(常に上書き)

メインプログラムの作成

# -*- coding: utf-8 -*-
import sys
import os
os.chdir(os.path.dirname(os.path.abspath(__file__)))

import cv2
import time
import datetime
import numpy as np
import threading
import requests
import base64
import shutil
from collections import deque
from flask import Flask, Response, jsonify, stream_with_context
from picamera2 import Picamera2
from picamera2.devices.imx500 import IMX500
from libcamera import Transform

app = Flask(__name__)

# --- グローバル変数 ---
frame = None
lock = threading.Lock()
detected_status = 'NotDetected'
is_detected_human = False
stream_bytes_queue = deque()
active_clients = 0
client_lock = threading.Lock()
latest_bandwidth = 0.0

# --- モデルと API 設定 ---
model_file = "/usr/share/imx500-models/imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk"
record_api_url = 'http://192.168.xxx.xxx:5001/api/record_porch_ai_camera_data'

# --- 画像保存先 ---
images_dir = '/home/<username>/projects/smarthome/ai_camera/images'
latest_detected_path = '/home/<username>/projects/smarthome/ai_camera/static/latest_detected.jpg'

# --- IMX500 初期化 ---
imx500 = IMX500(model_file)

# --- Picamera2 初期化 ---
picam2 = Picamera2()
video_config = picam2.create_video_configuration(
    main={"format": "RGB888", "size": (int(4056/4), int(3040/4))},
    transform=Transform(hflip=True, vflip=True)
)
picam2.configure(video_config)
picam2.start()

# 起動時に前日以前の古い画像を削除
yesterday = datetime.datetime.now().date() - datetime.timedelta(days=1)
threshold_time = time.mktime(
    datetime.datetime.combine(yesterday, datetime.datetime.max.time()).timetuple()
)
for filename in os.listdir(images_dir):
    filepath = os.path.join(images_dir, filename)
    if os.path.isfile(filepath) and os.path.getmtime(filepath) < threshold_time:
        os.remove(filepath)

# --- フレーム取得スレッド(約30FPS)---
def update_frame():
    global frame
    while True:
        new_frame = picam2.capture_array()
        with lock:
            frame = new_frame
        time.sleep(0.03)

# --- AI 推論 & API 送信スレッド(0.5秒ごと)---
def detect_and_post():
    global detected_status, is_detected_human
    last_detect_time = 0

    while True:
        with lock:
            local_frame = frame.copy() if frame is not None else None

        if local_frame is None:
            time.sleep(0.1)
            continue

        now = time.time()
        if now - last_detect_time >= 0.5:
            last_detect_time = now
            metadata = picam2.capture_metadata()
            outputs = imx500.get_outputs(metadata)

            detected_status = 'NotDetected'
            is_detected_human = False
            image_filename = None
            jpg_base64 = None

            if outputs:
                boxes, scores, classes = outputs[0][0], outputs[1][0], outputs[2][0]
                for score, cls in zip(np.atleast_1d(scores), np.atleast_1d(classes)):
                    # クラス0 = 人物、スコアが0.57以上で検知とみなす
                    if cls == 0 and score > 0.57:
                        detected_status = 'Detected'
                        is_detected_human = True

                        # 検知画像を保存
                        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                        image_filename = f"detected_{timestamp}_[{score}].jpg"
                        save_path = f"{images_dir}/{image_filename}"
                        with lock:
                            cv2.imwrite(save_path, local_frame)
                        # 最新検知画像として static に上書きコピー
                        shutil.copy(save_path, latest_detected_path)

                        # Base64 変換(API 送信用)
                        _, buffer = cv2.imencode('.jpg', local_frame)
                        jpg_base64 = base64.b64encode(buffer).decode('utf-8')
                        break
            else:
                detected_status = 'FailedToGetResults'

            # 検知結果を外部 API に POST
            try:
                response = requests.post(record_api_url, json={
                    'detected_status': detected_status,
                    'is_detected_human': is_detected_human,
                    'image_filename': image_filename,
                    'image_base64': jpg_base64
                })
            except requests.exceptions.RequestException as e:
                print(f'リクエストエラー: {e}')
        else:
            time.sleep(0.05)

# --- ライブストリーム生成 ---
def gen_frames():
    global active_clients
    with client_lock:
        active_clients += 1
    try:
        while True:
            with lock:
                if frame is None:
                    time.sleep(0.01)
                    continue
                _, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
                jpg_frame = buffer.tobytes()

            stream_bytes_queue.append(len(jpg_frame))
            try:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')
            except (BrokenPipeError, ConnectionResetError):
                break
            time.sleep(0.03)
    finally:
        with client_lock:
            active_clients = max(0, active_clients - 1)

# --- 通信量モニタリングスレッド(1秒ごと)---
def monitor_bandwidth():
    global latest_bandwidth
    while True:
        time.sleep(1.0)
        total_bytes = 0
        while stream_bytes_queue:
            total_bytes += stream_bytes_queue.popleft()
        latest_bandwidth = (total_bytes * 8) / 1_000_000

# --- Flask エンドポイント ---
@app.route('/video_feed')
def video_feed():
    return Response(stream_with_context(gen_frames()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/stats')
def stats():
    return jsonify({'clients': active_clients, 'bandwidth_mbps': round(latest_bandwidth, 2)})

@app.route('/latest_detected_info')
def latest_detected_info():
    if not os.path.exists(latest_detected_path):
        return jsonify({'timestamp': None})
    mtime = os.path.getmtime(latest_detected_path)
    return jsonify({'timestamp': datetime.datetime.fromtimestamp(mtime).isoformat()})

@app.route('/')
def index():
    return '''<html>
    <head>
      <style>
        html,body{margin:0;padding:0;height:100%;background:#000;overflow:hidden;display:flex;flex-direction:row;}
        #left{position:relative;height:100%;display:flex;justify-content:center;align-items:center;overflow:hidden;}
        #stream{width:100%;height:100%;object-fit:cover;display:block;}
        #stats{position:absolute;top:10px;right:10px;color:lime;background:rgba(0,0,0,0.6);padding:6px 12px;border-radius:6px;font-family:monospace;font-size:14px;}
        #right{height:100%;background:#222;display:flex;justify-content:flex-end;align-items:center;}
        #detected-container{position:relative;height:100%;display:flex;align-items:center;justify-content:center;}
        #latest-detected{height:100%;width:auto;object-fit:contain;display:block;}
        #timestamp{position:absolute;top:10px;right:10px;background:rgba(0,0,0,0.5);color:white;padding:4px 8px;font-size:14px;border-radius:4px;font-family:monospace;}
      </style>
      <script>
        function formatTimeAgo(t){
          const d=Math.floor((new Date()-new Date(t))/1000);
          if(d<60)return d+' 秒前';
          if(d<3600)return Math.floor(d/60)+' 分前';
          if(d<86400)return Math.floor(d/3600)+' 時間前';
          return Math.floor(d/86400)+' 日前';
        }
        setInterval(()=>{
          fetch('/latest_detected_info').then(r=>r.json()).then(d=>{
            document.getElementById('timestamp').innerText = d.timestamp ? formatTimeAgo(d.timestamp) : '検出時刻不明';
          });
          fetch('/stats').then(r=>r.json()).then(d=>{
            document.getElementById('stats').innerText = d.bandwidth_mbps.toFixed(2)+' Mbps';
          });
        },1000);
      </script>
    </head>
    <body>
      <div id="left">
        <img id="stream" src="/video_feed"/>
        <div id="stats">取得中...</div>
      </div>
      <div id="right">
        <div id="detected-container">
          <img id="latest-detected" src="/static/latest_detected.jpg"/>
          <div id="timestamp">検出時刻取得中...</div>
        </div>
      </div>
    </body>
    </html>'''

# --- メイン ---
if __name__ == "__main__":
    threading.Thread(target=update_frame,      daemon=True).start()
    threading.Thread(target=detect_and_post,   daemon=True).start()
    threading.Thread(target=monitor_bandwidth, daemon=True).start()
    app.run(host='0.0.0.0', port=5000, threaded=True)

systemd サービスとして登録する

下記のサービスファイルを /etc/systemd/system/ に配置します。

[Unit]
Description=Human Detection Record and Server

[Service]
ExecStart=sudo /usr/bin/python3 /home/<username>/projects/smarthome/ai_camera/human_detection_record_and_server.py
Restart=always
RestartSec=60
Type=simple
User=<username>

[Install]
WantedBy=multi-user.target
# サービスを有効化
sudo systemctl enable human-detection-record-and-server.service
# サービスを起動
sudo systemctl start human-detection-record-and-server.service
# 状態確認
sudo systemctl status human-detection-record-and-server.service

コードの解説

IMX500 の初期化と推論結果の取得

IMX500 は IMX500(model_file) でインスタンス化するだけで使えます。
推論結果は picam2.capture_metadata() でメタデータを取得したあと imx500.get_outputs(metadata) で取り出します。

metadata = picam2.capture_metadata()
outputs = imx500.get_outputs(metadata)

if outputs:
    boxes, scores, classes = outputs[0][0], outputs[1][0], outputs[2][0]
    for score, cls in zip(scores, classes):
        if cls == 0 and score > 0.57:   # クラス0 = 人物
            print("人を検出しました。スコア:", score)

classes の値はモデルの学習クラスに対応しており、SSD MobileNetV2 では クラス 0 が人物(person) です。
スコアの閾値(今回は 0.57)は環境に応じて調整してください。低すぎると誤検知が増え、高すぎると検知漏れが増えます。

MJPEG ストリーミングの仕組み

Flask で MJPEG ストリームを配信するには、multipart/x-mixed-replace という MIME タイプを使ってフレームを連続送信します。
クライアントが切断したときに BrokenPipeError が発生するため、try/except で捕捉してスレッドを安全に終了させています。

yield (b'--frame\r\n'
       b'Content-Type: image/jpeg\r\n\r\n' + jpg_frame + b'\r\n')

カメラ解像度の設定

IMX500 のフル解像度は 4056×3040(10fps)ですが、ストリーミングには重すぎます。
そのため 1/4 サイズ(1014×760)に落として約 30FPS で取得しています。

video_config = picam2.create_video_configuration(
    main={"format": "RGB888", "size": (int(4056/4), int(3040/4))},  # 1014 x 760
    transform=Transform(hflip=True, vflip=True)  # カメラの取り付け向きに合わせて反転
)

カメラの取り付け方向によって hflipvflip の値を変更してください。上下反転して映る場合は vflip=True、左右反転の場合は hflip=True を設定します。

JPEG 品質によるストリーム帯域の制御

ストリーム配信時の JPEG エンコードは品質50(IMWRITE_JPEG_QUALITY = 50)に設定しています。
品質を上げると画質は良くなりますが通信量が増えます。Wi-Fi 環境や用途に合わせて調整してください。

動作確認

サービス起動後、同じネットワーク内のブラウザで下記 URL にアクセスします。

http://<ラズパイのIPアドレス>:5000/

左側にライブ映像が表示され、右側に最新の検知画像が表示されれば成功です。
玄関の前に立つと人物が検知され、右側の画像が更新されます。

ログをリアルタイムで確認するには下記コマンドを使います。

sudo journalctl -u human-detection-record-and-server.service -f

サービス起動直後に IMX500 の初期化が完了するまで数秒かかります。ブラウザにアクセスしても映像が出ない場合は、10秒ほど待ってからリロードしてみてください。

まとめ

この記事では、Raspberry Pi AI カメラ(IMX500)を使って玄関の人物検知サーバーをゼロから構築しました。

  • IMX500 チップ内蔵 AI でオンデバイス人物検知(CPU 負荷ほぼゼロ)
  • マルチスレッドでフレーム取得・推論・ストリーミングを並列動作
  • Flask で MJPEG ライブストリーミングと検知画像表示の Web UI を提供
  • 検知結果を外部 API に POST してほかのスマートホームシステムと連携可能
  • systemd サービスで Raspberry Pi 起動時から自動稼働

検知結果の API 連携を活用すれば、Google Home でのアナウンスや LINE 通知、スマートロックとの連携など様々な応用が広がります。ぜひ試してみてください。

タイトルとURLをコピーしました