diff --git a/mqtt2influx.py b/mqtt2influx.py new file mode 100644 index 0000000000000000000000000000000000000000..a536888a46693a0459bcf89d9eb7402fa4d68dc5 --- /dev/null +++ b/mqtt2influx.py @@ -0,0 +1,92 @@ +#!/usr/bin/python3 +############################################### +# Copyright by Dennis Eisold 2020 # +# Author: Dennis Eisold # +# Created: 27.01.2020 # +############################################### + +import json +import time +import paho.mqtt.client as mqtt +from decimal import Decimal +from influxdb import InfluxDBClient + +try: + with open('config.json') as f: + config = json.load(f) + print("config loaded") +except KeyError: + print("No config file found") + exit() + +config_set = "mqtt_to_influx" +mqtt_server = config[config_set]['mqtt_setting'] +influx_server = config[config_set]['influxdb_setting'] + +def iterate(dictionary): + for key, value in dictionary.items(): + if isinstance(value, dict): + iterate(value) + continue + print('key {!r} -> value {!r}'.format(key, value)) + +def on_connect(client, userdata, flags, rc): + client.subscribe(config[mqtt_server]['subscribe']) + +def on_message(client, userdata, msg): + # Use utc as timestamp + loaded_json = json.loads(msg.payload.decode("utf-8")) + isfloatValue=False + + if("wetterstation" in msg.topic and "SENSOR" in msg.topic): + print("write to influxDB: wetterstation") + print("Sensor: " + msg.topic.split("/wetterstation/")[1].split("/SENSOR")[0]) + + for key in loaded_json.keys(): + if("PressureUnit" not in key and "TempUnit" not in key and "Time" not in key): + print("Key: " + key) + + json_body = {} + json_body["measurement"] = "Sensor" + json_body["tags"] = {} + json_body["tags"]["client"] = msg.topic.split("/wetterstation/")[1].split("/SENSOR")[0] + json_body["tags"]["sensor"] = key + json_body["fields"] = {} + if ("Temperature" in loaded_json[key]): + json_body["fields"]["temperature"] = loaded_json[key]["Temperature"] + if ("Humidity" in loaded_json[key]): + json_body["fields"]["humidity"] = loaded_json[key]["Humidity"] + if ("Pressure" in loaded_json[key]): + json_body["fields"]["pressure"] = loaded_json[key]["Pressure"] + + print(json.dumps(json_body)) + dbclient.write_points(json_body) + +# Set up a client for InfluxDB +print("influx: connect to Server: "+ config[influx_server]['host']) +dbclient = InfluxDBClient(config[influx_server]['host'], config[influx_server]['port'], config[influx_server]['username'], config[influx_server]['password'], config[influx_server]['database']) +print("influx: connected") + +# Initialize the MQTT client that should connect to the Mosquitto broker +if(mqtt_server != 0): + print("MQTT: connect to Server: " + config[mqtt_server]['host']) + client = mqtt.Client(client_id="", clean_session=True, protocol=eval("mqtt.MQTTv311")) + client.on_connect = on_connect + client.on_message = on_message + + connOK = False + while(connOK == False): + try: + client.connect(config[mqtt_server]['host'], config[mqtt_server]['port'], config[mqtt_server]['timeout']) + if config[mqtt_server]['username'] is not 0 and config[mqtt_server]['password'] is not 0: + client.username_pw_set(config[mqtt_server]['username'], config[mqtt_server]['password']) + print("MQTT: connected") + connOK = True + except: + connOK = False + time.sleep(2) + + # Blocking loop to the Mosquitto broker + client.loop_forever() +else: + print("No MQTT Server") \ No newline at end of file