Fork of the excellent esp8266-react - https://github.com/rjwats/esp8266-react
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

171 lines
5.3 KiB

  1. #ifndef MqttPubSub_h
  2. #define MqttPubSub_h
  3. #include <StatefulService.h>
  4. #include <JsonSerializer.h>
  5. #include <JsonDeserializer.h>
  6. #include <AsyncMqttClient.h>
  7. #define MQTT_ORIGIN_ID "mqtt"
  8. template <class T>
  9. class MqttConnector {
  10. protected:
  11. StatefulService<T>* _statefulService;
  12. AsyncMqttClient* _mqttClient;
  13. size_t _bufferSize;
  14. MqttConnector(StatefulService<T>* statefulService, AsyncMqttClient* mqttClient, size_t bufferSize) :
  15. _statefulService(statefulService), _mqttClient(mqttClient), _bufferSize(bufferSize) {
  16. _mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this));
  17. }
  18. virtual void onConnect() = 0;
  19. public:
  20. inline AsyncMqttClient* getMqttClient() const {
  21. return _mqttClient;
  22. }
  23. };
  24. template <class T>
  25. class MqttPub : virtual public MqttConnector<T> {
  26. public:
  27. MqttPub(JsonSerializer<T> jsonSerializer,
  28. StatefulService<T>* statefulService,
  29. AsyncMqttClient* mqttClient,
  30. const String& pubTopic = "",
  31. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  32. MqttConnector<T>(statefulService, mqttClient, bufferSize), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) {
  33. MqttConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false);
  34. }
  35. void setPubTopic(const String& pubTopic) {
  36. _pubTopic = pubTopic;
  37. publish();
  38. }
  39. protected:
  40. virtual void onConnect() {
  41. publish();
  42. }
  43. private:
  44. JsonSerializer<T> _jsonSerializer;
  45. String _pubTopic;
  46. void publish() {
  47. if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
  48. // serialize to json doc
  49. DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
  50. JsonObject jsonObject = json.to<JsonObject>();
  51. MqttConnector<T>::_statefulService->read(jsonObject, _jsonSerializer);
  52. // serialize to string
  53. String payload;
  54. serializeJson(json, payload);
  55. // publish the payload
  56. MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str());
  57. }
  58. }
  59. };
  60. template <class T>
  61. class MqttSub : virtual public MqttConnector<T> {
  62. public:
  63. MqttSub(JsonDeserializer<T> jsonDeserializer,
  64. StatefulService<T>* statefulService,
  65. AsyncMqttClient* mqttClient,
  66. const String& subTopic = "",
  67. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  68. MqttConnector<T>(statefulService, mqttClient, bufferSize),
  69. _jsonDeserializer(jsonDeserializer),
  70. _subTopic(subTopic) {
  71. MqttConnector<T>::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage,
  72. this,
  73. std::placeholders::_1,
  74. std::placeholders::_2,
  75. std::placeholders::_3,
  76. std::placeholders::_4,
  77. std::placeholders::_5,
  78. std::placeholders::_6));
  79. }
  80. void setSubTopic(const String& subTopic) {
  81. if (!_subTopic.equals(subTopic)) {
  82. // unsubscribe from the existing topic if one was set
  83. if (_subTopic.length() > 0) {
  84. MqttConnector<T>::_mqttClient->unsubscribe(_subTopic.c_str());
  85. }
  86. // set the new topic and re-configure the subscription
  87. _subTopic = subTopic;
  88. subscribe();
  89. }
  90. }
  91. protected:
  92. virtual void onConnect() {
  93. subscribe();
  94. }
  95. private:
  96. JsonDeserializer<T> _jsonDeserializer;
  97. String _subTopic;
  98. void subscribe() {
  99. if (_subTopic.length() > 0) {
  100. MqttConnector<T>::_mqttClient->subscribe(_subTopic.c_str(), 2);
  101. }
  102. }
  103. void onMqttMessage(char* topic,
  104. char* payload,
  105. AsyncMqttClientMessageProperties properties,
  106. size_t len,
  107. size_t index,
  108. size_t total) {
  109. // we only care about the topic we are watching in this class
  110. if (strcmp(_subTopic.c_str(), topic)) {
  111. return;
  112. }
  113. // deserialize from string
  114. DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
  115. DeserializationError error = deserializeJson(json, payload, len);
  116. if (!error && json.is<JsonObject>()) {
  117. JsonObject jsonObject = json.as<JsonObject>();
  118. MqttConnector<T>::_statefulService->update(jsonObject, _jsonDeserializer, MQTT_ORIGIN_ID);
  119. }
  120. }
  121. };
  122. template <class T>
  123. class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
  124. public:
  125. MqttPubSub(JsonSerializer<T> jsonSerializer,
  126. JsonDeserializer<T> jsonDeserializer,
  127. StatefulService<T>* statefulService,
  128. AsyncMqttClient* mqttClient,
  129. const String& pubTopic = "",
  130. const String& subTopic = "",
  131. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  132. MqttConnector<T>(statefulService, mqttClient, bufferSize),
  133. MqttPub<T>(jsonSerializer, statefulService, mqttClient, pubTopic, bufferSize),
  134. MqttSub<T>(jsonDeserializer, statefulService, mqttClient, subTopic, bufferSize) {
  135. }
  136. public:
  137. void configureTopics(const String& pubTopic, const String& subTopic) {
  138. MqttSub<T>::setSubTopic(subTopic);
  139. MqttPub<T>::setPubTopic(pubTopic);
  140. }
  141. protected:
  142. void onConnect() {
  143. MqttSub<T>::onConnect();
  144. MqttPub<T>::onConnect();
  145. }
  146. };
  147. #endif // end MqttPubSub