M5Stack / MicroPython / streamlit を用いた簡易的な異常検知モニタリングシステムの構築(プログラミング/実装編)

temp-hum-monitoring IoT
temp-hum-monitoring



日本の製造業の中の人が,「ベンダーやコンサルに頼らない」をモットーに,一人で/一週間の真夜中だけで/手持ちの機材で,手軽にAI/IoTシステムを構築できることを証明するために挑みました.
当該連載記事はその記録で,今回は,IoT デバイスに実装する中身であったり,モニタリングシステムに用いるプログラミング,及びその実装について記載していきます.
(※完成度が荒目な部分もあるかと存じますがご容赦ください)

この記事は,以下の記事「ハード/ソフト準備編」の続きです.

今回のコードやデータは,次の github にも掲載しています.

IoT/DemoMonitoringTempHumi at main · KazutoMakino/IoT
IoT. Contribute to KazutoMakino/IoT development by creating an account on GitHub.

全体的な流れ

実装がパソコン側の python コードだったり,M5Stack 上の MicroPython コードだったりしてややこしいので,全体的な流れを記載します.
以下の順序で進めていきます.

  1. パソコン側で USB 経由で送られてくる M5Stack からの信号を取得し表示・保存するプログラムの作成とテスト
  2. M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む
  3. 温度・湿度データをパソコンへ送ることができているかテスト/データ取得
  4. データ分析
  5. 異常検知手法の検討
  6. 異常検知プログラムの実装
  7. リアルタイムで温度・湿度データと異常判定を可視化してモニタリングするアプリの作成・実行

PoC するかのごとく,各フェーズで実装/テストを繰り返しながらシステムを構築していきました.

パソコン側で USB 経由で送られてくる M5Stack からの信号を取得し表示・保存するプログラムの作成とテスト

M5Stack からのデータは,今回は USB を介してパソコン側で取得します.
これには,シリアルポートにアクセスすることができる pyserial を用います.

パソコン上のテキストエディタで serial_monitor.py という名称で新規ファイルを作成し以下をコピペするか,github の serial_monitor.py をお使いください.

"""Serial monitor @ python.

Usage:
- py serial_monitor.py

---

KazutoMakino

"""

import argparse
import csv
import logging
import sys
import typing
from datetime import datetime
from pathlib import Path

import serial

######################################################################
# main
######################################################################


def main():
    # get parser
    parser = argparse.ArgumentParser(description="Serial monitor @ python.")
    parser.add_argument(
        "--port", "-p", type=str, default="COM3", help="serial port name"
    )
    parser.add_argument(
        "--baudrate", "-b", type=int, default=115200, help="device's baudrate"
    )
    parser.add_argument(
        "--timeout", "-t", type=float, default=1.0, help="serial's timeout"
    )
    parser.add_argument(
        "--logpath",
        "-l",
        type=str,
        default=Path(__file__).parent / "log.log",
        help="logging file path",
    )
    args = parser.parse_args()

    # set serial monitor parameters
    ser = SerialMonitor(
        port=args.port,
        baudrate=args.baudrate,
        timeout=args.timeout,
        logpath=args.logpath,
    )
    # run serial monitor
    ser.run()


######################################################################
# class
######################################################################


class SerialMonitor:
    """Serial monitor class."""

    def __init__(
        self,
        port: str = "COM3",
        baudrate: int = 115200,
        timeout: float = 1,
        logpath: typing.Union[Path, str] = Path(__file__).parent / "log.log",
    ) -> None:
        """Set serial monitoring parameters.

        Args:
            port (str, optional): A port name. Defaults to "COM3".
            baudrate (int, optional): A baudrate of micro computer.
                Defaults to 115200.
            timeout (float, optional): A timeout time. Defaults to 1.
            logpath (Path, optional): A log data file path.
                =="auto": {timestamp}.log
                Defaults to Path(__file__).parent/"log.log".
        """
        # get logpath
        log_dir = Path(__file__).resolve().parent / "log"
        if not log_dir.exists():
            log_dir.mkdir()
        if logpath == "auto":
            logpath = log_dir / f"{Timer.get_timestamp(fmt_date='str')}.log"
        self.logpath = logpath
        if not isinstance(self.logpath, Path):
            self.logpath = Path(self.logpath)

        # serial open
        self.ser = serial.Serial(
            port=port,
            baudrate=baudrate,
            timeout=timeout,
            xonxoff=False,
            rtscts=False,
            write_timeout=None,
            dsrdtr=False,
            inter_byte_timeout=None,
            exclusive=None,
        )

    def run(self, is_return: bool = False) -> str:
        """Run serial monitor.

        Args:
            is_return (bool, optional): Return values or not.
                Defaults to False.

        Returns:
            str: A data from the IoT device.
        """
        try:
            if is_return:
                # # serial return mode
                txt = self.ser.readline()
                txt = txt.decode(encoding="utf-8")

                # write into file obj if txt is not None
                if txt:
                    # open file obj
                    with self.logpath.open(
                        mode="a", encoding="utf-8", newline="\n"
                    ) as f:
                        f.write(txt)

                return txt

            else:
                # # logging mode
                # open file obj
                with self.logpath.open(mode="w", encoding="utf-8", newline="\n") as f:
                    # set csv.writer obj
                    w = csv.writer(f)

                    while True:
                        # load data from serial print
                        txt = self.ser.readall()

                        # decode
                        txt = txt.decode(encoding="utf-8")
                        print(txt)

                        # write into file obj if txt is not None
                        if txt:
                            w.writerow([txt])

        except KeyboardInterrupt:
            # manual stop -> serial close
            self.ser.close()


class Timer:
    """Timer class."""

    @staticmethod
    def get_timestamp(fmt_date: str = "datetime") -> str:
        """Get time stamp.

        Args:
            fmt_date (str, optional):
                =="datetime": returned format is ""%Y/%m/%d %H:%M:%S.%f"",
                == "str" or "text"or "txt": returned format is ""%Y%m%d%H%M%S%f",
                or you can define a returned format manually.
                Defaults to "datetime".

        Returns:
            str: The formatted time stamp.
        """
        # get now
        now = datetime.now()

        # set format
        if fmt_date == "datetime":
            # datetime
            fmt = "%Y/%m/%d %H:%M:%S.%f"

        elif (fmt_date == "str") or (fmt_date == "text") or (fmt_date == "txt"):
            # only figures
            fmt = "%Y%m%d%H%M%S%f"

        else:
            # direct setting
            fmt = fmt_date

        return now.strftime(fmt)


######################################################################

if __name__ == "__main__":
    try:
        main()
    except Exception as err:
        logging.error(msg=err, exc_info=True)
    sys.exit()

プログラムの細かい説明について,ご要望等ありましたら追記させていただきますが,割愛いたします.

コマンドライン引数にて,USB 接続しているポート,ボーレート,通信エラーかどうかを判定する制限時間,取得データ保存ファイルパスを指定できます.
気をつけるのは COM3 以外ではポートを指定する必要があるところで,分からなければ windows であればデバイスマネージャーにて調べることができます(https://slash-z.com/temp-hum-monitoring-preparation/ にて確認方法を記載しています).
では,M5Stack を繋げて実行してみましょう.
例えば,COM4 に繋げていた場合は,次のように実行します.

py serial_monitor.py -p COM4

そうすると,以下のような出力が得られると思います(APIKEY は一応隠しています).

ets Jul 29 2019 12:21:46

rst:0x1 (POWERON_RESET),boot:0x17 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:5228
load:0x40078000,len:12908
ho 0 tail 12 room 4
load:0x40080400,len:3512
entry 0x4008063c
       _  __ _
 _   _(_)/ _| | _____      __
| | | | | |_| |/ _ \ \ /\ / /
| |_| | |  _| | (_) \ V  V /
 \__,_|_|_| |_|\___/ \_/\_/

APIKEY: hogefuga
SD card mounted at "/sd"
MicroPython b19425502-dirty on 2022-04-22; M5Stack with ESP32
Type "help()" for more information.
>>>

もし,空の行が送られていくだけでしたら,左側面の赤いリセットボタンを押してみてください.
これでダメでしたら,ポートがあっているか確認ください.

上記の画面出力は,ファイルにも出力しており,コマンドライン引数で設定していなければ,serial_monitor.py と同階層の log.log にテキストとして保存するようにしています.
また,例えば,コマンドライン引数で -l auto と与えれば,同階層の ./log ディレクトリの タイムスタンプ.log にテキストとして保存します.

M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む

パソコン上のテキストエディタで M5Stack に書き込むプログラムを temp_humi.py という名称で作成します.
以下をコピペするか,github の temp_humi.py をお使いください.

"""Get temperature / humidity using SHT35 (Groove I2C).

Refs.:
- https://www.switch-science.com/catalog/5337/
- https://www.seeedstudio.com/Grove-I2C-High-Accuracy-Temp-Humi-Sensor-SHT3-p-3182.html
- https://ambidata.io/samples/m5stack/m5stack-micropython/
- https://github.com/kfricke/micropython-sht31

---

KazutoMakino

"""

import socket
import struct
import time

import wifiCfg
from m5stack import lcd
from machine import I2C, RTC, Pin
from micropython import const

######################################################################
# settings
######################################################################
# set sensor settings
R_HIGH = const(1)
R_MEDIUM = const(2)
R_LOW = const(3)

# time.sleep
SLEEPTIME = 1

######################################################################
# main
######################################################################


def main():
    # get i2c
    i2c = I2C(scl=Pin(22), sda=Pin(21))

    # get sensor module
    # sensor = SHT31(i2c=i2c, addr=0x44)
    sensor = SHT31(i2c=i2c, addr=0x45)

    # init: rtc
    mcclock = MiConClock()

    # endless loop
    while True:
        # get now
        now_txt = mcclock.return_times_of_day(ret_type="str")

        # get elapsed time [ms] / 1000
        elapsed = time.ticks_ms() * 1e-3

        # get temperature and humidity
        t, h = sensor.get_temp_humi()

        # print @ serial monitor
        print(
            "TimeStamp: {0}, ElapsedTime[s]: {1}, Temperature[degC]: {2}, Humidity[%]: {3}".format(
                now_txt, elapsed, t, h
            )
        )

        # clear the monitor window
        lcd.clear(lcd.BLACK)

        # print @ physical monitor
        lcd.text(0, 0, "{0}".format(now_txt))
        lcd.text(0, 20, "Temp: {0:.3f} [degC]".format(t))
        lcd.text(0, 40, "Humi: {0:.3f} [%]".format(h))

        # wait
        time.sleep(SLEEPTIME)


class MiConClock:
    """Clock for micro computer."""

    def __init__(self) -> None:
        """Set wi-fi and get collect datetime."""
        # wi-fi connection
        wifiCfg.autoConnect(lcdShow=True)

        # get collect datetime (JPN := UTC + 9 [h])
        MyNTPTime.set_local_datetime(offset=9 * 60 * 60)

    def return_times_of_day(self, ret_type: str = "tuple") -> tuple:
        """Return times of day.

        Args:
            ret_type (str, optional): Return type.
                =="tuple": return tuple(year, month, day, weekday,
                    hours, minutes, seconds, subseconds).
                =="str": return as datetime format.
                Defaults to "tuple".

        Returns:
            tuple: tuple(year, month, day, weekday, hours, minutes, seconds, subseconds)
        """
        # get now / unpack
        # RTC().datetime() returns:
        #   year, month, day, weekday, hours, minutes, seconds, subseconds
        (
            year,
            month,
            day,
            _weekday,
            hours,
            minutes,
            seconds,
            subseconds,
        ) = RTC().datetime()

        # set return data
        if ret_type == "tuple":
            ret = (
                year,
                month,
                day,
                _weekday,
                hours,
                minutes,
                seconds,
                subseconds,
            )

        elif ret_type == "str":
            # datetime.now to txt
            ret = "{0}/{1}/{2} {3}:{4}:{5}.{6}".format(
                year,
                PseudoPython.zfill(month, 2),
                PseudoPython.zfill(day, 2),
                PseudoPython.zfill(hours, 2),
                PseudoPython.zfill(minutes, 2),
                PseudoPython.zfill(seconds, 2),
                subseconds,
            )

        return ret

    def show_times_of_day(self) -> None:
        """Show times of day on LCD."""
        # endless
        while True:
            # get t_start
            t_start = time.ticks_ms()

            # get datetime.datetime.now()
            now_is = self.return_times_of_day(ret_type="str")

            # show
            print(now_is)

            # clear the monitor window and show on m5stack monitor
            lcd.clear(lcd.BLACK)
            lcd.text(lcd.CENTER, lcd.CENTER, "{0}".format(now_is))

            # calc about 1 sec
            adaptive_sleep = 1.0 - (time.ticks_ms() - t_start) * 1e-3

            # sleep
            if adaptive_sleep <= 0:
                continue
            else:
                time.sleep(adaptive_sleep)


class MyNTPTime:
    @staticmethod
    def set_local_datetime(offset: int = 9 * 60 * 60) -> None:
        """Set local datetime.

        Refs.:
            - https://www.pool.ntp.org/zone/jp

        Args:
            offset (int, optional): Offset time [s].
                Defaults to 9 * 60 * 60.
        """

        def _time():
            # (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60 [s]
            NTP_DELTA = 3155673600

            # host address
            host = "pool.ntp.org"

            # set query
            NTP_QUERY = bytearray(48)
            NTP_QUERY[0] = 0x1B

            # set address
            addr = socket.getaddrinfo(host, 123)[0][-1]

            # connection
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.settimeout(1)

            # send the query to the address
            _ = s.sendto(NTP_QUERY, addr)
            msg = s.recv(48)

            # close connection
            s.close()

            # unpack binary data
            val = struct.unpack("!I", msg[40:44])[0]

            return val - NTP_DELTA

        # There's currently no timezone support in MicroPython, so
        # utime.localtime() will return UTC time (as if it was .gmtime())
        #
        # offset: timezone offset (sec)
        #         settime(9*60*60) for JST
        t = _time() + offset
        tm = time.localtime(t)
        tm = tm[0:3] + (0,) + tm[3:6] + (0,)
        RTC().datetime(tm)


class PseudoPython:
    @staticmethod
    def zfill(data, width: int) -> str:
        """Pseudo python's str.zfill(width) method.

        Descriptions:
            zfill(data, width) means str(data).zfill(width).

        Args:
            data (object): A numerical object.
            width (int): Width of the padding field.

        Returns:
            str: Returns the numeric string left filled with zeros
            in a string of specified length.
        """
        # cast
        txt = str(data)

        # str.zfill
        if len(txt) < width:
            return ("0" * (width - len(txt))) + txt
        else:
            return txt


######################################################################
# class
######################################################################


class SHT31:
    """
    This class implements an interface to the SHT31 temperature and humidity
    sensor from Sensirion.
    """

    # This static map helps keeping the heap and program logic cleaner
    _map_cs_r = {
        True: {R_HIGH: b"\x2c\x06", R_MEDIUM: b"\x2c\x0d", R_LOW: b"\x2c\x10"},
        False: {R_HIGH: b"\x24\x00", R_MEDIUM: b"\x24\x0b", R_LOW: b"\x24\x16"},
    }

    def __init__(self, i2c, addr=0x44):
        """
        Initialize a sensor object on the given I2C bus and accessed by the
        given address.
        """
        if i2c is None:
            raise ValueError("I2C object needed as argument!")
        self._i2c = i2c
        self._addr = addr

    def _send(self, buf):
        """
        Sends the given buffer object over I2C to the sensor.
        """
        self._i2c.writeto(self._addr, buf)

    def _recv(self, count):
        """
        Read bytes from the sensor using I2C. The byte count can be specified
        as an argument.
        Returns a bytearray for the result.
        """
        return self._i2c.readfrom(self._addr, count)

    def _raw_temp_humi(self, r=R_HIGH, cs=True):
        """
        Read the raw temperature and humidity from the sensor and skips CRC
        checking.
        Returns a tuple for both values in that order.
        """
        if r not in (R_HIGH, R_MEDIUM, R_LOW):
            raise ValueError("Wrong repeatabillity value given!")
        self._send(self._map_cs_r[cs][r])
        time.sleep_ms(50)
        raw = self._recv(6)
        return (raw[0] << 8) + raw[1], (raw[3] << 8) + raw[4]

    def get_temp_humi(self, resolution=R_HIGH, clock_stretch=True, celsius=True):
        """
        Read the temperature in degree celsius or fahrenheit and relative
        humidity. Resolution and clock stretching can be specified.
        Returns a tuple for both values in that order.
        """
        t, h = self._raw_temp_humi(resolution, clock_stretch)
        if celsius:
            temp = -45 + (175 * (t / 65535))
        else:
            temp = -49 + (315 * (t / 65535))
        return temp, 100 * (h / 65535)


######################################################################

if __name__ == "__main__":
    try:
        main()
    except OSError as err:
        # please check I2C connection and reboot...
        print(err)
        print("please check I2C connection and reboot...")
        # clear the monitor window
        lcd.clear(lcd.BLACK)
        # print @ physical monitor
        lcd.text(0, 0, "error: {0}".format(err))
        lcd.text(0, 20, "please check I2C connection and reboot...")

上記は MicroPython のコードで,Python と似ていますが,使えないモジュールやメソッドがあるので注意が必要です.
これについては,バージョンが更新されていっているので,色々試したり,公式ドキュメントを参照ください(http://docs.micropython.org/en/latest/).

プログラムの書き込みには,adafruit-ampy を用います.
上記コードを保存したディレクトリにてシェルを開き,M5Stack を繋げて,例えば COM4 でしたら次を実行します.

ampy -p COM4 put temp_humi.py apps/temp_humi.py

構文としてはこんな感じですね.

ampy -p {ポート} put {パソコンから送りたいファイルパス} {M5Stack 上に置きたいパス}

書き込みの際に画面が再起動したりしますが,少し待つと上記コマンドが終了し,M5Stack が再起動して元のアプリが起動したら書き込みが完了しています.
もしうまくいかない場合は,

  • M5Stack のリセットボタンを押す
  • APP モードにする
  • コマンドを実行し直す(シェルのコマンド中断は break ボタンか ctrl + c です)
  • ポートが正しいかどうか確認する

などを試してみてください.
ちなみに,ampy の使い方はコマンドラインから ampy --help にて調べることが可能です.
他にもファイル操作やデバッグに使える機能など便利な使い方があるので,ご興味あればご覧ください.

プログラムについて,M5Stack 上の apps/ というところに保存しましたが,UIFLOW で選択可能なプログラムはここのディレクトリを参照しており,ここに置くことでアプリの選択が自由にできます.
(実は,apps/temp_humi.py の代わりに main.py とすることで,書き込み後再起動してそのままアプリが実行されるのですが,UIFLOW の APP モードで選択できないので apps/ に置きました.)

環境構築編にて Wi-Fi の設定を飛ばしましたが,M5Stack では時間が保持されない関係上,現在時刻を Wi-Fi 経由にて取得するため,以下のように設定します.
まずは,M5Stack をパソコンにつなげ,M5Burner を立ち上げます.
ポートが正しいかどうかも確認しましょう.
下図の Configuration を押下します.

uiflow-cfg-1
Configuration をクリック

次のポップアップウィンドウにて,いつもお使いの Wi-Fi の SSID とパスワードを入力します(スマートフォンのテザリングでも可).

uiflow-cfg-3
Wifi のところの SSID / Password を設定

それでは,M5Stack 側で今書き込んだアプリを立ち上げてみましょう.
SHT35 を M5Stack に繋げ,こちらの画面遷移を参考に,APP Mode のアプリ一覧から temp_humi.py を選択します.

uiflow-flow
uiflow-flow

temp_humi.py を起動すると,現在時刻取得のために,最初に先ほど設定した Wi-Fi 接続を試みます(左図).

temp_humi-wifi
Wi-Fi 接続
lcd-output
Wi-Fi 接続と時刻取得が完了し,時刻・温度・湿度が画面出力される

Wi-Fi 接続と時刻取得が完了すると,時刻・温度・湿度が画面出力します.
もし,Wi-Fi 接続がエラーになる場合は,真ん中ボタンの Retry を押すか,左側面のリセットボタンを押下してください.
何度やってもダメな場合は,Wi-Fi のパスワードなどの設定や,機器そのものについてご確認ください.

温度・湿度データをパソコンへ送ることができているかテスト/データ取得

とりあえずこれだけで,現在の温度と湿度については簡素ですがモニタリングすることが可能になりました.
次は,このデータをパソコンに送り,データを取ってみましょう.

M5Stack をパソコンに USB 接続した状態で,パソコンのシェルにて serial_monitor.py を以下コマンドライン引数を用いて,

py serial_monitor.py -p COM? -l auto

と実行してください.
ここでCOMポートは,現在 M5Stack に繋いでいるポートを指定ください.
実行すると M5Stack は再起動され,Wi-Fi 接続を行い,OKの場合は温度と湿度を画面表示し続けます.
これと同時に,パソコンのシェル上においても,タイムスタンプ/温度/湿度が出力され,同時に ./log/{タイムスタンプ}.log にもデータが出力されていきます.
./log/{タイムスタンプ}.log の中身は,以下のようなデータです.

ets Jul 29 2019 12:21:46

rst:0x1 (POWERON_RESET),boot:0x17 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:5228
load:0x40078000,len:12908
ho 0 tail 12 room 4
load:0x40080400,len:3512
entry 0x4008063c
       _  __ _
 _   _(_)/ _| | _____      __
| | | | | |_| |/ _ \ \ /\ / /
| |_| | |  _| | (_) \ V  V /
 \__,_|_|_| |_|\___/ \_/\_/

APIKEY: hogefuga
"
"SD card mounted at ""/sd""
"
"TimeStamp: 2022/05/25 01:42:34.176, ElapsedTime[s]: 7.261, Temperature[degC]: 27.44602, Humidity[%]: 52.19654
TimeStamp: 2022/05/25 01:42:35.346392, ElapsedTime[s]: 8.604, Temperature[degC]: 27.44602, Humidity[%]: 52.20722        
TimeStamp: 2022/05/25 01:42:36.694197, ElapsedTime[s]: 9.952001, Temperature[degC]: 27.46204, Humidity[%]: 52.1828      
"
"TimeStamp: 2022/05/25 01:42:38.43336, ElapsedTime[s]: 11.301, Temperature[degC]: 27.46204, Humidity[%]: 52.1294        
TimeStamp: 2022/05/25 01:42:39.390944, ElapsedTime[s]: 12.648, Temperature[degC]: 27.4754, Humidity[%]: 52.13245        
"

これで,無事にデータが取れる環境を構築することができました.

データ分析

前の章のように長時間に渡ってデータを取得した後,./log というディレクトリを ./data に書き換え,どういったデータなのか分析を行いました.
github のファイル名は,./eda.ipynb です.

はじめに,取得したテキストデータを扱いやすくするために,以下手順で pandas.DataFrame 化しました.

  1. split("\n")
  2. 先頭が TimeStamp: でないものを除外
  3. split(", ")
  4. split(": ", maxsplit1)
  5. 上記を pandas.DataFrame 化

コードとしては以下です.

from pathlib import Path

cur_dir = Path("").resolve()
data_dir = cur_dir / "data"
flist = [v for v in data_dir.glob("*.txt")]

# init
dfs = {}

# loop: files
for fpath in flist:
    # file open
    with fpath.open(mode="r", encoding="utf-8") as f:
        # read
        txt = f.read()

    # split with "\n"
    lines = txt.split("\n")

    # exclude not line.startswith("TimeStamp: ")
    lines = [v for v in lines if v.startswith("TimeStamp: ")]

    # split with ", "
    mat = [v.split(", ") for v in lines if v]

    # split with ": ", maxsplit=1
    item_tuples =  [[vv.split(": ", maxsplit=1) for vv in v if vv] for v in mat if v]
    columns = [v[0] for v in item_tuples[0]]
    values = [[vv[1] for vv in v] for v in item_tuples]

    # make dataframe
    df = pd.DataFrame(data=values, columns=columns)

    # cast
    for col in df.columns:
        if col=="TimeStamp":
            df[col] = pd.to_datetime(df[col])
        else:
            df[col] = df[col].astype(float)

    # add to dfs
    dfs[fpath.stem] = df

これで,{ファイル名: pandas.DataFrame} とした辞書ができました.
次に,可視化を行います.
まずは,時系列の生データについてです.

import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

xcol = "TimeStamp"
ycol1 = "Temperature[degC]"
ycol2 = "Humidity[%]"

for k,df in dfs.items():
    fig = plt.figure(figsize=(12, 5))
    ax1 = fig.add_subplot()
    ax2 = ax1.twinx()
    ax1.set_xlabel(xcol)
    ax1.set_ylabel(ycol1)
    ax2.set_ylabel(ycol2)
    ax1.plot(df[xcol], df[ycol1], color="red", label=ycol1)
    ax2.plot(df[xcol], df[ycol2], color="blue", label=ycol2)
    ax1.legend(loc="upper left")
    ax2.legend(loc="upper right")
    plt.title(k)
    fig.tight_layout()
    plt.show()
    plt.clf()
    plt.close("all")
graph-1-1
graph-1-2
graph-1-3
graph-1-4
  • データは,2022/01 中旬の暖房や加湿器などが ON/OFF される部屋の中で取得した温度・湿度です
  • 横軸はデータ取得時刻,赤の実線(第1縦軸)は居室内温度 [℃],青の実線(第2縦軸)は居室内湿度 [%] を表します
  • 温度と湿度は,おおよそトレード・オフの関係であることが示されます(湿度 [%] は絶対量でなく飽和水蒸気量に対する割合なので)
  • 暖房/加湿器/換気扇の ON / OFF や窓の開閉をしているため,30 分や 1 時間単位でうねるような大きな変化が現れています
  • 上から2番めのプロットについて,18 19:30 において温度と湿度の大きなピークが見られますが,これは,わざと加湿器吐出部近傍にセンサを近づけて取得したデータです.なぜこのようにしたかというと,いつもの居室内雰囲気を正常とした場合,擬似的な異常としてデータが欲しかったためです.これ以外の日時ではこのようなデータは取っていません.

次に,データの偏りについて考察するために,各日程における温度と湿度について,度数分布を棒グラフで,及びカーネル密度推定を折れ線グラフで,この度数の平均を一点鎖線にて可視化します.

xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Count"

# init
color = ["blue", "green", "orange", "red"]
bins = 20

# temperature
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol1)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
    # unpack
    k, df = k_df
    # ax.hist(df[xcol1], bins=50)
    ax = sns.histplot(df[xcol1], bins=bins, kde=True, label=k, color=color[i])
    ax.axvline(df[xcol1].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
#ax.set_yscale("log")
#ax.set_ylim(0.9)
plt.title(xcol1)
fig.tight_layout()
plt.show()
plt.close("all")

# humidity
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol2)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
    # unpack
    k, df = k_df
    # ax.hist(df[xcol1], bins=50)
    ax = sns.histplot(df[xcol2], bins=bins, kde=True, label=k, color=color[i])
    ax.axvline(df[xcol2].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
#ax.set_yscale("log")
#ax.set_ylim(0.9)
plt.title(xcol2)
fig.tight_layout()
plt.show()
graph-2-1
温度
graph-2-2
湿度

度数分布はレコード数が大きい順に,赤 > オレンジ > 青 > 緑 となっていますが,上図の様に正規化されていないデータの度数分布は潰れてしまっています.
また,正規化しても棒グラフが重なり合っては見にくいので,正規化したカーネル密度推定のみをプロットします.

xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Kernel Density Estimate plot"

# init
color = ["blue", "green", "orange", "red"]

# temperature
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol1)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
    # unpack
    k, df = k_df
    ax = sns.kdeplot(df[xcol1], label=k, color=color[i])
    ax.axvline(df[xcol1].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
plt.title(xcol1)
fig.tight_layout()
plt.show()
plt.close("all")

# humidity
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol2)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
    # unpack
    k, df = k_df
    ax = sns.kdeplot(df[xcol2], label=k, color=color[i])
    ax.axvline(df[xcol2].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
plt.title(xcol2)
fig.tight_layout()
plt.show()
graph-2-3
温度
graph-2-4
湿度
  • 温度について,青と緑は裾野が狭く,オレンジと赤は裾野が広くなっています.これは,温度変化はエアコンや換気扇の ON / OFF によって大きく左右されますが,前者はデータ取得した時間が短く,エアコンや換気扇操作しなかったことが原因です.校舎は,測定中にエアコンや換気扇の操作をしていました.
  • 湿度について,高い温度を示したオレンジと赤にて,低湿度の期間があったことが示されます
  • 湿度の緑について,分布の裾野は 80 [%] を越えていますが,これは,擬似的な異常データとしてセンサを加湿器に近づけてデータ取得したためです

今まで測定ファイル別でプロットしていましたが,同種のデータなので,全データを一括でプロットします.

# init
df_all = pd.DataFrame()

# concat
for df in dfs.values():
    df_all = pd.concat([df_all, df])
df_all.reset_index(drop=True, inplace=True)

xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Count"

# init
bins = 50

# plot
for xcol in [xcol1, xcol2]:
    fig = plt.figure(figsize=(12, 4))
    ax = fig.add_subplot()
    ax.set_xlabel(xcol)
    ax.set_ylabel(ycol)
    ax = sns.histplot(df_all[xcol], bins=bins, kde=True)
    ax.axvline(df_all[xcol].mean(), linestyle="-.")
    fig.tight_layout()
    plt.show()
    plt.close("all")

# calc delta t
delta_t = []
for df in dfs.values():
    v = df["ElapsedTime[s]"]
    delta_t.extend([v.iloc[i] - v.iloc[i - 1] for i in range(len(df)) if i!=0])

# plot
fig = plt.figure(figsize=(12, 4))
ax = fig.add_subplot()
ax.set_xlabel("Sampling rate [s]")
ax.set_ylabel(ycol)
ax = sns.histplot(delta_t, bins=bins, kde=True)
ax.axvline(sum(delta_t)/len(delta_t), linestyle="-.")
fig.tight_layout()
plt.show()
graph-2-5
温度
graph-2-6
湿度
graph-2-7
サンプリングレート
  • データ取得期間は冬ですが,比較的過ごしやすいように温度調整しているので,大体 18 [℃] ~ 22 [℃] の割合が大きくなっています
  • 湿度データについては,ピークが3つ存在しています.擬似的な異常として取得した湿度 80 [%] 超のデータは,この分布から見ても外れ値であると言えます
  • 下段はサンプリングレートを示していおり,大体 1.33 [s] おきにデータ取得されています

湿度について3つのピークがありましたが,これらについて温度との関係性を次で可視化します.

sns.jointplot(
    data=df_all,
    x="Temperature[degC]",
    y="Humidity[%]",
    kind="hist",
    cmap="turbo",
    height=8,
    bins=100
)
plt.show()
graph-2-8
温度 x 湿度
  • 横軸は温度の度数分布,縦軸は湿度の度数分布をそれぞれ表します
  • 左下のプロットは,温度と湿度の度数の大きさを色で表していて,これが大きくなるに従い,寒色から暖色へと変化させてプロットしています
  • 湿度が約 60 [%] では,温度が低い場合に飽和水蒸気量が低くなることによって,相対湿度が上昇するため度数が高くなっています
  • 上記と逆に,湿度約 40 [%] では,暖房を入れたことによる相対湿度の減少によって度数が高くなっています
  • 湿度約 50 [%] では,温度を約 20 [℃] を目指して暖房を ON / OFF することが多いみたいで,このことと加湿器やセンサの位置関係によって度数が高くなっているようです

異常検知手法の検討

データの傾向について前の章にて可視化を行いましたが,外れ値が分かりやすかったので,これの検知も実装しやすそうです.
次は,異常検知手法について検討していきます.
github のファイル名は,trial_training.ipynb です.
データについては,./data ディレクトリをまるごとコピーして ./traindata という名前にしておきます.

まずは,以下でデータセットを準備します.

from pathlib import Path
import pandas as pd

curdir = Path("").resolve()
datadir = curdir / "traindata"
flist = list(datadir.glob("*.txt"))

# init
dfs = {}
df_all = pd.DataFrame()

# loop: files
for fpath in flist:
    # file open
    with fpath.open(mode="r", encoding="utf-8") as f:
        # read
        txt = f.read()

    # split with "\n"
    lines = txt.split("\n")

    # exclude not line.startswith("TimeStamp: ")
    lines = [v for v in lines if v.startswith("TimeStamp: ")]

    # split with ", "
    mat = [v.split(", ") for v in lines if v]

    # split with ": ", maxsplit=1
    item_tuples =  [[vv.split(": ", maxsplit=1) for vv in v if vv] for v in mat if v]
    columns = [v[0] for v in item_tuples[0]]
    values = [[vv[1] for vv in v] for v in item_tuples]

    # make dataframe
    df = pd.DataFrame(data=values, columns=columns)

    # cast
    for col in df.columns:
        if col=="TimeStamp":
            df[col] = pd.to_datetime(df[col])
        else:
            df[col] = df[col].astype(float)

    # show
    print(f"*** {fpath.name} ***")
    print(df.info())
    print()

    # add to dfs
    dfs[fpath.stem] = df

    # add to df_all
    df_all = pd.concat([df_all, df])

# reset index
df_all.reset_index(drop=True, inplace=True)

外れ値が分かりやすかったので,演算が簡単なホテリング T-squared 法を用いて異常検知できるかを試していきます.
最初は,正常と異常のどちらも含まれるデータについて検討します.

import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()

# set data
sample = df_all["Humidity[%]"].to_numpy()

# sample mean
sample_mean = np.mean(sample)

# sample variance
sample_var = np.var(sample, ddof=0)

# init
anomaly_scores = []
for s in sample:
    # calc anomaly score
    t = (s - sample_mean)**2 / sample_var

    # append
    anomaly_scores.append(t)

# get threshold from 1% of chi-square distribution
threshold = stats.chi2.interval(0.99, 1)[1]

# plot
plt.scatter(x=list(range(len(anomaly_scores))), y=anomaly_scores, marker=".")
plt.axhline(y=threshold, linestyle="--", color="gray")
plt.xlabel("Index")
plt.ylabel("Anomaly score\n(Hotelling T-squared distribution)")
plt.show()
graph-3-1
異常度
  • 横軸は df_all の行番号,縦軸はホテリング T-squared 法に基づく異常度です
  • 破線は,カイ2乗分布における 1 % 区間から計算した閾値で,この破線より下側を正常,上側を異常とします
  • Index = 3000 あたりで閾値を超えるデータ点が多く現れています.この部分は,擬似的な異常として定義したデータで,簡易的ですが異常として検知できているようです

上記の計算は,異常が含まれるデータを用いていました.
しかし,現実の運用においては,異常という状態が極稀でデータが取れないケースもあるため,バックデータは正常データのみであることが望ましいと考えられます.
従って,異常データが存在するファイル 20220118192914980961.txt 以外のデータで標本平均と標本分散を計算し,異常度の計算はこれに対して行ってみます.

まずは,正常と異常を含むデータセットをそれぞれ分けて作成します.

sample_norm, sample_anom = [], []

for k,df in dfs.items():
    if k=="20220118192914980961":
        sample_anom.extend(df["Humidity[%]"].to_list())
    else:
        sample_norm.extend(df["Humidity[%]"].to_list())
np.shape(sample_anom),np.shape(sample_norm)

# ((520,), (27776,))

次に統計量を計算します.

# sample mean
smean = np.mean(sample_norm)

# sample variance
svar = np.var(sample_norm, ddof=0)

この統計量を用いて,統計量の計算に用いていない異常値を含むデータについて異常度を計算し,可視化します

# init
scores = []
for s in sample_anom:
    # calc anomaly score
    t = (s - smean)**2 / svar

    # append
    scores.append(t)

# get threshold from 1% of chi-square distribution
threshold = stats.chi2.interval(0.99, 1)[1]

# plot
plt.scatter(x=list(range(len(scores))), y=scores, marker=".")
plt.axhline(y=threshold, linestyle="--", color="gray")
plt.xlabel("Index")
plt.ylabel("Anomaly score\n(Hotelling T-squared distribution)")
plt.show()
graph-3-2
正常データのみを教師とした異常を含むデータについての異常度

このように,正常データのみ教師としても異常検知できそうなので,最終的なシステムで用いるためにこれら統計量や設定値について,./param_HotellingTSquare.json として保存しておきます.

# set save dict
savedict = {
    "mean": smean,
    "variance": svar,
    "alpha": 0.99,
    "df": 1.0,
    "threshold": threshold,
    "memo": "Hotelling T-squared distribution",
    "timestamp": datetime.now().strftime("%Y/%m/%d-%H:%M:%S.%f"),
    "sourcedir": datadir.name,
    "sourcefiles": [v.name for v in flist],
}

# set save path
savepath = curdir / "param_HotellingTSquare.json"

# save
with savepath.open(mode="w") as f:
    json.dump(savedict, f, indent=4)

異常検知プログラムの実装

異常検知にホテリング T-squared 法が使えそうということが分かったので,次は,モニタリングシステムから呼び出すための異常検知を行う部分について用意します.
パソコン上のテキストエディタで anomaly_detection.py という名称で新規ファイルを作成し以下をコピペするか,github の anomaly_detection.py をお使いください.

"""Anomaly detection module.

---

KazutoMakino

"""

import json
from datetime import datetime
from pathlib import Path

import numpy as np
from scipy import stats

######################################################################
# class
######################################################################


class HotellingTSquare:
    """Hotelling T-squared distribution class."""

    def __init__(
        self, param_path: Path = Path(__file__).parent / "./param_HotellingTSquare.json"
    ) -> None:
        """Get parameters from param_HotellingTSquare.json.

        Args:
            param_path (Path, optional): A parameter file.
                Defaults to Path(__file__).parent / "./param_HotellingTSquare.json".
        """
        # cast
        self.param_path = Path(param_path)

        # load parameters
        if self.param_path.exists():
            with self.param_path.open(mode="r", encoding="utf-8") as f:
                self.params = json.load(fp=f)
        else:
            self.params = {"mean": None, "variance": None}

    def fit(
        self, dataset: list, alpha: float = 0.99, df: float = 1.0, memo_dict: dict = {}
    ) -> None:
        """Parameter fitting.

        Args:
            dataset (list): An input 1d-array data.
            alpha (float, optional): A degree of reliability.
                Defaults to 0.99.
            df (float, optional): A degree of freedom.
                Defaults to 1.0.
            memo_dict (dict, optional): A memo dictionary.
                Defaults to {}.
        """
        # calc sample mean
        s_mean = np.mean(dataset)

        # calc sample variance
        s_var = np.var(dataset, ddof=0)

        # calc threshold
        threshold = stats.chi2.interval(alpha=alpha, df=df, loc=0, scale=1)[1]

        # set self.params and update
        self.params = {
            "mean": s_mean,
            "variance": s_var,
            "alpha": alpha,
            "df": df,
            "threshold": threshold,
            "memo": "Hotelling T-squared distribution",
            "timestamp": datetime.now().strftime("%Y/%m/%d-%H:%M:%S.%f"),
        }
        self.params.update(memo_dict)

        # save to json
        with self.param_path.open(mode="w", encoding="utf-8") as f:
            json.dump(obj=self.params, fp=f, indent=4, sort_keys=False)

    def get_anomaly_score(self, data: float) -> float:
        """Calculating anomaly score.

        Args:
            data (float): An input 1d-array data.

        Returns:
            float: An anomaly score.
        """
        return data - self.params["mean"] ** 2 / self.params["variance"]

    def is_normal(self, anomaly_score: float) -> bool:
        """Return True (normal) or False (anomaly) using the threshold.

        Args:
            anomaly_score (float): A calculated anomaly score.

        Returns:
            bool: True (normal) or False (anomaly).
        """
        # return normal (True) or anomaly (False)
        if anomaly_score <= self.params["threshold"]:
            return True
        else:
            return False

リアルタイムで温度・湿度データと異常判定を可視化してモニタリングするアプリの作成・実行

これまで,データ取得,異常検知と来て,残すはパソコン上でモニタリングするアプリの作成です.
今回は,ローコードで web アプリが簡単に作成できる streamlit というライブラリを用いてアプリを作成します.
パソコン上のテキストエディタで real_time_monitoring.py という名称で新規ファイルを作成し以下をコピペするか,github の real_time_monitoring.py をお使いください.

"""The sample code of the real time monitoring system using by streamlit.

Usage:
- streamlit run real_time_monitoring.py

---

KazutoMakino

"""

######################################################################
# import
######################################################################

import gc
import sys
import time
from datetime import datetime
from logging import info
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import streamlit as st
import yaml

sns.set()

# import my pkgs
if True:
    from anomaly_detection import HotellingTSquare
    from serial_monitor import SerialMonitor

######################################################################
# global settings
######################################################################

# show parser arguments
print(sys.argv)

# # analyze parser arguments
# streamlit run or not
if sys.argv[0] in ["py", "python", "python3"]:
    raise AttributeError(
        """
        usage:
            - 'streamlit run real_time_monitoring.py'
            or
            - 'streamlit run real_time_monitoring.py debug'
        """
    )

else:
    # debug or not
    if "debug" in sys.argv:
        DBG = True
    else:
        DBG = False

# get settings.yml abspath
SETS_PATH = Path(__file__).resolve().parent / "settings.yml"
if not SETS_PATH.exists():
    raise FileNotFoundError(f"not exists: {SETS_PATH}")

######################################################################
# main
######################################################################


def main():
    monitoring_app = MonitoringApp()
    monitoring_app.run()


######################################################################
# class
######################################################################


class MonitoringApp:
    """The real time monitoring system."""

    def __init__(self) -> None:
        # get instance of DataStream
        self.ds = DataStream()

        # get settings from ./settings.yml
        with SETS_PATH.open(mode="r", encoding="utf-8") as f:
            self.sets = yaml.safe_load(stream=f)
        self.param_monitor = self.sets["Monitoring"]

    def run(self) -> None:
        """Run the simple real time monitoring system."""

        # write the title and some contents
        title = """\
            ### リアルタイムプロットの可視化と異常検知のサンプル

            ---

            データの種類:
            - 室内温度
            - 室内湿度

            デモ内容:
            - 温度/湿度についてリアルタイムモニタリング
            - 加湿器の吐出孔に近づけている場合とそうでない場合のデータを取得
            - 近づけている時を疑似的な異常として定義し,これを検知する

            ---
        """
        st.markdown(body=title, unsafe_allow_html=False)

        # # make place holders
        # predicted score
        ph_pred = st.empty()

        # plot area
        ph_plot = st.empty()

        # json style parameters
        st.markdown(
            """
            ---

            parameters:
            """
        )
        st.json(self.sets)

        # # init
        # plot data length
        if DBG:
            data_length = 10
        else:
            data_length = self.param_monitor["DataLength"]

        # set void
        data = {}

        # init index
        i = 0

        # set anomaly detection method
        hts = HotellingTSquare()

        # running until getting KeyboardInterrupt
        # (Which does code catch the KeyboardInterrupt ?)
        while True:
            # load data
            data_dict = self.ds.run()

            # count continue flag -> continue
            if data_dict == "continue":
                continue

            # show @ debug
            if DBG:
                if not data_dict:
                    break
                else:
                    print(data_dict)

            # cast values except "TimeStamp"
            for k, v in data_dict.items():
                if (k in ["tstamp", "TimeStamp"]) and (
                    self.param_monitor["PlotType"] == "streamlit"
                ):
                    # except "TimeStamp" if "PlotType" == "streamlit"
                    continue

                elif (k in ["tstamp", "TimeStamp"]) and (
                    self.param_monitor["PlotType"] == "matplotlib"
                ):
                    # if "PlotType" == "matplotlib", cast to datetime obj
                    data_dict[k] = datetime.strptime(
                        data_dict[k], "%Y/%m/%d %H:%M:%S.%f"
                    )

                else:
                    # cast values
                    data_dict[k] = float(v)

            print(data_dict)

            # get formated data set (xmax,xmin, )
            if i == 0:
                # initial: value -> list
                data = {k: [v] for k, v in data_dict.items()}
            else:
                # updating data
                for k, v in data_dict.items():
                    # remove oldest component
                    # if list length is larger than setting length
                    if len(data[k]) >= data_length:
                        data[k].remove(data[k][0])

                    # append
                    data[k].append(v)

            # set dataframe
            df = pd.DataFrame(data=data)

            # make line plot (Which is faster streamlit.line_chart or matplotlib ?)
            if self.param_monitor["PlotType"] == "streamlit":
                # set index to "timestamp" column for
                # using streamlit.line_chart x-label.
                df.set_index(keys=list(data.keys())[0], drop=True, inplace=True)

                # streamlit.line_chart()'s data: pandas.DataFrame, xaxis<-index
                # see:
                #   https://docs.streamlit.io/library/api-reference/charts/st.line_chart
                ph_plot.line_chart(data=df, width=0, height=0, use_container_width=True)

            elif self.param_monitor["PlotType"] == "matplotlib":
                # get x column name
                xcol = list(df.columns)[0]
                ycol1 = "Temperature[degC]"
                ycol2 = "Humidity[%]"

                # plot
                if i == 0:
                    fig = plt.figure(figsize=(12, 5))
                    ax1 = fig.add_subplot()
                    ax2 = ax1.twinx()
                else:
                    ax1.cla()
                    ax2.cla()
                ax1.set_xlabel(xcol)
                ax2.set_xlabel(xcol)
                ax1.set_ylabel(ycol1)
                ax2.set_ylabel(ycol2)
                ax1.plot(df[xcol], df[ycol1], marker="o", color="red", label=ycol1)
                ax2.plot(df[xcol], df[ycol2], marker="o", color="blue", label=ycol2)
                ax1.legend(loc="upper left")
                ax2.legend(loc="upper right")
                fig.tight_layout()

                # set to place holder
                ph_plot.pyplot(fig)

                # plt.close
                plt.close("all")

            else:
                raise AttributeError(
                    f"monitoring plot type is invalid: {self.param_monitor}"
                )

            # preprocessings for prediction
            pass

            # predict score
            anomaly_score = hts.get_anomaly_score(data=data_dict["Humidity[%]"])
            norm_anom = hts.is_normal(anomaly_score=anomaly_score)

            # write result of prediction (0 or 1, percentages, predicted lifetime, ...)
            if norm_anom:
                ph_pred.success(
                    f"状態: 正常 (異常度: {anomaly_score:.3f}, "
                    + f"閾値: {hts.params['threshold']:.3f})"
                )
            else:
                ph_pred.error(
                    f"状態: 異常 (異常度: {anomaly_score:.3f}, "
                    + f"閾値: {hts.params['threshold']:.3f})"
                )

            # increment
            i += 1

            # gc
            gc.collect()

        # show
        st.info("fin.")


class DataStream:
    """Data stream class."""

    def __init__(self) -> None:
        """Get serial monitoring parameters and connect to the IoT device."""
        # init
        self.tstamp = None
        self.tstart_ds = time.perf_counter()

        # get settings from ./settings.yml
        with SETS_PATH.open(mode="r", encoding="utf-8") as f:
            self.param_serial = yaml.safe_load(stream=f)
        self.param_serial = self.param_serial["Serial"]

        # get instance of SerialMonitor
        if not DBG:
            self.seri = SerialMonitor(
                port=self.param_serial["port"],
                baudrate=self.param_serial["baudrate"],
                timeout=self.param_serial["timeout[s]"],
                logpath=self.param_serial["logpath"],
            )

    def run(self) -> dict:
        """Run the data stream.

        Returns:
            dict: contains data type keys and values with time stamp.
                (Return type is a json-like object as seen in cloud services.)
        """

        # init
        data_dict = {}

        # get real time data
        if DBG:
            # pseudo latency
            time.sleep(0.1)

            # get dummy data
            if time.perf_counter() - self.tstart_ds < 30:
                data_dict = {
                    "tstamp": Timer.get_timestamp(fmt_date="datetime"),
                    "dummy_0[ ]": np.random.rand(),
                    "dummy_1[ ]": np.random.rand(),
                }
            else:
                data_dict = None

        else:
            # get data from serial port (other module)
            data_txt = self.seri.run(is_return=True)

            # if text header is invalid, return "continue"
            if not data_txt.startswith("TimeStamp"):
                print("now restarting...")
                return "continue"

            # txt to dict
            data_txt = data_txt.replace("\r\n", "").split(", ")
            data_dict = {k: v for k, v in [w.split(": ", maxsplit=1) for w in data_txt]}

            # show
            print(data_dict)

        return data_dict


class Timer:
    """Timer class."""

    @staticmethod
    def get_timestamp(fmt_date: str = "datetime") -> str:
        """Get time stamp.

        Args:
            fmt_date (str, optional):
                =="datetime": returned format is "%Y/%m/%d %H:%M:%S.%f",
                == "str" or "text"or "txt": returned format is ""%Y%m%d%H%M%S%f",
                or you can define a returned format manually.
                Defaults to "datetime".

        Returns:
            str: The formatted time stamp.
        """
        # get now
        now = datetime.now()

        # set format
        if fmt_date == "datetime":
            # datetime
            fmt = "%Y/%m/%d %H:%M:%S.%f"

        elif (fmt_date == "str") or (fmt_date == "text") or (fmt_date == "txt"):
            # only figures
            fmt = "%Y%m%d%H%M%S%f"

        else:
            # direct setting
            fmt = fmt_date

        return now.strftime(fmt)


######################################################################
if __name__ == "__main__":
    # logging
    info(msg="start")
    # run
    main()

また,ポートなどについては設定ファイルを読み込んでくるので,パソコン上のテキストエディタで settings.yml という名称で新規ファイルを作成し以下をコピペするか,github の settings.yml をお使いください.

######################################################################
######################################################################
# monitoring settings
Monitoring:
  # plot type (streamlit or matplotlib)
  PlotType: matplotlib

  # plot data length
  DataLength: 30

# serial port settings
Serial:
  # serial port name
  port: COM3

  # device baudrate
  baudrate: 115200

  # timeout [s]
  timeout[s]: 1

  # logging file
  logpath: "auto"
######################################################################

※ シリアルポートはご自身の環境に適合するように書き換えてください

すべての準備が整いました.
M5Stack/SHT35/パソコンを接続し,シェルを開いて real_time_monitoring.py のあるディレクトリに移動し,次を実行しましょう.

streamlit run real_time_monitoring.py

そうすると web アプリが立ち上がり,M5Stack における Wi-Fi 通信が上手く行けば,次のよう動画の左側のように表示されると思います.
(streamlit の初回実行時は,シェル上で E-mail アドレスを聞かれますが,無視して空欄で Enter でOKです)
(もし,通信エラーの場合は,”M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む” の章に記載の手順をお試しください)

このように,温度と湿度がリアルタイムでグラフでモニタリングでき,湿度が異常である場合について検知した結果を表示するシステムができました.

しかしながら,みなさんが試される環境ですと,ちゃんと異常検知できていない(誤検知している)かもしれません.
これはなぜかというと,今まで用いたデータは冬の部屋の中で取得したもので,こちらの動画も同時期/同一環境化にて撮影しましたが,今回の異常検知の計算においてはバックデータとほぼ同一の環境でないと,正常でも異常と判定されてしまうためです.
ちなみに,当該記事執筆現在の 5 月では,同一環境化においてもずっと異常と判定されていました.
今回は,異常検知の内容にはこだわっていなかったので全くロバストではなかったのですが,このようにモニタリングと異常検知について実装することができました.
異常検知について是正する方法としては,

  • 湿度だけをみるのではなく温度と組み合わせたデータ(例えば不快指数)を用いる
  • 季節性を考慮に入れる
  • データのパターンを増やす
  • 手法を変える

等が挙げられます.

以上,ベンダーやコンサルに頼らず,一人で/一週間の真夜中だけで/手持ちの機材で,AI/IoTシステムを構築する でした.

今回,M5Stack からパソコンへのデータ転送は USB を介したものでしたが,Bluetooth / Wi-Fi を用いた実装について検討し,今後掲載したいです.

コメント

タイトルとURLをコピーしました