1. はじめに — なぜ最初に学ぶべきは Modbus なのか

「PLC や産業機器を Python から扱いたい」と考えたとき、最初に学ぶプロトコルとしてもっとも費用対効果が高いのが Modbus です。理由はシンプルで、多くのメーカーがメーカー固有プロトコルとは別に Modbus 対応をオプションで持っているため、メーカー横断で同じ知識が使い回せるからです。

三菱、キーエンス、オムロン、シーメンス、シュナイダー、富士電機、横河——主要 PLC メーカーは何らかの形で Modbus に対応しています。インバータ、温調器、電力計、流量計のような周辺機器でも Modbus 対応は標準的です。

本記事は、Modbus が初めての方でも自宅 PC だけで実装と動作確認まで完了できるように構成しています。シミュレータ pymodbus.server を使い、サーバーとクライアントを同一 PC で動かす形で全コードを再現できます。

2. Modbus プロトコルの基礎

Modbus はリクエスト・レスポンス型のシンプルなプロトコルです。「どのアドレスのデータを、どれだけ、どの操作で」を 1 リクエストで送り、サーバー側が結果を返します。

2.1 TCP / RTU / ASCII の違い

  • Modbus TCP: Ethernet 上で動く。ホスト名・ポート(既定 502)でアクセス。初学者はこれから始めるのが一番楽
  • Modbus RTU: シリアル(RS-232C / RS-485)で動くバイナリ形式。CRC 付き。物理層がシリアルなので、USB-シリアル変換ケーブルなどが必要
  • Modbus ASCII: シリアルでテキスト形式。実用では RTU のほうが一般的

2.2 4 種類のデータ領域

Modbus には扱うデータの種類が 4 つあり、それぞれ別のファンクションコードでアクセスします。

  • コイル(Coil): 1 ビット、読み書き可能。主にデジタル出力
  • ディスクリート入力(Discrete Input): 1 ビット、読み取り専用。主にデジタル入力
  • 保持レジスタ(Holding Register): 16 ビット、読み書き可能。主にアナログ出力・設定値
  • 入力レジスタ(Input Register): 16 ビット、読み取り専用。主にアナログ入力

業務でもっとも頻繁に使うのは保持レジスタです。「設定値の書き込み」「測定値の読み取り」「制御指令の書き込み」がすべて保持レジスタで完結することが多いはずです。

2.3 主要なファンクションコード

  • FC01: コイル読み取り
  • FC02: ディスクリート入力読み取り
  • FC03: 保持レジスタ読み取り
  • FC04: 入力レジスタ読み取り
  • FC05: 単一コイル書き込み
  • FC06: 単一保持レジスタ書き込み
  • FC15: 複数コイル書き込み
  • FC16: 複数保持レジスタ書き込み

pymodbus ではこれらが read_holding_registerswrite_register のようにメソッド名で表現されており、ファンクションコードを覚えていなくても使えます。

3. 環境構築

3.1 pymodbus のインストール

python -m pip install "pymodbus>=3.0"

pymodbus は 3.x 系で API が大きく変わっています。古い記事のサンプルが動かないことがあるため、本記事では 3.x 系を前提に記述します。

3.2 動作確認用シミュレータ

pymodbus には Modbus サーバー機能が同梱されており、サーバーとクライアントを同一 PC で起動できます。実機がなくても、本記事のコードはすべて自宅 PC で動作確認できます。

4. シミュレータサーバーの起動

まずは「クライアントから接続して読み書きできる」サーバーを 1 つ立ち上げます。テスト用に保持レジスタとコイルにダミーデータを入れておきます。

# server.py — Modbus TCP サーバー(シミュレータ)
import logging

from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusSlaveContext,
    ModbusServerContext,
)
from pymodbus.server import StartTcpServer

logging.basicConfig(level=logging.INFO)


def build_context() -> ModbusServerContext:
    # 0 番地から 100 個のレジスタ/コイルを用意し、初期値を設定
    holding = ModbusSequentialDataBlock(0, [0] * 100)
    coils = ModbusSequentialDataBlock(0, [False] * 100)
    inputs = ModbusSequentialDataBlock(0, [0] * 100)
    discrete = ModbusSequentialDataBlock(0, [False] * 100)

    # 動作確認用の初期値
    holding.setValues(0, [25, 30, 50])  # 温度っぽい値
    coils.setValues(0, [True, False, True])

    slave = ModbusSlaveContext(
        di=discrete, co=coils, hr=holding, ir=inputs,
    )
    return ModbusServerContext(slaves=slave, single=True)


if __name__ == "__main__":
    StartTcpServer(context=build_context(), address=("127.0.0.1", 5020))

標準ポート 502 は管理者権限が必要なため、ローカル動作確認では 5020 を使うのがおすすめです。このスクリプトを実行すると、ターミナルで Modbus TCP サーバーが起動します。

5. クライアント側の実装パターン

5.1 接続と切断の基本形

# client_basic.py
from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient("127.0.0.1", port=5020)
if not client.connect():
    raise ConnectionError("failed to connect")

try:
    # 0 番地から 3 個読む
    result = client.read_holding_registers(address=0, count=3, slave=1)
    if result.isError():
        print(f"modbus error: {result}")
    else:
        print(f"values: {result.registers}")
finally:
    client.close()

ポイント:

  • client.connect() の戻り値を必ず確認。False なら通信前にエラーで止める
  • レスポンスに isError() が用意されている。例外ではなくこちらで判定する
  • finallyclose()。例外時にも socket を閉じる

5.2 単一レジスタの書き込み

client.write_register(address=10, value=100, slave=1)

たとえば「アドレス 10 に温度しきい値 100 を書き込む」のような単純なケース。書き込み後は読み戻して値が反映されたことを確認すると安全です。

5.3 複数レジスタの一括書き込み

# 32bit 値(例: 設備の稼働時間カウンタ)を 2 レジスタに分けて書く
seconds = 123456
high = (seconds >> 16) & 0xFFFF
low = seconds & 0xFFFF
client.write_registers(address=20, values=[high, low], slave=1)

16 ビットでは表現しきれない値(32 ビット整数、浮動小数)を扱う際は 2 レジスタに分けます。エンディアンとワード順は機器仕様により異なるため、必ず仕様書を確認してください。

5.4 32 ビット浮動小数の読み書き

製造機器のアナログ値は IEEE 754 単精度浮動小数(4 バイト = 2 レジスタ)で扱われることがよくあります。pymodbus には専用のヘルパーがあります。

from pymodbus.client import ModbusTcpClient
from pymodbus.payload import (
    BinaryPayloadBuilder,
    BinaryPayloadDecoder,
)
from pymodbus.constants import Endian

client = ModbusTcpClient("127.0.0.1", port=5020)
client.connect()

# 書き込み
builder = BinaryPayloadBuilder(byteorder=Endian.BIG, wordorder=Endian.BIG)
builder.add_32bit_float(123.45)
client.write_registers(address=30, values=builder.to_registers(), slave=1)

# 読み取り
result = client.read_holding_registers(address=30, count=2, slave=1)
decoder = BinaryPayloadDecoder.fromRegisters(
    result.registers, byteorder=Endian.BIG, wordorder=Endian.BIG,
)
value = decoder.decode_32bit_float()
print(f"float: {value}")

client.close()

byteorderwordorder の組み合わせは機器ごとに異なります(Big-Big、Big-Little、Little-Big、Little-Little の 4 通り)。値が異常に大きい・小さい・NaN になる場合は、まずここを疑ってください。

6. 現場で使える実装パターン

パターン A: 定周期ポーリング

1 秒に 1 回温度・圧力・回転数を読みに行き、CSV に書き出す典型的なロガーです。

# logger.py
import csv
import datetime as dt
import time
from pathlib import Path

from pymodbus.client import ModbusTcpClient

INTERVAL = 1.0
LOG_PATH = Path("data.csv")


def main() -> None:
    client = ModbusTcpClient("127.0.0.1", port=5020)
    client.connect()

    write_header = not LOG_PATH.exists()
    with LOG_PATH.open("a", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        if write_header:
            writer.writerow(["timestamp", "temperature", "pressure", "rpm"])

        try:
            while True:
                t0 = time.monotonic()
                res = client.read_holding_registers(address=0, count=3, slave=1)
                if res.isError():
                    print(f"error: {res}")
                else:
                    writer.writerow([
                        dt.datetime.now().isoformat(timespec="seconds"),
                        *res.registers,
                    ])
                    f.flush()
                # 周期維持(処理時間を差し引いて sleep)
                elapsed = time.monotonic() - t0
                time.sleep(max(0.0, INTERVAL - elapsed))
        finally:
            client.close()


if __name__ == "__main__":
    main()

ポイント:

  • time.monotonic() 基準で「処理時間を差し引いた sleep」にすることで周期がずれにくい
  • encoding="utf-8" 明示。CP932 環境でも UTF-8 で統一すれば後段の解析が楽になる
  • f.flush() で書き込みを即時反映。アプリが落ちてもデータが残る

パターン B: しきい値監視 → アラート

異常値を検知してアラートを出す監視。チャタリング対策と通知重複防止が現場の鍵です。

import logging
import time

from pymodbus.client import ModbusTcpClient

THRESHOLD = 80
COOLDOWN = 60  # アラート間の最小間隔(秒)
HYSTERESIS = 5  # 復帰しきい値(80 で発報、75 で復帰)

logger = logging.getLogger(__name__)


def monitor_loop() -> None:
    client = ModbusTcpClient("127.0.0.1", port=5020)
    client.connect()
    in_alert = False
    last_alert = 0.0

    try:
        while True:
            res = client.read_holding_registers(address=0, count=1, slave=1)
            if res.isError():
                logger.warning("read error: %s", res)
                time.sleep(1.0)
                continue

            value = res.registers[0]
            now = time.time()

            if not in_alert and value > THRESHOLD and (now - last_alert) > COOLDOWN:
                logger.warning("ALERT: value=%s exceeds threshold=%s", value, THRESHOLD)
                in_alert = True
                last_alert = now
            elif in_alert and value < (THRESHOLD - HYSTERESIS):
                logger.info("RECOVERED: value=%s back to normal", value)
                in_alert = False

            time.sleep(1.0)
    finally:
        client.close()

ヒステリシス(発報しきい値と復帰しきい値を分ける)と、クールダウン(連続発報の抑止)の組み合わせは、現場運用で必須の設計です。これがないと、しきい値付近で値が振動するたびにアラートが大量発報し、現場の信頼を失います。

パターン C: レシピ書き込み(一括設定)

生産品種切替時に、しきい値や設定値を一気に書き込みたいケース。「人間が間違えやすい順番で書き込まないこと」がポイントです。

RECIPE_A = {
    10: 100,   # 温度しきい値
    11: 50,    # 圧力しきい値
    12: 1500,  # 設備回転数
}


def apply_recipe(client, recipe: dict[int, int]) -> None:
    for addr, value in recipe.items():
        rsp = client.write_register(address=addr, value=value, slave=1)
        if rsp.isError():
            raise RuntimeError(f"failed to write addr={addr}: {rsp}")
    # 書き込み後の読み戻しで検算
    addrs = sorted(recipe.keys())
    rsp = client.read_holding_registers(address=addrs[0], count=len(addrs), slave=1)
    for addr, expected in recipe.items():
        actual = rsp.registers[addr - addrs[0]]
        if actual != expected:
            raise RuntimeError(f"verify failed at addr={addr}: expected={expected} actual={actual}")

レシピ書き込みは現場の安全に直結するため、書き込み後の読み戻し検算は必ず実装してください。書き込み成功でも、機器側で値域外の値が拒否されるケースがあります。

7. RTU(シリアル)の場合

pymodbus は RTU もほぼ同じインターフェースで使えます。違うのは接続先の指定です。

from pymodbus.client import ModbusSerialClient

client = ModbusSerialClient(
    port="COM3",          # Linux なら "/dev/ttyUSB0"
    baudrate=9600,
    bytesize=8,
    parity="N",
    stopbits=1,
    timeout=1,
)
client.connect()
res = client.read_holding_registers(address=0, count=3, slave=1)
client.close()

シリアル通信特有の注意点:

  • 機器側の通信設定(ボーレート・パリティ・ストップビット・スレーブアドレス)と完全一致させること
  • RS-485 の場合は終端抵抗の有無、半二重の方向制御を確認
  • USB-シリアル変換ケーブルのドライバ(FTDI 系Prolific 系など)を現場 PC にインストール
  • COM ポート番号は再起動・差し替えで変わることがある。固定するなら Windows のデバイスマネージャから設定

8. 24 時間運用設計のチェックリスト

本記事のサンプルは「動く」段階のコードです。長期運用にもっていくには、以下を加えていきます。

  • 例外時の自動再接続: connect 失敗・タイムアウト・通信エラー時にバックオフ付きで再接続
  • ログのファイル出力 + ローテーション: logging.handlers.TimedRotatingFileHandler
  • 同時接続制限: 多くの PLC は同時接続数に制限あり。ポーリングプロセスは 1 つに集約する
  • ポーリング周期の調整: 機器の応答性能を超える周期を設定すると、レスポンスが詰まり全体が遅くなる
  • 書き込みの権限分離: 「読み取り専用クライアント」と「書き込み権限ありクライアント」を別プロセスにし、誤書き込みリスクを下げる
  • サマータイム / 時刻同期: 海外現場では時刻ずれが集計に影響。NTP 同期を必ず入れる

9. おわりに

Modbus は仕様がシンプルで、Python との相性も極めて良いプロトコルです。一度 pymodbus でこのパターンを身に付けておけば、PLC・インバータ・電力計・温調器・流量計など、現場のほぼ全ての機器が同じインターフェースで扱えるようになります。

本記事のサンプルは、自宅環境のシミュレータで全ステップを再現できる構成にしています。「実機が触れない」という制約は、もはや学習を止める理由にはなりません。

関連記事