2021年8月15日日曜日

WxBeacon2とSwitchBotの温湿度計の取得python3スクリプトを改修

自宅にはWxBeacon2とSwitchBotの温湿度計が各々1台ずつあるのだが、 先日のスクリプトだと1台のセンサから情報取得するのに毎回5秒かかるので1分間には最大12個しか取れない。クラス定義のscanを読むとタイムアウトまでの時間で見つかったデバイス全てに処理をしているようだったので、チェックしたいデバイスのMacアドレスをリストで渡すと、その中で見つかったセンサのデータを返すようにしてみた。

センサからのデータを成形したりbluepyを使う部分は前回同様に @c60evaporatorさんの記事のコードそのまま。

  1. Omron環境センサの値をRaspberryPiで定期ロギングする
  2. SwitchBot温湿度計の値をRaspberryPiでロギング

WxBeacon2のブロードキャストモードへの変更やSwitchBotのMACアドレスの取得、周辺モジュールのインストールや設定は上記の記事を参照の事。

「env_broadcast.py」、「bluetoothsensor.py」を同一ディレクトリに書くのすれば動くはず。

「env_broadcast.py」は、リスト型で調査対象のMACアドレスを一覧で受け取る。スキャン時に一致するMACアドレスが見つかるとWxBeacon2かSwitchBotの温湿度計かを自動判別してセンサ情報を取得する。取得したデータはMACアドレスをキーにしたdict型(バリューも更にdict型)に格納する。同じMACアドレスで2度取れてしまった際は上書きしている。

env_broadcast.py

from bluepy import btle
import struct

class ScanDelegate(btle.DefaultDelegate):
    #コンストラクタ
    def __init__(self, macaddrs):
        btle.DefaultDelegate.__init__(self)
        #センサデータ保持用変数
        self.macaddr = macaddrs
        self.sensorValue = dict()
        for mac in macaddrs:
            self.sensorValue[mac] = {'SensorType':'None'}

    # スキャンハンドラー
    def handleDiscovery(self, dev, isNewDev, isNewData):  
        # 新しいデバイスが見つかったら
        if isNewDev or isNewData:  
            # ターゲットのMACアドレス(環境センサ)であるか
            # print('dev.addr in self.macaddr ->', dev.addr, self.macaddr, dev.addr in self.macaddr)
            if dev.addr in self.macaddr:
                # アドバタイズデータを取り出し
                for (adtype, desc, value) in dev.getScanData():  
                    # 環境センサのとき、データ取り出しを実行
                    # まずWxBeacon2/Omronの場合
                    if desc == 'Manufacturer' and value[0:4] == 'd502':
                        #センサの種類(EP or IM)を取り出し
                        sensorType = dev.scanData[dev.SHORT_LOCAL_NAME].decode(encoding='utf-8')
                        #EPのときのセンサデータ取り出し
                        if sensorType == 'EP':
                            self.decodeSensorData_EP(value, dev.addr)
                        #IMのときのセンサデータ取り出し
                        if sensorType == 'IM':
                            self.decodeSensorData_IM(value, dev.addr)
                    # SwitchBotの温湿度計の場合
                    else:
                        for (adtype, desc, value) in dev.getScanData():  
                            #環境センサのとき、データ取り出しを実行
                            if desc == '16b Service Data':
                                #センサデータ取り出し
                                self._decodeSensorData(value, dev.addr)

    # センサデータを取り出してdict形式に変換(EPモード時)
    def decodeSensorData_EP(self, valueStr, macaddr):
        #文字列からセンサデータ(6文字目以降)のみ取り出し、バイナリに変換
        valueBinary = bytes.fromhex(valueStr[6:])
        #バイナリ形式のセンサデータを整数型Tapleに変換
        (temp, humid, light, uv, press, noise, discomf, wbgt, rfu, batt) = struct.unpack('<hhhhhhhhhB', valueBinary)
        #単位変換した上でdict型に格納
        self.sensorValue[macaddr] = {
            'SensorType': 'EP',
            'Temperature': temp / 100,
            'Humidity': humid / 100,
            'Light': light,
            'UV': uv / 100,
            'Pressure': press / 10,
            'Noise': noise / 100,
            'Discomfort': discomf / 100,
            'WBGT': wbgt / 100,
            'BatteryVoltage': (batt + 100) / 100
        }

    # センサデータを取り出してdict形式に変換(IMモード時)
    def decodeSensorData_IM(self, valueStr, macaddr):
        #文字列からセンサデータ(6文字目以降)のみ取り出し、バイナリに変換
        valueBinary = bytes.fromhex(valueStr[6:])
        #バイナリ形式のセンサデータを整数型Tapleに変換
        (temp, humid, light, uv, press, noise, accelX, accelY, accelZ, batt) = struct.unpack('<hhhhhhhhhB', valueBinary)
        #単位変換した上でdict型に格納
        self.sensorValue[macaddr] = {
            'SensorType': 'IM',
            'Temperature': temp / 100,
            'Humidity': humid / 100,
            'Light': light,
            'UV': uv / 100,
            'Pressure': press / 10,
            'Noise': noise / 100,
            'AccelerationX': accelX / 10,
            'AccelerationY': accelY / 10,
            'AccelerationZ': accelZ / 10,
            'BatteryVoltage': (batt + 100) / 100
        }
    
    # センサデータを取り出してdict形式に変換
    def _decodeSensorData(self, valueStr, macaddr):
        #文字列からセンサデータ(4文字目以降)のみ取り出し、バイナリに変換
        valueBinary = bytes.fromhex(valueStr[4:])
        #バイナリ形式のセンサデータを数値に変換
        batt = valueBinary[2] & 0b01111111
        isTemperatureAboveFreezing = valueBinary[4] & 0b10000000
        temp = ( valueBinary[3] & 0b00001111 ) / 10 + ( valueBinary[4] & 0b01111111 )
        if not isTemperatureAboveFreezing:
            temp = -temp
        humid = valueBinary[5] & 0b01111111
        #dict型に格納
        self.sensorValue[macaddr] = {
            'SensorType': 'SwitchBot',
            'Temperature': temp,
            'Humidity': humid,
            'BatteryVoltage': batt
        }

「bluetoothsensor.py」は、メインルーチンで取得したいデバイスのMACアドレスをリスト型で列挙しておく。取得データの処理の際はMACアドレスをキーに、まずはSensorTypeの格納している値を確認しNoneだと取れていない、ほかの値だとセンサーごとの処理に分離させる。今はrrdtoolに突っ込むのに便利な表示にしている。
(よく考えるとScanDelegateの方で取得時間も返すのに含めておかないタイムアウトの秒数の間のどこで取得したか分からないな)

bluetoothsensor.py
#!/usr/bin/python3
from bluepy import btle
from env_broadcast import ScanDelegate
import time

while True:
    # 収集したいセンサのMACアドレスをリスト型で列挙(MACアドレスは小文字で記載)
    sensor_mac_list = ["c9:43:c3:XX:XX:XX","c5:d7:b1:XX:XX:XX"]

    # センサ値取得デリゲートを、スキャン時実行に設定
    scanner = btle.Scanner().withDelegate(ScanDelegate(sensor_mac_list))
    #スキャンしてセンサ値取得(タイムアウト10秒。10秒間で見つかったbluetoothデバイス全てにチェックを実施)
    scanner.scan(10.0)
    
    # 取得した値一覧
    # print(scanner.delegate.sensorValue)

    if scanner.delegate.sensorValue is not None:
        for macaddr in sensor_mac_list:
            if scanner.delegate.sensorValue[macaddr]['SensorType'] == 'EP':
                print("update /XXXX/RRD/wxbeacon2.rrd "
                    + str(int(time.time())) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['BatteryVoltage']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Temperature']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Humidity']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Light']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['UV']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Pressure']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Noise']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['Discomfort']) + ':'
                    + str(scanner.delegate.sensorValue[macaddr]['WBGT']), flush=True)
            elif scanner.delegate.sensorValue[macaddr]['SensorType'] == 'IM':
print("update /XXXX/NoName.rrd " + str(int(time.time())) + ':' + str(scanner.delegate.sensorValue[macaddr]['BatteryVoltage']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Temperature']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Humidity']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Light']) + ':' + str(scanner.delegate.sensorValue[macaddr]['UV']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Pressure']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Noise']) + ':' + str(scanner.delegate.sensorValue[macaddr]['AccelerationX']) + ':' + str(scanner.delegate.sensorValue[macaddr]['AccelerationY']) + ':' + str(scanner.delegate.sensorValue[macaddr]['AccelerationZ']) + ':' + str(scanner.delegate.sensorValue[macaddr]['AccelerationX']), flush=True) elif scanner.delegate.sensorValue[macaddr]['SensorType'] == 'SwitchBot':
print('update /XXXX/RRD/swbot.rrd ' + str(int(time.time())) + ':' + str(scanner.delegate.sensorValue[macaddr]['Temperature']) + ':' + str(scanner.delegate.sensorValue[macaddr]['Humidity']) + ':' + str(scanner.delegate.sensorValue[macaddr]['BatteryVoltage']), flush=True) elif scanner.delegate.sensorValue.get[macaddr]['SensorType'] == 'None':
# 時間内にデバイスが見つからなかった pass else: # 想定外 print(scanner.delegate.sensorValue) time.sleep(50)

実行結果は下記の感じなので、rrdtoolが動いているサーバには「./bluetoothsensor.py |nc XXX.XXX.XXX.XXX 42217 >/dev/null &」な感じで飛ばしている。

$ ./bluetoothsensor.py
update /XXXX/RRD/swbot.rrd 1629028238:26.4:66:100
update /XXXX/RRD/wxbeacon2.rrd 1629028238:2.91:19.63:100.0:0:0.01:1010.1:31.9:67.33:23.03

0 件のコメント:

コメントを投稿