#ifndef MqttPubSub_h #define MqttPubSub_h #include #include #include #include #define MAX_MESSAGE_SIZE 1024 #define MQTT_ORIGIN_ID "mqtt" template class MqttConnector { protected: StatefulService* _statefulService; AsyncMqttClient* _mqttClient; MqttConnector(StatefulService* statefulService, AsyncMqttClient* mqttClient) : _statefulService(statefulService), _mqttClient(mqttClient) { _mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this)); } virtual void onConnect() = 0; public: inline AsyncMqttClient* getMqttClient() const { return _mqttClient; } }; template class MqttPub : virtual public MqttConnector { public: MqttPub(JsonSerializer jsonSerializer, StatefulService* statefulService, AsyncMqttClient* mqttClient, const String& pubTopic = "") : MqttConnector(statefulService, mqttClient), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) { MqttConnector::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false); } void setPubTopic(const String& pubTopic) { _pubTopic = pubTopic; publish(); } protected: virtual void onConnect() { publish(); } private: JsonSerializer _jsonSerializer; String _pubTopic; void publish() { if (_pubTopic.length() > 0 && MqttConnector::_mqttClient->connected()) { // serialize to json doc DynamicJsonDocument json(MAX_MESSAGE_SIZE); JsonObject jsonObject = json.to(); MqttConnector::_statefulService->read(jsonObject, _jsonSerializer); // serialize to string String payload; serializeJson(json, payload); // publish the payload MqttConnector::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str()); } } }; template class MqttSub : virtual public MqttConnector { public: MqttSub(JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncMqttClient* mqttClient, const String& subTopic = "") : MqttConnector(statefulService, mqttClient), _jsonDeserializer(jsonDeserializer), _subTopic(subTopic) { MqttConnector::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, std::placeholders::_5, std::placeholders::_6)); } void setSubTopic(const String& subTopic) { if (!_subTopic.equals(subTopic)) { // unsubscribe from the existing topic if one was set if (_subTopic.length() > 0) { MqttConnector::_mqttClient->unsubscribe(_subTopic.c_str()); } // set the new topic and re-configure the subscription _subTopic = subTopic; subscribe(); } } protected: virtual void onConnect() { subscribe(); } private: JsonDeserializer _jsonDeserializer; String _subTopic; void subscribe() { if (_subTopic.length() > 0) { MqttConnector::_mqttClient->subscribe(_subTopic.c_str(), 2); } } void onMqttMessage(char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) { // we only care about the topic we are watching in this class if (strcmp(_subTopic.c_str(), topic)) { return; } // deserialize from string DynamicJsonDocument json(MAX_MESSAGE_SIZE); DeserializationError error = deserializeJson(json, payload, len); if (!error && json.is()) { JsonObject jsonObject = json.as(); MqttConnector::_statefulService->update(jsonObject, _jsonDeserializer, MQTT_ORIGIN_ID); } } }; template class MqttPubSub : public MqttPub, public MqttSub { public: MqttPubSub(JsonSerializer jsonSerializer, JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncMqttClient* mqttClient, const String& pubTopic = "", const String& subTopic = "") : MqttConnector(statefulService, mqttClient), MqttPub(jsonSerializer, statefulService, mqttClient, pubTopic), MqttSub(jsonDeserializer, statefulService, mqttClient, subTopic) { } public: void configureTopics(const String& pubTopic, const String& subTopic) { MqttSub::setSubTopic(subTopic); MqttPub::setPubTopic(pubTopic); } protected: void onConnect() { MqttSub::onConnect(); MqttPub::onConnect(); } }; #endif // end MqttPubSub