MQTT: Add example to README and Python test client (#495)
This commit is contained in:
parent
df51115aa6
commit
7f54a2c09b
2 changed files with 108 additions and 0 deletions
|
@ -173,6 +173,7 @@ Examples:
|
|||
| `rtl_433 -a -t` | Will run in analyze mode and save a test file per detected signal (gfile###.data). Format is uint8, 2 channels.
|
||||
| `rtl_433 -r file_name` | Play back a saved data file.
|
||||
| `rtl_433 file_name` | Will save everything received from the rtl-sdr during the session into a single file. The saves file may become quite large depending on how long rtl_433 is left running. Note: saving signals into individual files wint `rtl_433 -a -t` is preferred.
|
||||
| `rtl_433 -F json -U | mosquitto_pub -t home/rtl_433 -l` | Will pipe the output to network as JSON formatted MQTT messages. A test MQTT client can be found in `tests/mqtt_rtl_433_test.py`.
|
||||
|
||||
This software is mostly useable for developers right now.
|
||||
|
||||
|
|
107
tests/mqtt_rtl_433_test.py
Executable file
107
tests/mqtt_rtl_433_test.py
Executable file
|
@ -0,0 +1,107 @@
|
|||
#!/usr/bin/env python3
|
||||
""" MQTT test client for receiving rtl_433 JSON data
|
||||
|
||||
Example program for receiving and parsing sensor data from rtl_433 sent
|
||||
as MQTT network messages. Recommended way of sending rtl_433 data on network is:
|
||||
|
||||
$ rtl_433 -F json -U | mosquitto_pub -t home/rtl_433 -l
|
||||
|
||||
An MQTT broker e.g. 'mosquitto' must be running on local compputer
|
||||
|
||||
Copyright (C) 2017 Tommy Vestermark
|
||||
This program is free software; you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation; either version 2 of the License, or
|
||||
(at your option) any later version.
|
||||
"""
|
||||
|
||||
import multiprocessing as mp
|
||||
import logging
|
||||
import paho.mqtt.client as mqtt
|
||||
import json
|
||||
import sys, os
|
||||
import time, datetime
|
||||
|
||||
MQTT_SERVER = "127.0.0.1"
|
||||
MQTT_TOPIC_PREFIX = "home/rtl_433"
|
||||
TIMEOUT_STALE_SENSOR = 600 # Seconds before showing a timeout indicator
|
||||
|
||||
#log = logging.getLogger() # Single process logger
|
||||
log = mp.log_to_stderr() # Multiprocessing capable logger
|
||||
mqtt_client = mqtt.Client("RTL_433_Test")
|
||||
|
||||
sensor_state = dict() # Dictionary containing accumulated sensor state
|
||||
|
||||
|
||||
def print_sensor_state():
|
||||
""" Print accumulated sensor state """
|
||||
time_now = datetime.datetime.utcnow().replace(microsecond=0)
|
||||
print("\nUpdate per {} UTC".format(time_now.isoformat(sep=' ')))
|
||||
for model in sensor_state:
|
||||
print(model)
|
||||
for ID in sensor_state[model]:
|
||||
data = sensor_state[model][ID]['data'].copy()
|
||||
timestamp = data.pop('time')
|
||||
timedelta = (time_now-timestamp).total_seconds()
|
||||
indicator = "*" if (timedelta < 2) else "~" if (timedelta > TIMEOUT_STALE_SENSOR) else " " # Indicator for new and stale data
|
||||
print(" ID {:5} {}{} {}".format(ID, timestamp.isoformat(sep=' '), indicator, data))
|
||||
sys.stdout.flush() # Print in real-time
|
||||
|
||||
|
||||
def on_connect(client, userdata, rc):
|
||||
""" Callback for when the client receives a CONNACK response from the server. """
|
||||
log.info("MQTT Connection: " + mqtt.connack_string(rc))
|
||||
if rc != 0:
|
||||
log.error("Could not connect. RC: " + str(rc))
|
||||
exit()
|
||||
# Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions will be renewed.
|
||||
client.subscribe(MQTT_TOPIC_PREFIX + "/#")
|
||||
|
||||
|
||||
def on_disconnect(client, userdata, rc):
|
||||
if rc != 0:
|
||||
log.error("Unexpected disconnection. RC: " + str(rc))
|
||||
|
||||
|
||||
def on_message(client, userdata, msg):
|
||||
""" Callback for when a PUBLISH message is received from the server. """
|
||||
if msg.topic.startswith(MQTT_TOPIC_PREFIX):
|
||||
try:
|
||||
# Decode JSON payload
|
||||
d = json.loads(msg.payload.decode())
|
||||
except:
|
||||
log.warning("JSON decode error: " + msg.payload.decode())
|
||||
return
|
||||
|
||||
# Convert time string to datetime object
|
||||
time_str = d.get('time', "0000-00-00 00:00:00")
|
||||
time_utc = datetime.datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
|
||||
d['time'] = time_utc
|
||||
# Update sensor_state
|
||||
sensor_model = d.pop('model', 'unknown')
|
||||
sensor_id = d.pop('id', 0)
|
||||
sensor_state.setdefault(sensor_model,{}).setdefault(sensor_id,{})['data'] = d
|
||||
print_sensor_state()
|
||||
else:
|
||||
log.info("Uknown topic: " + msg.topic + "\t" + msg.payload.decode())
|
||||
|
||||
|
||||
# Setup MQTT client
|
||||
mqtt_client.on_connect = on_connect
|
||||
mqtt_client.on_disconnect = on_disconnect
|
||||
mqtt_client.on_message = on_message
|
||||
mqtt_client.connect(MQTT_SERVER)
|
||||
mqtt_client.loop_start()
|
||||
|
||||
|
||||
def main():
|
||||
"""MQTT Test Client"""
|
||||
log.setLevel(logging.INFO)
|
||||
log.info("MQTT RTL_433 Test Client")
|
||||
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue