Skip to content
Snippets Groups Projects
Commit 4f5e76a1 authored by Dennis Eisold's avatar Dennis Eisold
Browse files

MQTT working and debug added

parent 38bc0369
No related branches found
No related tags found
No related merge requests found
#pragma once
#include <mosquittopp.h>
#include <iostream>
#include <cstring>
#include <cstdio>
#include <mosquittopp.h>
#include "dbSqlite.h"
#define MAX_PAYLOAD 50
#define DEFAULT_KEEP_ALIVE 60
#define PUBLISH_TOPIC "EXAMPLE_TOPIC"
#define DEBUG true
class MqttClient : public mosqpp::mosquittopp {
private:
const char* host;
const char* id;
const char* user;
const char* pass;
int port;
int keepalive;
dbSqlite* db;
public:
MqttClient(const char* id, const char* host, int port) : mosquittopp(id) {
int keepalive = DEFAULT_KEEP_ALIVE;
connect(host, port, keepalive);
}
~MqttClient() {}
void on_connect(int rc) {
if (!rc) std::cout << "Connected - code " << rc << std::endl;
}
void on_subscribe(int mid, int qos_count, const int* granted_qos){
std::cout << "Subscription succeeded." << std::endl;
}
void on_message(const struct mosquitto_message* message) {
int payload_size = MAX_PAYLOAD + 1;
char buf[payload_size];
if (!strcmp(message->topic, PUBLISH_TOPIC)) {
memset(buf, 0, payload_size * sizeof(char));
/* Copy N-1 bytes to ensure always 0 terminated. */
memcpy(buf, message->payload, MAX_PAYLOAD * sizeof(char));
std::cout << buf << std::endl;
// Examples of messages for M2M communications...
if (!strcmp(buf, "STATUS")) {
snprintf(buf, payload_size, "This is a Status Message...");
publish(NULL, PUBLISH_TOPIC, strlen(buf), buf);
std::cout << "Status Request Recieved." << std::endl;
}
if (!strcmp(buf, "ON")) {
snprintf(buf, payload_size, "Turning on...");
publish(NULL, PUBLISH_TOPIC, strlen(buf), buf);
std::cout << "Request to turn on." << std::endl;
}
if (!strcmp(buf, "OFF")) {
snprintf(buf, payload_size, "Turning off...");
publish(NULL, PUBLISH_TOPIC, strlen(buf), buf);
std::cout << "Request to turn off." << std::endl;
}
}
}
}
\ No newline at end of file
MqttClient(const char* id, const char* host, int port, const char* user, const char* pass, const char* topic) : mosquittopp(id) {
if(DEBUG) std::cout << "MqttClient::MqttClient()" << std::endl;
db = new dbSqlite();
this->keepalive = 60;
this->id = id;
this->port = db->getSettings().mqtt_port;
this->host = db->getSettings().mqtt_host.c_str();
this->user = db->getSettings().mqtt_user.c_str();
this->pass = db->getSettings().mqtt_pass.c_str();
// Benutzername und Passwort setzen
username_pw_set(user, pass);
// Asynchrone Verbindung aufbauen
connect(host, port, keepalive);
// Subscribe zu Backend Topic
subscribe(NULL, db->getSettings().mqtt_topic_backend.c_str());
// Startet den Thread
loop_start();
}
~MqttClient() {
// Stoppt den Thread
loop_stop();
// Mosuqitto library cleanup
mosqpp::lib_cleanup();
}
void on_connect(int rc) {
if (DEBUG) std::cout << "MqttClient::on_connect()" << std::endl;
if (!rc) std::cout << "Connected - code " << rc << std::endl;
}
void on_subscribe(int mid, int qos_count, const int* granted_qos) {
if (DEBUG) std::cout << "MqttClient::on_subscribe()" << std::endl;
std::cout << "Subscription succeeded." << std::endl;
}
bool send_message(const char* message, const char* topic) {
// Send message - depending on QoS, mosquitto lib managed re-submission this the thread
//
// * NULL : Message Id (int *) this allow to latter get status of each message
// * topic : topic to be used
// * lenght of the message
// * message
// * qos (0,1,2)
// * retain (boolean) - indicates if message is retained on broker or not
// Should return MOSQ_ERR_SUCCESS
int ret = publish(NULL, topic, sizeof(message), message, db->getSettings().mqtt_qos, true);
return (ret == MOSQ_ERR_SUCCESS);
}
void on_publish(int mid){
if(DEBUG) std::cout << "MqttClient::on_publish(" << mid << ") published " << std::endl;
}
void on_message(const mosquitto_message* message) {
if (DEBUG) std::cout << "MqttClient::on_message()" << " received message of topic: " << message->topic << " Data: " << reinterpret_cast<char*>(message->payload) << std::endl;
const char payload_size = MAX_PAYLOAD + 1;
char buf[payload_size];
if (!strcmp(message->topic, db->getSettings().mqtt_topic_backend.c_str())) {
memset(buf, 0, payload_size * sizeof(char));
/* Copy N-1 bytes to ensure always 0 terminated. */
memcpy(buf, message->payload, MAX_PAYLOAD * sizeof(char));
std::string str(buf);
if (str.find("Request_") != std::string::npos) {
std::cout << "MqttClient::on_message() - Request found" << std::endl;
std::string client = str.substr(str.find("_") + 1);
std::string message = "Client " + client + " requested update";
snprintf(buf, payload_size, message.c_str());
publish(NULL, db->getSettings().mqtt_topic_frontend.c_str(), strlen(buf), buf);
}
}
}
};
\ No newline at end of file
#include <iostream>
#include <thread>
#include <vector>
#include "compatibility.h"
#include <chrono>
#include <ctime>
#include "openweathermap.h"
#include "dbSqlite.h"
#include "MqttClient.h"
#include "dbSqlite.h"
using namespace std;
#define DEBUG true
// Database object
dbSqlite* db;
// Thread for openweathermap:getWeather
void schedulerWeather(int time) {
std::cout << "Wetterstation::schedulerWeather" << std::endl;
if (DEBUG) std::cout << "Wetterstation::schedulerWeather()" << std::endl;
while (1) {
openweathermap owmw;
......@@ -24,7 +25,7 @@ void schedulerWeather(int time) {
// Thread for openweathermap:getForecast
void schedulerForecast(int time) {
std::cout << "Wetterstation::schedulerForecast" << std::endl;
if (DEBUG) std::cout << "Wetterstation::schedulerForecast()" << std::endl;
while (1) {
openweathermap owmw;
......@@ -33,9 +34,19 @@ void schedulerForecast(int time) {
}
}
void mqttStart() {
if (DEBUG) std::cout << "Wetterstation::mqttStart()" << std::endl;
mosqpp::lib_init();
MqttClient* mqttClient = new MqttClient("Wetterstation", db->getSettings().mqtt_host.c_str(), db->getSettings().mqtt_port, db->getSettings().mqtt_user.c_str(), db->getSettings().mqtt_pass.c_str(), db->getSettings().mqtt_topic_backend.c_str());
mosqpp::lib_cleanup();
}
// Main method
int main() {
std::cout << "Wetterstation::main" << std::endl;
if (DEBUG) std::cout << "Wetterstation::main()" << std::endl;
// Sockets aktivieren
WSADATA wsaData;
......@@ -44,14 +55,19 @@ int main() {
printf("WSAStartup failed: %d\n", iResult);
return 1;
}
// Initialisieren der Datenbank
db = new dbSqlite();
// Thread fr getWeather starten
std::thread thrWeather{ schedulerWeather, 60000 };
csleep(50);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
// Thread fr getForecast starten
std::thread thrForecast{ schedulerForecast, 900000 };
csleep(100);
std::this_thread::sleep_for(std::chrono::milliseconds(50));
//mqttStart();
// MQTT starten
mqttStart();
std::vector<WeatherConditions> cond = db->getConditions();
......
void csleep(int time) {
#if defined(WIN32) || defined(WIN64)
Sleep(time);
#else
usleep((long) time);
#endif
}
\ No newline at end of file
......@@ -11,6 +11,8 @@ using Poco::Data::Statement;
using std::string;
using std::cout;
#define DEBUG false
struct WeatherConditions {
int condition_id;
std::string main;
......@@ -33,6 +35,7 @@ struct Settings {
std::string mqtt_topic_backend;
int mqtt_qos;
std::string mqtt_timeout;
int mqtt_port;
};
class dbSqlite {
......@@ -44,7 +47,7 @@ public:
string dbFile;
dbSqlite() {
std::cout << "sbSqlite()" << std::endl;
if (DEBUG) std::cout << "sbSqlite()" << std::endl;
Poco::Data::SQLite::Connector::registerConnector();
this->dbFile = "weatherstation.db";
this->weatherConditions = this->queryConditions();
......@@ -52,7 +55,7 @@ public:
};
Settings getSettings() {
std::cout << "dbSqlite::getSettings()" << std::endl;
if (DEBUG) std::cout << "dbSqlite::getSettings()" << std::endl;
return this->settings;
}
......@@ -61,7 +64,7 @@ public:
}
void querySettings() {
std::cout << "dbSqlite::querySettings()" << std::endl;
if (DEBUG) std::cout << "dbSqlite::querySettings()" << std::endl;
Session session("SQLite", this->dbFile);
Statement select(session);
......@@ -81,12 +84,13 @@ public:
into(this->settings.mqtt_topic_backend),
into(this->settings.mqtt_qos),
into(this->settings.mqtt_timeout),
into(this->settings.mqtt_port),
range(0, 1); // iterate over result set one row at a time
select.execute();
}
std::vector<WeatherConditions> queryConditions() {
std::cout << "dbSqlite::queryConditions()" << std::endl;
if (DEBUG) std::cout << "dbSqlite::queryConditions()" << std::endl;
Session session("SQLite", this->dbFile);
std::vector<WeatherConditions> result;
WeatherConditions wc;
......
......@@ -19,6 +19,8 @@
#include "dbSqlite.h"
#include "json.h"
#define DEBUG false
struct weatherData {
std::string plz;
std::string lngCode;
......@@ -52,12 +54,12 @@ private:
dbSqlite* db;
public:
openweathermap() {
std::cout << "openweathermap::openweathermap()" << std::endl;
if (DEBUG) std::cout << "openweathermap::openweathermap()" << std::endl;
db = new dbSqlite();
}
std::string str_tolower(std::string s) {
std::cout << "openweathermap::str_tolower()" << std::endl;
if (DEBUG) std::cout << "openweathermap::str_tolower()" << std::endl;
std::transform(s.begin(), s.end(), s.begin(),
[](unsigned char c) { return std::tolower(c); }
);
......@@ -65,17 +67,17 @@ public:
}
weatherData getSWeather() {
std::cout << "openweathermap::getSWeather()" << std::endl;
if (DEBUG) std::cout << "openweathermap::getSWeather()" << std::endl;
return this->sWeather;
}
std::vector<weatherData> getSForecast() {
std::cout << "openweathermap::getSForecast()" << std::endl;
if (DEBUG) std::cout << "openweathermap::getSForecast()" << std::endl;
return this->vecForecast;
}
void setSWeather(json::JSON jObj, std::string plz, std::string lngCode) {
std::cout << "openweathermap::setSWeather(" << plz << ")" << std::endl;
if (DEBUG) std::cout << "openweathermap::setSWeather(" << plz << ")" << std::endl;
this->sWeather.plz = plz;
this->sWeather.lngCode = lngCode;
this->sWeather.visibility = jObj["visibility"].ToInt();
......@@ -98,7 +100,7 @@ public:
}
void setSForecast(json::JSON jObj, std::string plz, std::string lngCode) {
std::cout << "openweathermap::setSForecast(" << plz << ")" << std::endl;
if (DEBUG) std::cout << "openweathermap::setSForecast(" << plz << ")" << std::endl;
for (int i = 0; i < jObj["cnt"].ToInt(); i++) {
weatherData item;
item.plz = plz;
......@@ -126,7 +128,7 @@ public:
}
std::string getResponse(std::string url) {
std::cout << "openweathermap::getResponse()" << std::endl;
if (DEBUG) std::cout << "openweathermap::getResponse()" << std::endl;
try {
// prepare session
Poco::URI uri(url);
......@@ -157,11 +159,11 @@ public:
}
void getWeather(std::string plz, std::string lngCode) {
std::cout << "openweathermap::getWeather(" << plz << ", " << lngCode << ")" << std::endl;
if (DEBUG) std::cout << "openweathermap::getWeather(" << plz << ", " << lngCode << ")" << std::endl;
lngCode = str_tolower(lngCode);
this->setSWeather(json::JSON::Load(this->getResponse("http://api.openweathermap.org/data/2.5/weather?zip=" + plz + "," + lngCode + "&appid=" + db->getSettings().owm_appid + "&mode=json&units=metric&lang=de")), plz, lngCode);
std::cout << "owm_appid: " << db->getSettings().owm_appid << std::endl;
if (DEBUG) std::cout << "owm_appid: " << db->getSettings().owm_appid << std::endl;
influxdb_cpp::server_info si(db->getSettings().influx_host, db->getSettings().influx_port, db->getSettings().influx_db, db->getSettings().influx_user, db->getSettings().influx_pass);
influxdb_cpp::builder()
......@@ -189,7 +191,7 @@ public:
}
void getForecast(std::string plz, std::string lngCode) {
std::cout << "openweathermap::getForecast(" << plz << ", " << lngCode << ")" << std::endl;
if (DEBUG) std::cout << "openweathermap::getForecast(" << plz << ", " << lngCode << ")" << std::endl;
lngCode = str_tolower(lngCode);
this->setSForecast(json::JSON::Load(this->getResponse("http://api.openweathermap.org/data/2.5/forecase?zip=" + plz + "," + lngCode + "&appid=" + db->getSettings().owm_appid + "&mode=json&units=metric&lang=de")), plz, lngCode);
}
......
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment