1. はじめに — なぜ TCP/IP の自前実装が必要なのか

産業機器との通信は、Modbus TCP や OPC UA のような標準プロトコルが使えれば話は早く、専用ライブラリで済ませられます。しかしながら、現場では「メーカー独自のバイナリプロトコル」「機器ベンダーの仕様書(PDF)通りに socket でバイト列を組み立てる必要がある」という案件が、いまだに数多く存在します。

たとえば、画像検査装置・特殊計測器・受託開発の制御装置・メーカー独自プロトコルの専用機など、汎用プロトコルを実装していない機器は珍しくありません。こうした機器との通信は、Python の標準 socket モジュールで自前実装するのが最も汎用性が高い解になります。

本記事は、最小サンプルから入って、現場で 24 時間運用するレベルまで段階的に積み上げる構成です。すべて自宅環境(Python の socket ベースの簡易シミュレータ)で再現確認できます。

2. TCP/IP 通信の基本構造

TCP/IP 通信は、サーバー側(受信待ち)とクライアント側(接続要求)の役割分担で動きます。産業機器との通信では、機器がサーバーになるパターンと、機器がクライアントになるパターンの両方があります。

2.1 サーバーとクライアントの役割

  • サーバー側: bind でアドレスを確保 → listen で待機 → accept で接続受付 → recv / send でやり取り
  • クライアント側: connect で接続 → send / recv でやり取り → close

機器が「常時待ち受けるサーバー」である場合は Python 側がクライアントとして接続しに行きます。一方、機器側がイベント発生時に通知を「送信してくる」場合は Python 側がサーバーとして待ち受ける必要があります。仕様書を読む際は、まずどちら側が bind するのかを確認してください。

2.2 最小実装(サーバーとクライアント)

まずは動かすことを優先した最小実装です。シミュレータとして、サーバー側スクリプトを 1 つ用意しておけば、クライアントの実装テストは PC 単体で完結します。

# server_simulator.py — 自宅 PC で動かす機器シミュレータ
import socket

HOST, PORT = "127.0.0.1", 9000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((HOST, PORT))
    s.listen()
    print(f"listening on {HOST}:{PORT}")
    conn, addr = s.accept()
    with conn:
        print(f"connected: {addr}")
        while True:
            data = conn.recv(1024)
            if not data:
                break
            print(f"recv: {data!r}")
            conn.sendall(b"ACK\n")
# client.py — Python から機器に接続する側
import socket

HOST, PORT = "127.0.0.1", 9000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect((HOST, PORT))
    s.sendall(b"PING\n")
    response = s.recv(1024)
    print(f"response: {response!r}")

この時点で、Python での TCP/IP 通信は「動く」状態になります。問題はこの先です。現場ではこのコードのままでは、ほぼ確実に運用に耐えません。

3. 現場でハマる 5 つの落とし穴

本セクションでは、業務で実際に遭遇する代表的な失敗パターンを 5 つに整理します。いずれも「最小サンプルでは見えないが、長時間運用で必ず顕在化する」種類の問題です。

落とし穴 1: パケット境界が保証されない

TCP はストリームプロトコルです。送信側で sendall(b"FRAME1") / sendall(b"FRAME2") と 2 回送っても、受信側に b"FRAME1FRAME2" として 1 回でまとめて届く(あるいは逆に、1 つのフレームが分割して届く)可能性があります。「1 回の send = 1 回の recv」が成立すると思って実装すると、必ず破綻します。

対策: フレーム境界をプロトコル側で明示する。代表的な方式は次の 2 つです。

  • 区切り文字方式: 改行 \n や独自バイトでフレーム終端を表す(テキスト系プロトコルで多用)
  • 長さプレフィックス方式: 先頭に「以降のデータ長」を入れる(バイナリ系プロトコルの定石)

落とし穴 2: recv は要求バイト数を返してくれない

recv(1024) は「最大 1024 バイト読む」のであって「ちょうど 1024 バイト返す」わけではありません。実際には、1 バイトしか返ってこない場合もあります。

対策: 必要なバイト数が揃うまでループで読む受信ヘルパーを用意します。

def recv_exact(sock: socket.socket, n: int) -> bytes:
    """正確に n バイト受信するまでブロックする。途中切断は例外で通知。"""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("connection closed during recv")
        buf.extend(chunk)
    return bytes(buf)


def recv_frame(sock: socket.socket) -> bytes:
    """長さプレフィックス(4 バイト big-endian)方式でフレームを受信する。"""
    header = recv_exact(sock, 4)
    length = int.from_bytes(header, "big")
    return recv_exact(sock, length)

この recv_exact / recv_frame をすべての受信箇所で使うことで、パケット境界とバイト数の問題が同時に解決します。

落とし穴 3: タイムアウトを設定しないと永遠にブロックする

デフォルトの socket はブロッキングモードです。機器が応答しないまま無音になった場合、recv が永遠にブロックし、監視アプリが「ハングしているのか動いているのか」分からなくなります。

対策: 接続直後に settimeout を必ず指定します。タイムアウト値は機器仕様の応答時間 + マージン(一般に 2〜5 倍)が目安です。

sock.settimeout(5.0)  # 5 秒で socket.timeout 例外
try:
    data = recv_frame(sock)
except socket.timeout:
    # タイムアウトを業務ログに記録し、必要なら再送・再接続へ
    logger.warning("recv timeout, will reconnect")
    raise

落とし穴 4: 切断検知が遅い・できない

ケーブルが抜けた、ハブの電源が落ちた、相手機器が再起動した——こうした物理切断は、TCP の仕様上「すぐには検知されない」ことがあります。OS のデフォルト設定では、無通信状態が 2 時間以上続いてようやく検知されるケースもあります。

対策: 「アプリ層での Keep-Alive(定期 PING)」または「TCP Keep-Alive のチューニング」のいずれか、可能なら両方を入れます。

def enable_tcp_keepalive(sock: socket.socket,
                         idle: int = 30,
                         interval: int = 10,
                         count: int = 3) -> None:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Linux 固有のオプション。Windows は別 API が必要
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, idle)
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval)
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, count)

OS 依存性を気にする場合は、アプリ層で 30 秒おきに PING を送り、応答が来なければ再接続する設計のほうが移植性が高くなります。

落とし穴 5: スレッド設計を間違えると全体が止まる

受信ループと UI(tkinter など)を同じスレッドで回すと、受信が詰まった瞬間に画面が固まります。逆に、安易にスレッドを増やすと、socket への並行アクセスでデータが混ざります。

対策: 「socket 専用スレッド 1 本 + queue.Queue で UI 側に渡す」が基本形です。socket には 1 スレッドしかアクセスしない設計にします。

4. 24 時間運用に耐える完全実装

落とし穴対策をまとめた、再接続・タイムアウト・フレーム境界・スレッド分離をすべて含む実装パターンを示します。これは 24 時間運用に耐える監視アプリの典型的な設計を、自宅環境向けに整理したものです。

4.1 接続マネージャ

import logging
import queue
import socket
import threading
import time
from typing import Optional

logger = logging.getLogger(__name__)


class DeviceClient:
    """産業機器との TCP 接続を管理するクライアント。

    設計方針:
        - 切断は例外ではなく状態として扱う(自動再接続)
        - 受信は専用スレッド、データは queue 経由で外に渡す
        - 外部からは start / stop / send だけ呼べばよい
    """

    def __init__(self, host: str, port: int,
                 reconnect_interval: float = 3.0,
                 recv_timeout: float = 5.0):
        self.host = host
        self.port = port
        self.reconnect_interval = reconnect_interval
        self.recv_timeout = recv_timeout
        self.recv_queue: queue.Queue[bytes] = queue.Queue(maxsize=1000)
        self._sock: Optional[socket.socket] = None
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._send_lock = threading.Lock()

    def start(self) -> None:
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._close_socket()
        if self._thread:
            self._thread.join(timeout=5)

    def send(self, payload: bytes) -> None:
        with self._send_lock:
            if not self._sock:
                raise ConnectionError("not connected")
            length = len(payload).to_bytes(4, "big")
            self._sock.sendall(length + payload)

    def _run(self) -> None:
        while not self._stop_event.is_set():
            try:
                self._connect()
                self._recv_loop()
            except (ConnectionError, OSError, socket.timeout) as e:
                logger.warning("connection error: %s", e)
            finally:
                self._close_socket()
            if not self._stop_event.is_set():
                time.sleep(self.reconnect_interval)

    def _connect(self) -> None:
        sock = socket.create_connection((self.host, self.port), timeout=5.0)
        sock.settimeout(self.recv_timeout)
        self._sock = sock
        logger.info("connected to %s:%s", self.host, self.port)

    def _recv_loop(self) -> None:
        assert self._sock
        while not self._stop_event.is_set():
            frame = self._recv_frame()
            try:
                self.recv_queue.put_nowait(frame)
            except queue.Full:
                logger.warning("recv_queue is full, dropping frame")

    def _recv_frame(self) -> bytes:
        assert self._sock
        header = self._recv_exact(4)
        length = int.from_bytes(header, "big")
        if length > 1024 * 1024:
            raise ConnectionError(f"frame too large: {length}")
        return self._recv_exact(length)

    def _recv_exact(self, n: int) -> bytes:
        assert self._sock
        buf = bytearray()
        while len(buf) < n:
            chunk = self._sock.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("peer closed")
            buf.extend(chunk)
        return bytes(buf)

    def _close_socket(self) -> None:
        if self._sock:
            try:
                self._sock.close()
            except OSError:
                pass
            self._sock = None

このクラスは、外から見ると start / stop / send / recv_queue の 4 つだけで使えます。再接続・タイムアウト・フレーム境界処理・例外処理は、内部で完結しています。

4.2 利用側のコード

logging.basicConfig(level=logging.INFO)
client = DeviceClient("127.0.0.1", 9000)
client.start()

try:
    while True:
        try:
            frame = client.recv_queue.get(timeout=1.0)
        except queue.Empty:
            continue
        # 業務処理(パース、ログ記録、UI 更新の通知)
        print(f"frame: {frame!r}")
except KeyboardInterrupt:
    pass
finally:
    client.stop()

UI(tkinter / PyQt / Streamlit)から使う場合は、UI 側の定期コールバックで recv_queue.get_nowait() を呼ぶ設計にすると、画面側はノンブロッキングで動き続けます。

5. シミュレータでの動作確認

本記事のクライアント実装は、対応するサーバーシミュレータと組み合わせると、PC 単体で完全に再現確認できます。シミュレータは長さプレフィックス方式に合わせた送受信をするだけのシンプルな構造です。

# server_with_length_prefix.py
import socket
import struct

HOST, PORT = "127.0.0.1", 9000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((HOST, PORT))
    s.listen()
    while True:
        conn, addr = s.accept()
        with conn:
            print(f"connected: {addr}")
            counter = 0
            try:
                while True:
                    payload = f"DATA-{counter}".encode("utf-8")
                    conn.sendall(struct.pack(">I", len(payload)) + payload)
                    counter += 1
                    import time
                    time.sleep(0.5)
            except (BrokenPipeError, ConnectionResetError):
                print("disconnected")

このシミュレータを起動した上で前述の DeviceClient を動かすと、0.5 秒ごとに DATA-0, DATA-1, ...recv_queue に積まれていきます。試しにシミュレータを Ctrl+C で落とし、再起動してみてください。クライアント側がログに「connection error → reconnect」を出しながら、自動復旧することを確認できます。

6. 24 時間運用設計のチェックリスト

長期運用を見据えるとき、上記の実装に加えて確認すべきポイントを整理します。

  • ログ: logging でファイル出力、ローテーション(RotatingFileHandler もしくは TimedRotatingFileHandler)、文字コード明示(Windows なら encoding="utf-8" を必ず指定)
  • メモリ: queue.Queue(maxsize=...) で上限を必ず設定。無制限にすると、消費側が止まったときにメモリリーク
  • 例外: 受信ループで「予想外の例外でスレッドが死ぬ」と無音故障になる。最外で try/except Exception を入れて再接続フローへ流す
  • 監視: 「最後に受信した時刻」を別スレッドで監視し、無通信が一定時間続いたらアラート
  • 停止: KeyboardInterrupt やサービス停止シグナルで、open している socket を確実に閉じる(finally 句必須)
  • 時刻: 受信ログには「受信時刻」と「フレームの送信時刻(ペイロードに含まれているなら)」の両方を残す。遅延解析の必須情報

7. パフォーマンスの考え方

「Python の socket は遅いのでは」という質問をよく受けますが、現場でボトルネックになることはほとんどありません。実測で 1 秒あたり数千〜数万フレームの送受信は標準 socket でも可能です。

むしろパフォーマンス問題の多くは次の 3 点に起因します。

  • UI スレッドで重い処理(描画、ファイル IO、pandas 集計)を同期的に実行している
  • queue を経由せずに socket から直接 UI を更新している
  • 不要な printlogger.debug が高頻度で動き、コンソールがボトルネックになっている

1 秒あたり 1 万フレーム以上扱う必要があるケースでは、asyncio ベースに切り替える、またはバッファリングしてバッチ処理するなどの設計検討に入ります。それ以下のレートでは、本記事の同期スレッド設計で十分実用になります。

8. おわりに

TCP/IP 通信は「動かす」のと「24 時間止まらず動かし続ける」の間に、見えにくいが大きなギャップがあります。本記事の DeviceClient パターン(接続マネージャ + 受信スレッド + queue + 自動再接続)は、現場で使えるサイズの最小骨格です。実プロジェクトでは、ここに「フレーム種別ごとのパーサ」「メトリクス収集」「監視アラート連携」を足していくことになります。

本サイトの監視アプリ記事と組み合わせると、socket 通信から UI 表示まで一気通貫で構築できます。

関連記事