Fork of the excellent esp8266-react - https://github.com/rjwats/esp8266-react
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

167 lines
5.2 KiB

  1. #ifndef MqttPubSub_h
  2. #define MqttPubSub_h
  3. #include <StatefulService.h>
  4. #include <AsyncMqttClient.h>
  5. #define MQTT_ORIGIN_ID "mqtt"
  6. template <class T>
  7. class MqttConnector {
  8. protected:
  9. StatefulService<T>* _statefulService;
  10. AsyncMqttClient* _mqttClient;
  11. size_t _bufferSize;
  12. MqttConnector(StatefulService<T>* statefulService, AsyncMqttClient* mqttClient, size_t bufferSize) :
  13. _statefulService(statefulService), _mqttClient(mqttClient), _bufferSize(bufferSize) {
  14. _mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this));
  15. }
  16. virtual void onConnect() = 0;
  17. public:
  18. inline AsyncMqttClient* getMqttClient() const {
  19. return _mqttClient;
  20. }
  21. };
  22. template <class T>
  23. class MqttPub : virtual public MqttConnector<T> {
  24. public:
  25. MqttPub(JsonStateReader<T> stateReader,
  26. StatefulService<T>* statefulService,
  27. AsyncMqttClient* mqttClient,
  28. const String& pubTopic = "",
  29. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  30. MqttConnector<T>(statefulService, mqttClient, bufferSize), _stateReader(stateReader), _pubTopic(pubTopic) {
  31. MqttConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false);
  32. }
  33. void setPubTopic(const String& pubTopic) {
  34. _pubTopic = pubTopic;
  35. publish();
  36. }
  37. protected:
  38. virtual void onConnect() {
  39. publish();
  40. }
  41. private:
  42. JsonStateReader<T> _stateReader;
  43. String _pubTopic;
  44. void publish() {
  45. if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
  46. // serialize to json doc
  47. DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
  48. JsonObject jsonObject = json.to<JsonObject>();
  49. MqttConnector<T>::_statefulService->read(jsonObject, _stateReader);
  50. // serialize to string
  51. String payload;
  52. serializeJson(json, payload);
  53. // publish the payload
  54. MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str());
  55. }
  56. }
  57. };
  58. template <class T>
  59. class MqttSub : virtual public MqttConnector<T> {
  60. public:
  61. MqttSub(JsonStateUpdater<T> stateUpdater,
  62. StatefulService<T>* statefulService,
  63. AsyncMqttClient* mqttClient,
  64. const String& subTopic = "",
  65. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  66. MqttConnector<T>(statefulService, mqttClient, bufferSize), _stateUpdater(stateUpdater), _subTopic(subTopic) {
  67. MqttConnector<T>::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage,
  68. this,
  69. std::placeholders::_1,
  70. std::placeholders::_2,
  71. std::placeholders::_3,
  72. std::placeholders::_4,
  73. std::placeholders::_5,
  74. std::placeholders::_6));
  75. }
  76. void setSubTopic(const String& subTopic) {
  77. if (!_subTopic.equals(subTopic)) {
  78. // unsubscribe from the existing topic if one was set
  79. if (_subTopic.length() > 0) {
  80. MqttConnector<T>::_mqttClient->unsubscribe(_subTopic.c_str());
  81. }
  82. // set the new topic and re-configure the subscription
  83. _subTopic = subTopic;
  84. subscribe();
  85. }
  86. }
  87. protected:
  88. virtual void onConnect() {
  89. subscribe();
  90. }
  91. private:
  92. JsonStateUpdater<T> _stateUpdater;
  93. String _subTopic;
  94. void subscribe() {
  95. if (_subTopic.length() > 0) {
  96. MqttConnector<T>::_mqttClient->subscribe(_subTopic.c_str(), 2);
  97. }
  98. }
  99. void onMqttMessage(char* topic,
  100. char* payload,
  101. AsyncMqttClientMessageProperties properties,
  102. size_t len,
  103. size_t index,
  104. size_t total) {
  105. // we only care about the topic we are watching in this class
  106. if (strcmp(_subTopic.c_str(), topic)) {
  107. return;
  108. }
  109. // deserialize from string
  110. DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
  111. DeserializationError error = deserializeJson(json, payload, len);
  112. if (!error && json.is<JsonObject>()) {
  113. JsonObject jsonObject = json.as<JsonObject>();
  114. MqttConnector<T>::_statefulService->update(jsonObject, _stateUpdater, MQTT_ORIGIN_ID);
  115. }
  116. }
  117. };
  118. template <class T>
  119. class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
  120. public:
  121. MqttPubSub(JsonStateReader<T> stateReader,
  122. JsonStateUpdater<T> stateUpdater,
  123. StatefulService<T>* statefulService,
  124. AsyncMqttClient* mqttClient,
  125. const String& pubTopic = "",
  126. const String& subTopic = "",
  127. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  128. MqttConnector<T>(statefulService, mqttClient, bufferSize),
  129. MqttPub<T>(stateReader, statefulService, mqttClient, pubTopic, bufferSize),
  130. MqttSub<T>(stateUpdater, statefulService, mqttClient, subTopic, bufferSize) {
  131. }
  132. public:
  133. void configureTopics(const String& pubTopic, const String& subTopic) {
  134. MqttSub<T>::setSubTopic(subTopic);
  135. MqttPub<T>::setPubTopic(pubTopic);
  136. }
  137. protected:
  138. void onConnect() {
  139. MqttSub<T>::onConnect();
  140. MqttPub<T>::onConnect();
  141. }
  142. };
  143. #endif // end MqttPubSub