日本の製造業の中の人が,「ベンダーやコンサルに頼らない」をモットーに,一人で/一週間の真夜中だけで/手持ちの機材で,手軽にAI/IoTシステムを構築できることを証明するために挑みました.
当該連載記事はその記録で,今回は,IoT デバイスに実装する中身であったり,モニタリングシステムに用いるプログラミング,及びその実装について記載していきます.
(※完成度が荒目な部分もあるかと存じますがご容赦ください)
この記事は,以下の記事「ハード/ソフト準備編」の続きです.
今回のコードやデータは,次の github にも掲載しています.
全体的な流れ
実装がパソコン側の python コードだったり,M5Stack 上の MicroPython コードだったりしてややこしいので,全体的な流れを記載します.
以下の順序で進めていきます.
- パソコン側で USB 経由で送られてくる M5Stack からの信号を取得し表示・保存するプログラムの作成とテスト
- M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む
- 温度・湿度データをパソコンへ送ることができているかテスト/データ取得
- データ分析
- 異常検知手法の検討
- 異常検知プログラムの実装
- リアルタイムで温度・湿度データと異常判定を可視化してモニタリングするアプリの作成・実行
PoC するかのごとく,各フェーズで実装/テストを繰り返しながらシステムを構築していきました.
パソコン側で USB 経由で送られてくる M5Stack からの信号を取得し表示・保存するプログラムの作成とテスト
M5Stack からのデータは,今回は USB を介してパソコン側で取得します.
これには,シリアルポートにアクセスすることができる pyserial を用います.
パソコン上のテキストエディタで serial_monitor.py
という名称で新規ファイルを作成し以下をコピペするか,github の
をお使いください.serial_monitor.py
"""Serial monitor @ python.
Usage:
- py serial_monitor.py
---
KazutoMakino
"""
import argparse
import csv
import logging
import sys
import typing
from datetime import datetime
from pathlib import Path
import serial
######################################################################
# main
######################################################################
def main():
# get parser
parser = argparse.ArgumentParser(description="Serial monitor @ python.")
parser.add_argument(
"--port", "-p", type=str, default="COM3", help="serial port name"
)
parser.add_argument(
"--baudrate", "-b", type=int, default=115200, help="device's baudrate"
)
parser.add_argument(
"--timeout", "-t", type=float, default=1.0, help="serial's timeout"
)
parser.add_argument(
"--logpath",
"-l",
type=str,
default=Path(__file__).parent / "log.log",
help="logging file path",
)
args = parser.parse_args()
# set serial monitor parameters
ser = SerialMonitor(
port=args.port,
baudrate=args.baudrate,
timeout=args.timeout,
logpath=args.logpath,
)
# run serial monitor
ser.run()
######################################################################
# class
######################################################################
class SerialMonitor:
"""Serial monitor class."""
def __init__(
self,
port: str = "COM3",
baudrate: int = 115200,
timeout: float = 1,
logpath: typing.Union[Path, str] = Path(__file__).parent / "log.log",
) -> None:
"""Set serial monitoring parameters.
Args:
port (str, optional): A port name. Defaults to "COM3".
baudrate (int, optional): A baudrate of micro computer.
Defaults to 115200.
timeout (float, optional): A timeout time. Defaults to 1.
logpath (Path, optional): A log data file path.
=="auto": {timestamp}.log
Defaults to Path(__file__).parent/"log.log".
"""
# get logpath
log_dir = Path(__file__).resolve().parent / "log"
if not log_dir.exists():
log_dir.mkdir()
if logpath == "auto":
logpath = log_dir / f"{Timer.get_timestamp(fmt_date='str')}.log"
self.logpath = logpath
if not isinstance(self.logpath, Path):
self.logpath = Path(self.logpath)
# serial open
self.ser = serial.Serial(
port=port,
baudrate=baudrate,
timeout=timeout,
xonxoff=False,
rtscts=False,
write_timeout=None,
dsrdtr=False,
inter_byte_timeout=None,
exclusive=None,
)
def run(self, is_return: bool = False) -> str:
"""Run serial monitor.
Args:
is_return (bool, optional): Return values or not.
Defaults to False.
Returns:
str: A data from the IoT device.
"""
try:
if is_return:
# # serial return mode
txt = self.ser.readline()
txt = txt.decode(encoding="utf-8")
# write into file obj if txt is not None
if txt:
# open file obj
with self.logpath.open(
mode="a", encoding="utf-8", newline="\n"
) as f:
f.write(txt)
return txt
else:
# # logging mode
# open file obj
with self.logpath.open(mode="w", encoding="utf-8", newline="\n") as f:
# set csv.writer obj
w = csv.writer(f)
while True:
# load data from serial print
txt = self.ser.readall()
# decode
txt = txt.decode(encoding="utf-8")
print(txt)
# write into file obj if txt is not None
if txt:
w.writerow([txt])
except KeyboardInterrupt:
# manual stop -> serial close
self.ser.close()
class Timer:
"""Timer class."""
@staticmethod
def get_timestamp(fmt_date: str = "datetime") -> str:
"""Get time stamp.
Args:
fmt_date (str, optional):
=="datetime": returned format is ""%Y/%m/%d %H:%M:%S.%f"",
== "str" or "text"or "txt": returned format is ""%Y%m%d%H%M%S%f",
or you can define a returned format manually.
Defaults to "datetime".
Returns:
str: The formatted time stamp.
"""
# get now
now = datetime.now()
# set format
if fmt_date == "datetime":
# datetime
fmt = "%Y/%m/%d %H:%M:%S.%f"
elif (fmt_date == "str") or (fmt_date == "text") or (fmt_date == "txt"):
# only figures
fmt = "%Y%m%d%H%M%S%f"
else:
# direct setting
fmt = fmt_date
return now.strftime(fmt)
######################################################################
if __name__ == "__main__":
try:
main()
except Exception as err:
logging.error(msg=err, exc_info=True)
sys.exit()
プログラムの細かい説明について,ご要望等ありましたら追記させていただきますが,割愛いたします.
コマンドライン引数にて,USB 接続しているポート,ボーレート,通信エラーかどうかを判定する制限時間,取得データ保存ファイルパスを指定できます.
気をつけるのは COM3 以外ではポートを指定する必要があるところで,分からなければ windows であればデバイスマネージャーにて調べることができます(https://slash-z.com/temp-hum-monitoring-preparation/ にて確認方法を記載しています).
では,M5Stack を繋げて実行してみましょう.
例えば,COM4 に繋げていた場合は,次のように実行します.
py serial_monitor.py -p COM4
そうすると,以下のような出力が得られると思います(APIKEY は一応隠しています).
ets Jul 29 2019 12:21:46
rst:0x1 (POWERON_RESET),boot:0x17 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:5228
load:0x40078000,len:12908
ho 0 tail 12 room 4
load:0x40080400,len:3512
entry 0x4008063c
_ __ _
_ _(_)/ _| | _____ __
| | | | | |_| |/ _ \ \ /\ / /
| |_| | | _| | (_) \ V V /
\__,_|_|_| |_|\___/ \_/\_/
APIKEY: hogefuga
SD card mounted at "/sd"
MicroPython b19425502-dirty on 2022-04-22; M5Stack with ESP32
Type "help()" for more information.
>>>
もし,空の行が送られていくだけでしたら,左側面の赤いリセットボタンを押してみてください.
これでダメでしたら,ポートがあっているか確認ください.
上記の画面出力は,ファイルにも出力しており,コマンドライン引数で設定していなければ,serial_monitor.py と同階層の log.log にテキストとして保存するようにしています.
また,例えば,コマンドライン引数で -l auto
と与えれば,同階層の ./log
ディレクトリの タイムスタンプ.log
にテキストとして保存します.
M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む
パソコン上のテキストエディタで M5Stack に書き込むプログラムを temp_humi.py
という名称で作成します.
以下をコピペするか,github の temp_humi.py
をお使いください.
"""Get temperature / humidity using SHT35 (Groove I2C).
Refs.:
- https://www.switch-science.com/catalog/5337/
- https://www.seeedstudio.com/Grove-I2C-High-Accuracy-Temp-Humi-Sensor-SHT3-p-3182.html
- https://ambidata.io/samples/m5stack/m5stack-micropython/
- https://github.com/kfricke/micropython-sht31
---
KazutoMakino
"""
import socket
import struct
import time
import wifiCfg
from m5stack import lcd
from machine import I2C, RTC, Pin
from micropython import const
######################################################################
# settings
######################################################################
# set sensor settings
R_HIGH = const(1)
R_MEDIUM = const(2)
R_LOW = const(3)
# time.sleep
SLEEPTIME = 1
######################################################################
# main
######################################################################
def main():
# get i2c
i2c = I2C(scl=Pin(22), sda=Pin(21))
# get sensor module
# sensor = SHT31(i2c=i2c, addr=0x44)
sensor = SHT31(i2c=i2c, addr=0x45)
# init: rtc
mcclock = MiConClock()
# endless loop
while True:
# get now
now_txt = mcclock.return_times_of_day(ret_type="str")
# get elapsed time [ms] / 1000
elapsed = time.ticks_ms() * 1e-3
# get temperature and humidity
t, h = sensor.get_temp_humi()
# print @ serial monitor
print(
"TimeStamp: {0}, ElapsedTime[s]: {1}, Temperature[degC]: {2}, Humidity[%]: {3}".format(
now_txt, elapsed, t, h
)
)
# clear the monitor window
lcd.clear(lcd.BLACK)
# print @ physical monitor
lcd.text(0, 0, "{0}".format(now_txt))
lcd.text(0, 20, "Temp: {0:.3f} [degC]".format(t))
lcd.text(0, 40, "Humi: {0:.3f} [%]".format(h))
# wait
time.sleep(SLEEPTIME)
class MiConClock:
"""Clock for micro computer."""
def __init__(self) -> None:
"""Set wi-fi and get collect datetime."""
# wi-fi connection
wifiCfg.autoConnect(lcdShow=True)
# get collect datetime (JPN := UTC + 9 [h])
MyNTPTime.set_local_datetime(offset=9 * 60 * 60)
def return_times_of_day(self, ret_type: str = "tuple") -> tuple:
"""Return times of day.
Args:
ret_type (str, optional): Return type.
=="tuple": return tuple(year, month, day, weekday,
hours, minutes, seconds, subseconds).
=="str": return as datetime format.
Defaults to "tuple".
Returns:
tuple: tuple(year, month, day, weekday, hours, minutes, seconds, subseconds)
"""
# get now / unpack
# RTC().datetime() returns:
# year, month, day, weekday, hours, minutes, seconds, subseconds
(
year,
month,
day,
_weekday,
hours,
minutes,
seconds,
subseconds,
) = RTC().datetime()
# set return data
if ret_type == "tuple":
ret = (
year,
month,
day,
_weekday,
hours,
minutes,
seconds,
subseconds,
)
elif ret_type == "str":
# datetime.now to txt
ret = "{0}/{1}/{2} {3}:{4}:{5}.{6}".format(
year,
PseudoPython.zfill(month, 2),
PseudoPython.zfill(day, 2),
PseudoPython.zfill(hours, 2),
PseudoPython.zfill(minutes, 2),
PseudoPython.zfill(seconds, 2),
subseconds,
)
return ret
def show_times_of_day(self) -> None:
"""Show times of day on LCD."""
# endless
while True:
# get t_start
t_start = time.ticks_ms()
# get datetime.datetime.now()
now_is = self.return_times_of_day(ret_type="str")
# show
print(now_is)
# clear the monitor window and show on m5stack monitor
lcd.clear(lcd.BLACK)
lcd.text(lcd.CENTER, lcd.CENTER, "{0}".format(now_is))
# calc about 1 sec
adaptive_sleep = 1.0 - (time.ticks_ms() - t_start) * 1e-3
# sleep
if adaptive_sleep <= 0:
continue
else:
time.sleep(adaptive_sleep)
class MyNTPTime:
@staticmethod
def set_local_datetime(offset: int = 9 * 60 * 60) -> None:
"""Set local datetime.
Refs.:
- https://www.pool.ntp.org/zone/jp
Args:
offset (int, optional): Offset time [s].
Defaults to 9 * 60 * 60.
"""
def _time():
# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60 [s]
NTP_DELTA = 3155673600
# host address
host = "pool.ntp.org"
# set query
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1B
# set address
addr = socket.getaddrinfo(host, 123)[0][-1]
# connection
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(1)
# send the query to the address
_ = s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
# close connection
s.close()
# unpack binary data
val = struct.unpack("!I", msg[40:44])[0]
return val - NTP_DELTA
# There's currently no timezone support in MicroPython, so
# utime.localtime() will return UTC time (as if it was .gmtime())
#
# offset: timezone offset (sec)
# settime(9*60*60) for JST
t = _time() + offset
tm = time.localtime(t)
tm = tm[0:3] + (0,) + tm[3:6] + (0,)
RTC().datetime(tm)
class PseudoPython:
@staticmethod
def zfill(data, width: int) -> str:
"""Pseudo python's str.zfill(width) method.
Descriptions:
zfill(data, width) means str(data).zfill(width).
Args:
data (object): A numerical object.
width (int): Width of the padding field.
Returns:
str: Returns the numeric string left filled with zeros
in a string of specified length.
"""
# cast
txt = str(data)
# str.zfill
if len(txt) < width:
return ("0" * (width - len(txt))) + txt
else:
return txt
######################################################################
# class
######################################################################
class SHT31:
"""
This class implements an interface to the SHT31 temperature and humidity
sensor from Sensirion.
"""
# This static map helps keeping the heap and program logic cleaner
_map_cs_r = {
True: {R_HIGH: b"\x2c\x06", R_MEDIUM: b"\x2c\x0d", R_LOW: b"\x2c\x10"},
False: {R_HIGH: b"\x24\x00", R_MEDIUM: b"\x24\x0b", R_LOW: b"\x24\x16"},
}
def __init__(self, i2c, addr=0x44):
"""
Initialize a sensor object on the given I2C bus and accessed by the
given address.
"""
if i2c is None:
raise ValueError("I2C object needed as argument!")
self._i2c = i2c
self._addr = addr
def _send(self, buf):
"""
Sends the given buffer object over I2C to the sensor.
"""
self._i2c.writeto(self._addr, buf)
def _recv(self, count):
"""
Read bytes from the sensor using I2C. The byte count can be specified
as an argument.
Returns a bytearray for the result.
"""
return self._i2c.readfrom(self._addr, count)
def _raw_temp_humi(self, r=R_HIGH, cs=True):
"""
Read the raw temperature and humidity from the sensor and skips CRC
checking.
Returns a tuple for both values in that order.
"""
if r not in (R_HIGH, R_MEDIUM, R_LOW):
raise ValueError("Wrong repeatabillity value given!")
self._send(self._map_cs_r[cs][r])
time.sleep_ms(50)
raw = self._recv(6)
return (raw[0] << 8) + raw[1], (raw[3] << 8) + raw[4]
def get_temp_humi(self, resolution=R_HIGH, clock_stretch=True, celsius=True):
"""
Read the temperature in degree celsius or fahrenheit and relative
humidity. Resolution and clock stretching can be specified.
Returns a tuple for both values in that order.
"""
t, h = self._raw_temp_humi(resolution, clock_stretch)
if celsius:
temp = -45 + (175 * (t / 65535))
else:
temp = -49 + (315 * (t / 65535))
return temp, 100 * (h / 65535)
######################################################################
if __name__ == "__main__":
try:
main()
except OSError as err:
# please check I2C connection and reboot...
print(err)
print("please check I2C connection and reboot...")
# clear the monitor window
lcd.clear(lcd.BLACK)
# print @ physical monitor
lcd.text(0, 0, "error: {0}".format(err))
lcd.text(0, 20, "please check I2C connection and reboot...")
上記は MicroPython のコードで,Python と似ていますが,使えないモジュールやメソッドがあるので注意が必要です.
これについては,バージョンが更新されていっているので,色々試したり,公式ドキュメントを参照ください(http://docs.micropython.org/en/latest/).
プログラムの書き込みには,adafruit-ampy を用います.
上記コードを保存したディレクトリにてシェルを開き,M5Stack を繋げて,例えば COM4 でしたら次を実行します.
ampy -p COM4 put temp_humi.py apps/temp_humi.py
構文としてはこんな感じですね.
ampy -p {ポート} put {パソコンから送りたいファイルパス} {M5Stack 上に置きたいパス}
書き込みの際に画面が再起動したりしますが,少し待つと上記コマンドが終了し,M5Stack が再起動して元のアプリが起動したら書き込みが完了しています.
もしうまくいかない場合は,
- M5Stack のリセットボタンを押す
- APP モードにする
- コマンドを実行し直す(シェルのコマンド中断は break ボタンか ctrl + c です)
- ポートが正しいかどうか確認する
などを試してみてください.
ちなみに,ampy の使い方はコマンドラインから ampy --help
にて調べることが可能です.
他にもファイル操作やデバッグに使える機能など便利な使い方があるので,ご興味あればご覧ください.
プログラムについて,M5Stack 上の apps/
というところに保存しましたが,UIFLOW で選択可能なプログラムはここのディレクトリを参照しており,ここに置くことでアプリの選択が自由にできます.
(実は,apps/temp_humi.py
の代わりに main.py
とすることで,書き込み後再起動してそのままアプリが実行されるのですが,UIFLOW の APP モードで選択できないので apps/ に置きました.)
環境構築編にて Wi-Fi の設定を飛ばしましたが,M5Stack では時間が保持されない関係上,現在時刻を Wi-Fi 経由にて取得するため,以下のように設定します.
まずは,M5Stack をパソコンにつなげ,M5Burner を立ち上げます.
ポートが正しいかどうかも確認しましょう.
下図の Configuration を押下します.
次のポップアップウィンドウにて,いつもお使いの Wi-Fi の SSID とパスワードを入力します(スマートフォンのテザリングでも可).
それでは,M5Stack 側で今書き込んだアプリを立ち上げてみましょう.
SHT35 を M5Stack に繋げ,こちらの画面遷移を参考に,APP Mode のアプリ一覧から temp_humi.py
を選択します.
temp_humi.py
を起動すると,現在時刻取得のために,最初に先ほど設定した Wi-Fi 接続を試みます(左図).
Wi-Fi 接続と時刻取得が完了すると,時刻・温度・湿度が画面出力します.
もし,Wi-Fi 接続がエラーになる場合は,真ん中ボタンの Retry を押すか,左側面のリセットボタンを押下してください.
何度やってもダメな場合は,Wi-Fi のパスワードなどの設定や,機器そのものについてご確認ください.
温度・湿度データをパソコンへ送ることができているかテスト/データ取得
とりあえずこれだけで,現在の温度と湿度については簡素ですがモニタリングすることが可能になりました.
次は,このデータをパソコンに送り,データを取ってみましょう.
M5Stack をパソコンに USB 接続した状態で,パソコンのシェルにて serial_monitor.py
を以下コマンドライン引数を用いて,
py serial_monitor.py -p COM? -l auto
と実行してください.
ここでCOMポートは,現在 M5Stack に繋いでいるポートを指定ください.
実行すると M5Stack は再起動され,Wi-Fi 接続を行い,OKの場合は温度と湿度を画面表示し続けます.
これと同時に,パソコンのシェル上においても,タイムスタンプ/温度/湿度が出力され,同時に ./log/{タイムスタンプ}.log
にもデータが出力されていきます../log/{タイムスタンプ}.log
の中身は,以下のようなデータです.
ets Jul 29 2019 12:21:46
rst:0x1 (POWERON_RESET),boot:0x17 (SPI_FAST_FLASH_BOOT)
configsip: 0, SPIWP:0xee
clk_drv:0x00,q_drv:0x00,d_drv:0x00,cs0_drv:0x00,hd_drv:0x00,wp_drv:0x00
mode:DIO, clock div:1
load:0x3fff0018,len:4
load:0x3fff001c,len:5228
load:0x40078000,len:12908
ho 0 tail 12 room 4
load:0x40080400,len:3512
entry 0x4008063c
_ __ _
_ _(_)/ _| | _____ __
| | | | | |_| |/ _ \ \ /\ / /
| |_| | | _| | (_) \ V V /
\__,_|_|_| |_|\___/ \_/\_/
APIKEY: hogefuga
"
"SD card mounted at ""/sd""
"
"TimeStamp: 2022/05/25 01:42:34.176, ElapsedTime[s]: 7.261, Temperature[degC]: 27.44602, Humidity[%]: 52.19654
TimeStamp: 2022/05/25 01:42:35.346392, ElapsedTime[s]: 8.604, Temperature[degC]: 27.44602, Humidity[%]: 52.20722
TimeStamp: 2022/05/25 01:42:36.694197, ElapsedTime[s]: 9.952001, Temperature[degC]: 27.46204, Humidity[%]: 52.1828
"
"TimeStamp: 2022/05/25 01:42:38.43336, ElapsedTime[s]: 11.301, Temperature[degC]: 27.46204, Humidity[%]: 52.1294
TimeStamp: 2022/05/25 01:42:39.390944, ElapsedTime[s]: 12.648, Temperature[degC]: 27.4754, Humidity[%]: 52.13245
"
これで,無事にデータが取れる環境を構築することができました.
データ分析
前の章のように長時間に渡ってデータを取得した後,./log
というディレクトリを ./data
に書き換え,どういったデータなのか分析を行いました.
github のファイル名は,./eda.ipynb
です.
はじめに,取得したテキストデータを扱いやすくするために,以下手順で pandas.DataFrame 化しました.
split("\n")
- 先頭が
TimeStamp:
でないものを除外 split(", ")
split(": ", maxsplit1)
- 上記を pandas.DataFrame 化
コードとしては以下です.
from pathlib import Path
cur_dir = Path("").resolve()
data_dir = cur_dir / "data"
flist = [v for v in data_dir.glob("*.txt")]
# init
dfs = {}
# loop: files
for fpath in flist:
# file open
with fpath.open(mode="r", encoding="utf-8") as f:
# read
txt = f.read()
# split with "\n"
lines = txt.split("\n")
# exclude not line.startswith("TimeStamp: ")
lines = [v for v in lines if v.startswith("TimeStamp: ")]
# split with ", "
mat = [v.split(", ") for v in lines if v]
# split with ": ", maxsplit=1
item_tuples = [[vv.split(": ", maxsplit=1) for vv in v if vv] for v in mat if v]
columns = [v[0] for v in item_tuples[0]]
values = [[vv[1] for vv in v] for v in item_tuples]
# make dataframe
df = pd.DataFrame(data=values, columns=columns)
# cast
for col in df.columns:
if col=="TimeStamp":
df[col] = pd.to_datetime(df[col])
else:
df[col] = df[col].astype(float)
# add to dfs
dfs[fpath.stem] = df
これで,{ファイル名: pandas.DataFrame} とした辞書ができました.
次に,可視化を行います.
まずは,時系列の生データについてです.
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
xcol = "TimeStamp"
ycol1 = "Temperature[degC]"
ycol2 = "Humidity[%]"
for k,df in dfs.items():
fig = plt.figure(figsize=(12, 5))
ax1 = fig.add_subplot()
ax2 = ax1.twinx()
ax1.set_xlabel(xcol)
ax1.set_ylabel(ycol1)
ax2.set_ylabel(ycol2)
ax1.plot(df[xcol], df[ycol1], color="red", label=ycol1)
ax2.plot(df[xcol], df[ycol2], color="blue", label=ycol2)
ax1.legend(loc="upper left")
ax2.legend(loc="upper right")
plt.title(k)
fig.tight_layout()
plt.show()
plt.clf()
plt.close("all")
- データは,2022/01 中旬の暖房や加湿器などが ON/OFF される部屋の中で取得した温度・湿度です
- 横軸はデータ取得時刻,赤の実線(第1縦軸)は居室内温度 [℃],青の実線(第2縦軸)は居室内湿度 [%] を表します
- 温度と湿度は,おおよそトレード・オフの関係であることが示されます(湿度 [%] は絶対量でなく飽和水蒸気量に対する割合なので)
- 暖房/加湿器/換気扇の ON / OFF や窓の開閉をしているため,30 分や 1 時間単位でうねるような大きな変化が現れています
- 上から2番めのプロットについて,
18 19:30
において温度と湿度の大きなピークが見られますが,これは,わざと加湿器吐出部近傍にセンサを近づけて取得したデータです.なぜこのようにしたかというと,いつもの居室内雰囲気を正常とした場合,擬似的な異常としてデータが欲しかったためです.これ以外の日時ではこのようなデータは取っていません.
次に,データの偏りについて考察するために,各日程における温度と湿度について,度数分布を棒グラフで,及びカーネル密度推定を折れ線グラフで,この度数の平均を一点鎖線にて可視化します.
xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Count"
# init
color = ["blue", "green", "orange", "red"]
bins = 20
# temperature
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol1)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
# unpack
k, df = k_df
# ax.hist(df[xcol1], bins=50)
ax = sns.histplot(df[xcol1], bins=bins, kde=True, label=k, color=color[i])
ax.axvline(df[xcol1].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
#ax.set_yscale("log")
#ax.set_ylim(0.9)
plt.title(xcol1)
fig.tight_layout()
plt.show()
plt.close("all")
# humidity
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol2)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
# unpack
k, df = k_df
# ax.hist(df[xcol1], bins=50)
ax = sns.histplot(df[xcol2], bins=bins, kde=True, label=k, color=color[i])
ax.axvline(df[xcol2].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
#ax.set_yscale("log")
#ax.set_ylim(0.9)
plt.title(xcol2)
fig.tight_layout()
plt.show()
度数分布はレコード数が大きい順に,赤 > オレンジ > 青 > 緑 となっていますが,上図の様に正規化されていないデータの度数分布は潰れてしまっています.
また,正規化しても棒グラフが重なり合っては見にくいので,正規化したカーネル密度推定のみをプロットします.
xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Kernel Density Estimate plot"
# init
color = ["blue", "green", "orange", "red"]
# temperature
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol1)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
# unpack
k, df = k_df
ax = sns.kdeplot(df[xcol1], label=k, color=color[i])
ax.axvline(df[xcol1].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
plt.title(xcol1)
fig.tight_layout()
plt.show()
plt.close("all")
# humidity
fig = plt.figure(figsize=(12, 5))
ax = fig.add_subplot()
ax.set_xlabel(xcol2)
ax.set_ylabel(ycol)
for i, k_df in enumerate(dfs.items()):
# unpack
k, df = k_df
ax = sns.kdeplot(df[xcol2], label=k, color=color[i])
ax.axvline(df[xcol2].mean(), color=color[i], linestyle="-.")
ax.legend(loc="upper left")
plt.title(xcol2)
fig.tight_layout()
plt.show()
- 温度について,青と緑は裾野が狭く,オレンジと赤は裾野が広くなっています.これは,温度変化はエアコンや換気扇の ON / OFF によって大きく左右されますが,前者はデータ取得した時間が短く,エアコンや換気扇操作しなかったことが原因です.校舎は,測定中にエアコンや換気扇の操作をしていました.
- 湿度について,高い温度を示したオレンジと赤にて,低湿度の期間があったことが示されます
- 湿度の緑について,分布の裾野は 80 [%] を越えていますが,これは,擬似的な異常データとしてセンサを加湿器に近づけてデータ取得したためです
今まで測定ファイル別でプロットしていましたが,同種のデータなので,全データを一括でプロットします.
# init
df_all = pd.DataFrame()
# concat
for df in dfs.values():
df_all = pd.concat([df_all, df])
df_all.reset_index(drop=True, inplace=True)
xcol1 = "Temperature[degC]"
xcol2 = "Humidity[%]"
ycol = "Count"
# init
bins = 50
# plot
for xcol in [xcol1, xcol2]:
fig = plt.figure(figsize=(12, 4))
ax = fig.add_subplot()
ax.set_xlabel(xcol)
ax.set_ylabel(ycol)
ax = sns.histplot(df_all[xcol], bins=bins, kde=True)
ax.axvline(df_all[xcol].mean(), linestyle="-.")
fig.tight_layout()
plt.show()
plt.close("all")
# calc delta t
delta_t = []
for df in dfs.values():
v = df["ElapsedTime[s]"]
delta_t.extend([v.iloc[i] - v.iloc[i - 1] for i in range(len(df)) if i!=0])
# plot
fig = plt.figure(figsize=(12, 4))
ax = fig.add_subplot()
ax.set_xlabel("Sampling rate [s]")
ax.set_ylabel(ycol)
ax = sns.histplot(delta_t, bins=bins, kde=True)
ax.axvline(sum(delta_t)/len(delta_t), linestyle="-.")
fig.tight_layout()
plt.show()
- データ取得期間は冬ですが,比較的過ごしやすいように温度調整しているので,大体 18 [℃] ~ 22 [℃] の割合が大きくなっています
- 湿度データについては,ピークが3つ存在しています.擬似的な異常として取得した湿度 80 [%] 超のデータは,この分布から見ても外れ値であると言えます
- 下段はサンプリングレートを示していおり,大体 1.33 [s] おきにデータ取得されています
湿度について3つのピークがありましたが,これらについて温度との関係性を次で可視化します.
sns.jointplot(
data=df_all,
x="Temperature[degC]",
y="Humidity[%]",
kind="hist",
cmap="turbo",
height=8,
bins=100
)
plt.show()
- 横軸は温度の度数分布,縦軸は湿度の度数分布をそれぞれ表します
- 左下のプロットは,温度と湿度の度数の大きさを色で表していて,これが大きくなるに従い,寒色から暖色へと変化させてプロットしています
- 湿度が約 60 [%] では,温度が低い場合に飽和水蒸気量が低くなることによって,相対湿度が上昇するため度数が高くなっています
- 上記と逆に,湿度約 40 [%] では,暖房を入れたことによる相対湿度の減少によって度数が高くなっています
- 湿度約 50 [%] では,温度を約 20 [℃] を目指して暖房を ON / OFF することが多いみたいで,このことと加湿器やセンサの位置関係によって度数が高くなっているようです
異常検知手法の検討
データの傾向について前の章にて可視化を行いましたが,外れ値が分かりやすかったので,これの検知も実装しやすそうです.
次は,異常検知手法について検討していきます.
github のファイル名は,trial_training.ipynb
です.
データについては,./data
ディレクトリをまるごとコピーして ./traindata
という名前にしておきます.
まずは,以下でデータセットを準備します.
from pathlib import Path
import pandas as pd
curdir = Path("").resolve()
datadir = curdir / "traindata"
flist = list(datadir.glob("*.txt"))
# init
dfs = {}
df_all = pd.DataFrame()
# loop: files
for fpath in flist:
# file open
with fpath.open(mode="r", encoding="utf-8") as f:
# read
txt = f.read()
# split with "\n"
lines = txt.split("\n")
# exclude not line.startswith("TimeStamp: ")
lines = [v for v in lines if v.startswith("TimeStamp: ")]
# split with ", "
mat = [v.split(", ") for v in lines if v]
# split with ": ", maxsplit=1
item_tuples = [[vv.split(": ", maxsplit=1) for vv in v if vv] for v in mat if v]
columns = [v[0] for v in item_tuples[0]]
values = [[vv[1] for vv in v] for v in item_tuples]
# make dataframe
df = pd.DataFrame(data=values, columns=columns)
# cast
for col in df.columns:
if col=="TimeStamp":
df[col] = pd.to_datetime(df[col])
else:
df[col] = df[col].astype(float)
# show
print(f"*** {fpath.name} ***")
print(df.info())
print()
# add to dfs
dfs[fpath.stem] = df
# add to df_all
df_all = pd.concat([df_all, df])
# reset index
df_all.reset_index(drop=True, inplace=True)
外れ値が分かりやすかったので,演算が簡単なホテリング T-squared 法を用いて異常検知できるかを試していきます.
最初は,正常と異常のどちらも含まれるデータについて検討します.
import numpy as np
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
# set data
sample = df_all["Humidity[%]"].to_numpy()
# sample mean
sample_mean = np.mean(sample)
# sample variance
sample_var = np.var(sample, ddof=0)
# init
anomaly_scores = []
for s in sample:
# calc anomaly score
t = (s - sample_mean)**2 / sample_var
# append
anomaly_scores.append(t)
# get threshold from 1% of chi-square distribution
threshold = stats.chi2.interval(0.99, 1)[1]
# plot
plt.scatter(x=list(range(len(anomaly_scores))), y=anomaly_scores, marker=".")
plt.axhline(y=threshold, linestyle="--", color="gray")
plt.xlabel("Index")
plt.ylabel("Anomaly score\n(Hotelling T-squared distribution)")
plt.show()
- 横軸は
df_all
の行番号,縦軸はホテリング T-squared 法に基づく異常度です - 破線は,カイ2乗分布における 1 % 区間から計算した閾値で,この破線より下側を正常,上側を異常とします
- Index = 3000 あたりで閾値を超えるデータ点が多く現れています.この部分は,擬似的な異常として定義したデータで,簡易的ですが異常として検知できているようです
上記の計算は,異常が含まれるデータを用いていました.
しかし,現実の運用においては,異常という状態が極稀でデータが取れないケースもあるため,バックデータは正常データのみであることが望ましいと考えられます.
従って,異常データが存在するファイル 20220118192914980961.txt
以外のデータで標本平均と標本分散を計算し,異常度の計算はこれに対して行ってみます.
まずは,正常と異常を含むデータセットをそれぞれ分けて作成します.
sample_norm, sample_anom = [], []
for k,df in dfs.items():
if k=="20220118192914980961":
sample_anom.extend(df["Humidity[%]"].to_list())
else:
sample_norm.extend(df["Humidity[%]"].to_list())
np.shape(sample_anom),np.shape(sample_norm)
# ((520,), (27776,))
次に統計量を計算します.
# sample mean
smean = np.mean(sample_norm)
# sample variance
svar = np.var(sample_norm, ddof=0)
この統計量を用いて,統計量の計算に用いていない異常値を含むデータについて異常度を計算し,可視化します
# init
scores = []
for s in sample_anom:
# calc anomaly score
t = (s - smean)**2 / svar
# append
scores.append(t)
# get threshold from 1% of chi-square distribution
threshold = stats.chi2.interval(0.99, 1)[1]
# plot
plt.scatter(x=list(range(len(scores))), y=scores, marker=".")
plt.axhline(y=threshold, linestyle="--", color="gray")
plt.xlabel("Index")
plt.ylabel("Anomaly score\n(Hotelling T-squared distribution)")
plt.show()
このように,正常データのみ教師としても異常検知できそうなので,最終的なシステムで用いるためにこれら統計量や設定値について,./param_HotellingTSquare.json
として保存しておきます.
# set save dict
savedict = {
"mean": smean,
"variance": svar,
"alpha": 0.99,
"df": 1.0,
"threshold": threshold,
"memo": "Hotelling T-squared distribution",
"timestamp": datetime.now().strftime("%Y/%m/%d-%H:%M:%S.%f"),
"sourcedir": datadir.name,
"sourcefiles": [v.name for v in flist],
}
# set save path
savepath = curdir / "param_HotellingTSquare.json"
# save
with savepath.open(mode="w") as f:
json.dump(savedict, f, indent=4)
異常検知プログラムの実装
異常検知にホテリング T-squared 法が使えそうということが分かったので,次は,モニタリングシステムから呼び出すための異常検知を行う部分について用意します.
パソコン上のテキストエディタで anomaly_detection.py
という名称で新規ファイルを作成し以下をコピペするか,github の
をお使いください.anomaly_detection.py
"""Anomaly detection module.
---
KazutoMakino
"""
import json
from datetime import datetime
from pathlib import Path
import numpy as np
from scipy import stats
######################################################################
# class
######################################################################
class HotellingTSquare:
"""Hotelling T-squared distribution class."""
def __init__(
self, param_path: Path = Path(__file__).parent / "./param_HotellingTSquare.json"
) -> None:
"""Get parameters from param_HotellingTSquare.json.
Args:
param_path (Path, optional): A parameter file.
Defaults to Path(__file__).parent / "./param_HotellingTSquare.json".
"""
# cast
self.param_path = Path(param_path)
# load parameters
if self.param_path.exists():
with self.param_path.open(mode="r", encoding="utf-8") as f:
self.params = json.load(fp=f)
else:
self.params = {"mean": None, "variance": None}
def fit(
self, dataset: list, alpha: float = 0.99, df: float = 1.0, memo_dict: dict = {}
) -> None:
"""Parameter fitting.
Args:
dataset (list): An input 1d-array data.
alpha (float, optional): A degree of reliability.
Defaults to 0.99.
df (float, optional): A degree of freedom.
Defaults to 1.0.
memo_dict (dict, optional): A memo dictionary.
Defaults to {}.
"""
# calc sample mean
s_mean = np.mean(dataset)
# calc sample variance
s_var = np.var(dataset, ddof=0)
# calc threshold
threshold = stats.chi2.interval(alpha=alpha, df=df, loc=0, scale=1)[1]
# set self.params and update
self.params = {
"mean": s_mean,
"variance": s_var,
"alpha": alpha,
"df": df,
"threshold": threshold,
"memo": "Hotelling T-squared distribution",
"timestamp": datetime.now().strftime("%Y/%m/%d-%H:%M:%S.%f"),
}
self.params.update(memo_dict)
# save to json
with self.param_path.open(mode="w", encoding="utf-8") as f:
json.dump(obj=self.params, fp=f, indent=4, sort_keys=False)
def get_anomaly_score(self, data: float) -> float:
"""Calculating anomaly score.
Args:
data (float): An input 1d-array data.
Returns:
float: An anomaly score.
"""
return data - self.params["mean"] ** 2 / self.params["variance"]
def is_normal(self, anomaly_score: float) -> bool:
"""Return True (normal) or False (anomaly) using the threshold.
Args:
anomaly_score (float): A calculated anomaly score.
Returns:
bool: True (normal) or False (anomaly).
"""
# return normal (True) or anomaly (False)
if anomaly_score <= self.params["threshold"]:
return True
else:
return False
リアルタイムで温度・湿度データと異常判定を可視化してモニタリングするアプリの作成・実行
これまで,データ取得,異常検知と来て,残すはパソコン上でモニタリングするアプリの作成です.
今回は,ローコードで web アプリが簡単に作成できる streamlit というライブラリを用いてアプリを作成します.
パソコン上のテキストエディタで real_time_monitoring.py
という名称で新規ファイルを作成し以下をコピペするか,github の
をお使いください.real_time_monitoring
.py
"""The sample code of the real time monitoring system using by streamlit.
Usage:
- streamlit run real_time_monitoring.py
---
KazutoMakino
"""
######################################################################
# import
######################################################################
import gc
import sys
import time
from datetime import datetime
from logging import info
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import streamlit as st
import yaml
sns.set()
# import my pkgs
if True:
from anomaly_detection import HotellingTSquare
from serial_monitor import SerialMonitor
######################################################################
# global settings
######################################################################
# show parser arguments
print(sys.argv)
# # analyze parser arguments
# streamlit run or not
if sys.argv[0] in ["py", "python", "python3"]:
raise AttributeError(
"""
usage:
- 'streamlit run real_time_monitoring.py'
or
- 'streamlit run real_time_monitoring.py debug'
"""
)
else:
# debug or not
if "debug" in sys.argv:
DBG = True
else:
DBG = False
# get settings.yml abspath
SETS_PATH = Path(__file__).resolve().parent / "settings.yml"
if not SETS_PATH.exists():
raise FileNotFoundError(f"not exists: {SETS_PATH}")
######################################################################
# main
######################################################################
def main():
monitoring_app = MonitoringApp()
monitoring_app.run()
######################################################################
# class
######################################################################
class MonitoringApp:
"""The real time monitoring system."""
def __init__(self) -> None:
# get instance of DataStream
self.ds = DataStream()
# get settings from ./settings.yml
with SETS_PATH.open(mode="r", encoding="utf-8") as f:
self.sets = yaml.safe_load(stream=f)
self.param_monitor = self.sets["Monitoring"]
def run(self) -> None:
"""Run the simple real time monitoring system."""
# write the title and some contents
title = """\
### リアルタイムプロットの可視化と異常検知のサンプル
---
データの種類:
- 室内温度
- 室内湿度
デモ内容:
- 温度/湿度についてリアルタイムモニタリング
- 加湿器の吐出孔に近づけている場合とそうでない場合のデータを取得
- 近づけている時を疑似的な異常として定義し,これを検知する
---
"""
st.markdown(body=title, unsafe_allow_html=False)
# # make place holders
# predicted score
ph_pred = st.empty()
# plot area
ph_plot = st.empty()
# json style parameters
st.markdown(
"""
---
parameters:
"""
)
st.json(self.sets)
# # init
# plot data length
if DBG:
data_length = 10
else:
data_length = self.param_monitor["DataLength"]
# set void
data = {}
# init index
i = 0
# set anomaly detection method
hts = HotellingTSquare()
# running until getting KeyboardInterrupt
# (Which does code catch the KeyboardInterrupt ?)
while True:
# load data
data_dict = self.ds.run()
# count continue flag -> continue
if data_dict == "continue":
continue
# show @ debug
if DBG:
if not data_dict:
break
else:
print(data_dict)
# cast values except "TimeStamp"
for k, v in data_dict.items():
if (k in ["tstamp", "TimeStamp"]) and (
self.param_monitor["PlotType"] == "streamlit"
):
# except "TimeStamp" if "PlotType" == "streamlit"
continue
elif (k in ["tstamp", "TimeStamp"]) and (
self.param_monitor["PlotType"] == "matplotlib"
):
# if "PlotType" == "matplotlib", cast to datetime obj
data_dict[k] = datetime.strptime(
data_dict[k], "%Y/%m/%d %H:%M:%S.%f"
)
else:
# cast values
data_dict[k] = float(v)
print(data_dict)
# get formated data set (xmax,xmin, )
if i == 0:
# initial: value -> list
data = {k: [v] for k, v in data_dict.items()}
else:
# updating data
for k, v in data_dict.items():
# remove oldest component
# if list length is larger than setting length
if len(data[k]) >= data_length:
data[k].remove(data[k][0])
# append
data[k].append(v)
# set dataframe
df = pd.DataFrame(data=data)
# make line plot (Which is faster streamlit.line_chart or matplotlib ?)
if self.param_monitor["PlotType"] == "streamlit":
# set index to "timestamp" column for
# using streamlit.line_chart x-label.
df.set_index(keys=list(data.keys())[0], drop=True, inplace=True)
# streamlit.line_chart()'s data: pandas.DataFrame, xaxis<-index
# see:
# https://docs.streamlit.io/library/api-reference/charts/st.line_chart
ph_plot.line_chart(data=df, width=0, height=0, use_container_width=True)
elif self.param_monitor["PlotType"] == "matplotlib":
# get x column name
xcol = list(df.columns)[0]
ycol1 = "Temperature[degC]"
ycol2 = "Humidity[%]"
# plot
if i == 0:
fig = plt.figure(figsize=(12, 5))
ax1 = fig.add_subplot()
ax2 = ax1.twinx()
else:
ax1.cla()
ax2.cla()
ax1.set_xlabel(xcol)
ax2.set_xlabel(xcol)
ax1.set_ylabel(ycol1)
ax2.set_ylabel(ycol2)
ax1.plot(df[xcol], df[ycol1], marker="o", color="red", label=ycol1)
ax2.plot(df[xcol], df[ycol2], marker="o", color="blue", label=ycol2)
ax1.legend(loc="upper left")
ax2.legend(loc="upper right")
fig.tight_layout()
# set to place holder
ph_plot.pyplot(fig)
# plt.close
plt.close("all")
else:
raise AttributeError(
f"monitoring plot type is invalid: {self.param_monitor}"
)
# preprocessings for prediction
pass
# predict score
anomaly_score = hts.get_anomaly_score(data=data_dict["Humidity[%]"])
norm_anom = hts.is_normal(anomaly_score=anomaly_score)
# write result of prediction (0 or 1, percentages, predicted lifetime, ...)
if norm_anom:
ph_pred.success(
f"状態: 正常 (異常度: {anomaly_score:.3f}, "
+ f"閾値: {hts.params['threshold']:.3f})"
)
else:
ph_pred.error(
f"状態: 異常 (異常度: {anomaly_score:.3f}, "
+ f"閾値: {hts.params['threshold']:.3f})"
)
# increment
i += 1
# gc
gc.collect()
# show
st.info("fin.")
class DataStream:
"""Data stream class."""
def __init__(self) -> None:
"""Get serial monitoring parameters and connect to the IoT device."""
# init
self.tstamp = None
self.tstart_ds = time.perf_counter()
# get settings from ./settings.yml
with SETS_PATH.open(mode="r", encoding="utf-8") as f:
self.param_serial = yaml.safe_load(stream=f)
self.param_serial = self.param_serial["Serial"]
# get instance of SerialMonitor
if not DBG:
self.seri = SerialMonitor(
port=self.param_serial["port"],
baudrate=self.param_serial["baudrate"],
timeout=self.param_serial["timeout[s]"],
logpath=self.param_serial["logpath"],
)
def run(self) -> dict:
"""Run the data stream.
Returns:
dict: contains data type keys and values with time stamp.
(Return type is a json-like object as seen in cloud services.)
"""
# init
data_dict = {}
# get real time data
if DBG:
# pseudo latency
time.sleep(0.1)
# get dummy data
if time.perf_counter() - self.tstart_ds < 30:
data_dict = {
"tstamp": Timer.get_timestamp(fmt_date="datetime"),
"dummy_0[ ]": np.random.rand(),
"dummy_1[ ]": np.random.rand(),
}
else:
data_dict = None
else:
# get data from serial port (other module)
data_txt = self.seri.run(is_return=True)
# if text header is invalid, return "continue"
if not data_txt.startswith("TimeStamp"):
print("now restarting...")
return "continue"
# txt to dict
data_txt = data_txt.replace("\r\n", "").split(", ")
data_dict = {k: v for k, v in [w.split(": ", maxsplit=1) for w in data_txt]}
# show
print(data_dict)
return data_dict
class Timer:
"""Timer class."""
@staticmethod
def get_timestamp(fmt_date: str = "datetime") -> str:
"""Get time stamp.
Args:
fmt_date (str, optional):
=="datetime": returned format is "%Y/%m/%d %H:%M:%S.%f",
== "str" or "text"or "txt": returned format is ""%Y%m%d%H%M%S%f",
or you can define a returned format manually.
Defaults to "datetime".
Returns:
str: The formatted time stamp.
"""
# get now
now = datetime.now()
# set format
if fmt_date == "datetime":
# datetime
fmt = "%Y/%m/%d %H:%M:%S.%f"
elif (fmt_date == "str") or (fmt_date == "text") or (fmt_date == "txt"):
# only figures
fmt = "%Y%m%d%H%M%S%f"
else:
# direct setting
fmt = fmt_date
return now.strftime(fmt)
######################################################################
if __name__ == "__main__":
# logging
info(msg="start")
# run
main()
また,ポートなどについては設定ファイルを読み込んでくるので,パソコン上のテキストエディタで settings.yml
という名称で新規ファイルを作成し以下をコピペするか,github の settings.yml
をお使いください.
######################################################################
######################################################################
# monitoring settings
Monitoring:
# plot type (streamlit or matplotlib)
PlotType: matplotlib
# plot data length
DataLength: 30
# serial port settings
Serial:
# serial port name
port: COM3
# device baudrate
baudrate: 115200
# timeout [s]
timeout[s]: 1
# logging file
logpath: "auto"
######################################################################
※ シリアルポートはご自身の環境に適合するように書き換えてください
すべての準備が整いました.
M5Stack/SHT35/パソコンを接続し,シェルを開いて real_time_monitoring.py
のあるディレクトリに移動し,次を実行しましょう.
streamlit run real_time_monitoring.py
そうすると web アプリが立ち上がり,M5Stack における Wi-Fi 通信が上手く行けば,次のよう動画の左側のように表示されると思います.
(streamlit の初回実行時は,シェル上で E-mail アドレスを聞かれますが,無視して空欄で Enter でOKです)
(もし,通信エラーの場合は,”M5Stack に温度・湿度データを表示してパソコンに送るプログラムを書き込む” の章に記載の手順をお試しください)
このように,温度と湿度がリアルタイムでグラフでモニタリングでき,湿度が異常である場合について検知した結果を表示するシステムができました.
しかしながら,みなさんが試される環境ですと,ちゃんと異常検知できていない(誤検知している)かもしれません.
これはなぜかというと,今まで用いたデータは冬の部屋の中で取得したもので,こちらの動画も同時期/同一環境化にて撮影しましたが,今回の異常検知の計算においてはバックデータとほぼ同一の環境でないと,正常でも異常と判定されてしまうためです.
ちなみに,当該記事執筆現在の 5 月では,同一環境化においてもずっと異常と判定されていました.
今回は,異常検知の内容にはこだわっていなかったので全くロバストではなかったのですが,このようにモニタリングと異常検知について実装することができました.
異常検知について是正する方法としては,
- 湿度だけをみるのではなく温度と組み合わせたデータ(例えば不快指数)を用いる
- 季節性を考慮に入れる
- データのパターンを増やす
- 手法を変える
等が挙げられます.
以上,ベンダーやコンサルに頼らず,一人で/一週間の真夜中だけで/手持ちの機材で,AI/IoTシステムを構築する でした.
今回,M5Stack からパソコンへのデータ転送は USB を介したものでしたが,Bluetooth / Wi-Fi を用いた実装について検討し,今後掲載したいです.
コメント