From 4f5e76a1ce5861ce45270a16328a1ee73d8199e3 Mon Sep 17 00:00:00 2001 From: Dennis Eisold <de@itstall.de> Date: Tue, 28 Jan 2020 08:16:33 +0100 Subject: [PATCH] MQTT working and debug added --- MqttClient.h | 141 +++++++++++++++++++++++++++++----------------- backend.cpp | 34 ++++++++--- compatibility.h | 7 --- dbSqlite.h | 12 ++-- openweathermap.h | 22 ++++---- weatherstation.db | Bin 552960 -> 552960 bytes 6 files changed, 135 insertions(+), 81 deletions(-) delete mode 100644 compatibility.h diff --git a/MqttClient.h b/MqttClient.h index e30bcfb..dcc37d4 100644 --- a/MqttClient.h +++ b/MqttClient.h @@ -1,60 +1,99 @@ #pragma once -#include <mosquittopp.h> +#include <iostream> #include <cstring> #include <cstdio> +#include <mosquittopp.h> +#include "dbSqlite.h" #define MAX_PAYLOAD 50 -#define DEFAULT_KEEP_ALIVE 60 -#define PUBLISH_TOPIC "EXAMPLE_TOPIC" +#define DEBUG true class MqttClient : public mosqpp::mosquittopp { +private: + const char* host; + const char* id; + const char* user; + const char* pass; + int port; + int keepalive; + dbSqlite* db; + public: - MqttClient(const char* id, const char* host, int port) : mosquittopp(id) { - int keepalive = DEFAULT_KEEP_ALIVE; - connect(host, port, keepalive); - } - - ~MqttClient() {} - - void on_connect(int rc) { - if (!rc) std::cout << "Connected - code " << rc << std::endl; - } - - void on_subscribe(int mid, int qos_count, const int* granted_qos){ - std::cout << "Subscription succeeded." << std::endl; - } - - void on_message(const struct mosquitto_message* message) { - int payload_size = MAX_PAYLOAD + 1; - char buf[payload_size]; - - if (!strcmp(message->topic, PUBLISH_TOPIC)) { - memset(buf, 0, payload_size * sizeof(char)); - - /* Copy N-1 bytes to ensure always 0 terminated. */ - memcpy(buf, message->payload, MAX_PAYLOAD * sizeof(char)); - - std::cout << buf << std::endl; - - // Examples of messages for M2M communications... - if (!strcmp(buf, "STATUS")) { - snprintf(buf, payload_size, "This is a Status Message..."); - publish(NULL, PUBLISH_TOPIC, strlen(buf), buf); - std::cout << "Status Request Recieved." << std::endl; - } - - if (!strcmp(buf, "ON")) { - snprintf(buf, payload_size, "Turning on..."); - publish(NULL, PUBLISH_TOPIC, strlen(buf), buf); - std::cout << "Request to turn on." << std::endl; - } - - if (!strcmp(buf, "OFF")) { - snprintf(buf, payload_size, "Turning off..."); - publish(NULL, PUBLISH_TOPIC, strlen(buf), buf); - std::cout << "Request to turn off." << std::endl; - } - } - } -} \ No newline at end of file + MqttClient(const char* id, const char* host, int port, const char* user, const char* pass, const char* topic) : mosquittopp(id) { + if(DEBUG) std::cout << "MqttClient::MqttClient()" << std::endl; + db = new dbSqlite(); + this->keepalive = 60; + this->id = id; + this->port = db->getSettings().mqtt_port; + this->host = db->getSettings().mqtt_host.c_str(); + this->user = db->getSettings().mqtt_user.c_str(); + this->pass = db->getSettings().mqtt_pass.c_str(); + // Benutzername und Passwort setzen + username_pw_set(user, pass); + // Asynchrone Verbindung aufbauen + connect(host, port, keepalive); + // Subscribe zu Backend Topic + subscribe(NULL, db->getSettings().mqtt_topic_backend.c_str()); + // Startet den Thread + loop_start(); + } + + ~MqttClient() { + // Stoppt den Thread + loop_stop(); + // Mosuqitto library cleanup + mosqpp::lib_cleanup(); + } + + void on_connect(int rc) { + if (DEBUG) std::cout << "MqttClient::on_connect()" << std::endl; + if (!rc) std::cout << "Connected - code " << rc << std::endl; + } + + void on_subscribe(int mid, int qos_count, const int* granted_qos) { + if (DEBUG) std::cout << "MqttClient::on_subscribe()" << std::endl; + std::cout << "Subscription succeeded." << std::endl; + } + + bool send_message(const char* message, const char* topic) { + // Send message - depending on QoS, mosquitto lib managed re-submission this the thread + // + // * NULL : Message Id (int *) this allow to latter get status of each message + // * topic : topic to be used + // * lenght of the message + // * message + // * qos (0,1,2) + // * retain (boolean) - indicates if message is retained on broker or not + // Should return MOSQ_ERR_SUCCESS + int ret = publish(NULL, topic, sizeof(message), message, db->getSettings().mqtt_qos, true); + return (ret == MOSQ_ERR_SUCCESS); + } + + void on_publish(int mid){ + if(DEBUG) std::cout << "MqttClient::on_publish(" << mid << ") published " << std::endl; + } + + void on_message(const mosquitto_message* message) { + if (DEBUG) std::cout << "MqttClient::on_message()" << " received message of topic: " << message->topic << " Data: " << reinterpret_cast<char*>(message->payload) << std::endl; + const char payload_size = MAX_PAYLOAD + 1; + char buf[payload_size]; + + if (!strcmp(message->topic, db->getSettings().mqtt_topic_backend.c_str())) { + memset(buf, 0, payload_size * sizeof(char)); + + /* Copy N-1 bytes to ensure always 0 terminated. */ + memcpy(buf, message->payload, MAX_PAYLOAD * sizeof(char)); + + std::string str(buf); + + if (str.find("Request_") != std::string::npos) { + std::cout << "MqttClient::on_message() - Request found" << std::endl; + std::string client = str.substr(str.find("_") + 1); + std::string message = "Client " + client + " requested update"; + snprintf(buf, payload_size, message.c_str()); + publish(NULL, db->getSettings().mqtt_topic_frontend.c_str(), strlen(buf), buf); + } + } + } +}; \ No newline at end of file diff --git a/backend.cpp b/backend.cpp index 310bb33..f3deedb 100644 --- a/backend.cpp +++ b/backend.cpp @@ -1,19 +1,20 @@ #include <iostream> #include <thread> #include <vector> -#include "compatibility.h" +#include <chrono> +#include <ctime> #include "openweathermap.h" -#include "dbSqlite.h" #include "MqttClient.h" +#include "dbSqlite.h" -using namespace std; +#define DEBUG true // Database object dbSqlite* db; // Thread for openweathermap:getWeather void schedulerWeather(int time) { - std::cout << "Wetterstation::schedulerWeather" << std::endl; + if (DEBUG) std::cout << "Wetterstation::schedulerWeather()" << std::endl; while (1) { openweathermap owmw; @@ -24,7 +25,7 @@ void schedulerWeather(int time) { // Thread for openweathermap:getForecast void schedulerForecast(int time) { - std::cout << "Wetterstation::schedulerForecast" << std::endl; + if (DEBUG) std::cout << "Wetterstation::schedulerForecast()" << std::endl; while (1) { openweathermap owmw; @@ -33,9 +34,19 @@ void schedulerForecast(int time) { } } +void mqttStart() { + if (DEBUG) std::cout << "Wetterstation::mqttStart()" << std::endl; + + mosqpp::lib_init(); + + MqttClient* mqttClient = new MqttClient("Wetterstation", db->getSettings().mqtt_host.c_str(), db->getSettings().mqtt_port, db->getSettings().mqtt_user.c_str(), db->getSettings().mqtt_pass.c_str(), db->getSettings().mqtt_topic_backend.c_str()); + + mosqpp::lib_cleanup(); +} + // Main method int main() { - std::cout << "Wetterstation::main" << std::endl; + if (DEBUG) std::cout << "Wetterstation::main()" << std::endl; // Sockets aktivieren WSADATA wsaData; @@ -44,14 +55,19 @@ int main() { printf("WSAStartup failed: %d\n", iResult); return 1; } + + // Initialisieren der Datenbank db = new dbSqlite(); + // Thread für getWeather starten std::thread thrWeather{ schedulerWeather, 60000 }; - csleep(50); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + // Thread für getForecast starten std::thread thrForecast{ schedulerForecast, 900000 }; - csleep(100); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); - //mqttStart(); + // MQTT starten + mqttStart(); std::vector<WeatherConditions> cond = db->getConditions(); diff --git a/compatibility.h b/compatibility.h deleted file mode 100644 index e776f42..0000000 --- a/compatibility.h +++ /dev/null @@ -1,7 +0,0 @@ -void csleep(int time) { -#if defined(WIN32) || defined(WIN64) - Sleep(time); -#else - usleep((long) time); -#endif -} \ No newline at end of file diff --git a/dbSqlite.h b/dbSqlite.h index 19d439c..c319d27 100644 --- a/dbSqlite.h +++ b/dbSqlite.h @@ -11,6 +11,8 @@ using Poco::Data::Statement; using std::string; using std::cout; +#define DEBUG false + struct WeatherConditions { int condition_id; std::string main; @@ -33,6 +35,7 @@ struct Settings { std::string mqtt_topic_backend; int mqtt_qos; std::string mqtt_timeout; + int mqtt_port; }; class dbSqlite { @@ -44,7 +47,7 @@ public: string dbFile; dbSqlite() { - std::cout << "sbSqlite()" << std::endl; + if (DEBUG) std::cout << "sbSqlite()" << std::endl; Poco::Data::SQLite::Connector::registerConnector(); this->dbFile = "weatherstation.db"; this->weatherConditions = this->queryConditions(); @@ -52,7 +55,7 @@ public: }; Settings getSettings() { - std::cout << "dbSqlite::getSettings()" << std::endl; + if (DEBUG) std::cout << "dbSqlite::getSettings()" << std::endl; return this->settings; } @@ -61,7 +64,7 @@ public: } void querySettings() { - std::cout << "dbSqlite::querySettings()" << std::endl; + if (DEBUG) std::cout << "dbSqlite::querySettings()" << std::endl; Session session("SQLite", this->dbFile); Statement select(session); @@ -81,12 +84,13 @@ public: into(this->settings.mqtt_topic_backend), into(this->settings.mqtt_qos), into(this->settings.mqtt_timeout), + into(this->settings.mqtt_port), range(0, 1); // iterate over result set one row at a time select.execute(); } std::vector<WeatherConditions> queryConditions() { - std::cout << "dbSqlite::queryConditions()" << std::endl; + if (DEBUG) std::cout << "dbSqlite::queryConditions()" << std::endl; Session session("SQLite", this->dbFile); std::vector<WeatherConditions> result; WeatherConditions wc; diff --git a/openweathermap.h b/openweathermap.h index 7920267..2b1fd6a 100644 --- a/openweathermap.h +++ b/openweathermap.h @@ -19,6 +19,8 @@ #include "dbSqlite.h" #include "json.h" +#define DEBUG false + struct weatherData { std::string plz; std::string lngCode; @@ -52,12 +54,12 @@ private: dbSqlite* db; public: openweathermap() { - std::cout << "openweathermap::openweathermap()" << std::endl; + if (DEBUG) std::cout << "openweathermap::openweathermap()" << std::endl; db = new dbSqlite(); } std::string str_tolower(std::string s) { - std::cout << "openweathermap::str_tolower()" << std::endl; + if (DEBUG) std::cout << "openweathermap::str_tolower()" << std::endl; std::transform(s.begin(), s.end(), s.begin(), [](unsigned char c) { return std::tolower(c); } ); @@ -65,17 +67,17 @@ public: } weatherData getSWeather() { - std::cout << "openweathermap::getSWeather()" << std::endl; + if (DEBUG) std::cout << "openweathermap::getSWeather()" << std::endl; return this->sWeather; } std::vector<weatherData> getSForecast() { - std::cout << "openweathermap::getSForecast()" << std::endl; + if (DEBUG) std::cout << "openweathermap::getSForecast()" << std::endl; return this->vecForecast; } void setSWeather(json::JSON jObj, std::string plz, std::string lngCode) { - std::cout << "openweathermap::setSWeather(" << plz << ")" << std::endl; + if (DEBUG) std::cout << "openweathermap::setSWeather(" << plz << ")" << std::endl; this->sWeather.plz = plz; this->sWeather.lngCode = lngCode; this->sWeather.visibility = jObj["visibility"].ToInt(); @@ -98,7 +100,7 @@ public: } void setSForecast(json::JSON jObj, std::string plz, std::string lngCode) { - std::cout << "openweathermap::setSForecast(" << plz << ")" << std::endl; + if (DEBUG) std::cout << "openweathermap::setSForecast(" << plz << ")" << std::endl; for (int i = 0; i < jObj["cnt"].ToInt(); i++) { weatherData item; item.plz = plz; @@ -126,7 +128,7 @@ public: } std::string getResponse(std::string url) { - std::cout << "openweathermap::getResponse()" << std::endl; + if (DEBUG) std::cout << "openweathermap::getResponse()" << std::endl; try { // prepare session Poco::URI uri(url); @@ -157,11 +159,11 @@ public: } void getWeather(std::string plz, std::string lngCode) { - std::cout << "openweathermap::getWeather(" << plz << ", " << lngCode << ")" << std::endl; + if (DEBUG) std::cout << "openweathermap::getWeather(" << plz << ", " << lngCode << ")" << std::endl; lngCode = str_tolower(lngCode); this->setSWeather(json::JSON::Load(this->getResponse("http://api.openweathermap.org/data/2.5/weather?zip=" + plz + "," + lngCode + "&appid=" + db->getSettings().owm_appid + "&mode=json&units=metric&lang=de")), plz, lngCode); - std::cout << "owm_appid: " << db->getSettings().owm_appid << std::endl; + if (DEBUG) std::cout << "owm_appid: " << db->getSettings().owm_appid << std::endl; influxdb_cpp::server_info si(db->getSettings().influx_host, db->getSettings().influx_port, db->getSettings().influx_db, db->getSettings().influx_user, db->getSettings().influx_pass); influxdb_cpp::builder() @@ -189,7 +191,7 @@ public: } void getForecast(std::string plz, std::string lngCode) { - std::cout << "openweathermap::getForecast(" << plz << ", " << lngCode << ")" << std::endl; + if (DEBUG) std::cout << "openweathermap::getForecast(" << plz << ", " << lngCode << ")" << std::endl; lngCode = str_tolower(lngCode); this->setSForecast(json::JSON::Load(this->getResponse("http://api.openweathermap.org/data/2.5/forecase?zip=" + plz + "," + lngCode + "&appid=" + db->getSettings().owm_appid + "&mode=json&units=metric&lang=de")), plz, lngCode); } diff --git a/weatherstation.db b/weatherstation.db index a8ae43969964530748227b2ae3656c445b4e2a2a..0480848860acbce2f325143c95034c0a5b63b980 100644 GIT binary patch delta 191 zcmZp8pxE$0ae}m<90LPGI}pPF??fGAVL1jpjZ9vk5Hn9R1AjQ*Q=aDSf(aagJX|gD z%<ST_vP_-)o9FO&F!Ads<QA5c#24fjl_+H9l{7!$Yk$JW2*gZ4%nZaVK+FonY(UJu z{Rtn3QXq>a1HUH7JOTcR0X*V&eZ?C?7zKUB1&x{1)z!7t)oragC7Gt1<Z~qRgG|@U WEGaHY%*oM9Nu9nipTnPtJsJR)r80s5 delta 178 zcmZp8pxE$0ae}m<Gy?-eI}pPF&qN(#VQB_EjZ9vk5Hrst2L5oqr#zFk3np+V@Nl&R zFtdxx$})8nZtmsrU}}EC*Zzc$5r~<9m>Gy!fS47C*?^dR`x8D6r9c)%27X15eo=l= z28PBMMgd=OL1QL$b#+H|bz5sr$?3NF9LZ88$pu#W`niQAC3=}9#U+V3IeICnR)!W9 L#?#m5bNB-Q!ND!I -- GitLab