Fork of the excellent esp8266-react - https://github.com/rjwats/esp8266-react
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

166 lines
5.0 KiB

  1. #ifndef MqttPubSub_h
  2. #define MqttPubSub_h
  3. #include <StatefulService.h>
  4. #include <JsonSerializer.h>
  5. #include <JsonDeserializer.h>
  6. #include <AsyncMqttClient.h>
  7. #define MAX_MESSAGE_SIZE 1024
  8. #define MQTT_ORIGIN_ID "mqtt"
  9. template <class T>
  10. class MqttConnector {
  11. protected:
  12. StatefulService<T>* _statefulService;
  13. AsyncMqttClient* _mqttClient;
  14. MqttConnector(StatefulService<T>* statefulService, AsyncMqttClient* mqttClient) :
  15. _statefulService(statefulService), _mqttClient(mqttClient) {
  16. _mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this));
  17. }
  18. virtual void onConnect() = 0;
  19. public:
  20. inline AsyncMqttClient* getMqttClient() const {
  21. return _mqttClient;
  22. }
  23. };
  24. template <class T>
  25. class MqttPub : virtual public MqttConnector<T> {
  26. public:
  27. MqttPub(JsonSerializer<T> jsonSerializer,
  28. StatefulService<T>* statefulService,
  29. AsyncMqttClient* mqttClient,
  30. String pubTopic = "") :
  31. MqttConnector<T>(statefulService, mqttClient), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) {
  32. MqttConnector<T>::_statefulService->addUpdateHandler([&](String originId) { publish(); }, false);
  33. }
  34. void setPubTopic(String pubTopic) {
  35. _pubTopic = pubTopic;
  36. publish();
  37. }
  38. protected:
  39. virtual void onConnect() {
  40. publish();
  41. }
  42. private:
  43. JsonSerializer<T> _jsonSerializer;
  44. String _pubTopic;
  45. void publish() {
  46. if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
  47. // serialize to json doc
  48. DynamicJsonDocument json(MAX_MESSAGE_SIZE);
  49. JsonObject jsonObject = json.to<JsonObject>();
  50. MqttConnector<T>::_statefulService->read(jsonObject, _jsonSerializer);
  51. // serialize to string
  52. String payload;
  53. serializeJson(json, payload);
  54. // publish the payload
  55. MqttConnector<T>::_mqttClient->publish(_pubTopic.c_str(), 0, false, payload.c_str());
  56. }
  57. }
  58. };
  59. template <class T>
  60. class MqttSub : virtual public MqttConnector<T> {
  61. public:
  62. MqttSub(JsonDeserializer<T> jsonDeserializer,
  63. StatefulService<T>* statefulService,
  64. AsyncMqttClient* mqttClient,
  65. String subTopic = "") :
  66. MqttConnector<T>(statefulService, mqttClient), _jsonDeserializer(jsonDeserializer), _subTopic(subTopic) {
  67. MqttConnector<T>::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage,
  68. this,
  69. std::placeholders::_1,
  70. std::placeholders::_2,
  71. std::placeholders::_3,
  72. std::placeholders::_4,
  73. std::placeholders::_5,
  74. std::placeholders::_6));
  75. }
  76. void setSubTopic(String subTopic) {
  77. if (!_subTopic.equals(subTopic)) {
  78. // unsubscribe from the existing topic if one was set
  79. if (_subTopic.length() > 0) {
  80. MqttConnector<T>::_mqttClient->unsubscribe(_subTopic.c_str());
  81. }
  82. // set the new topic and re-configure the subscription
  83. _subTopic = subTopic;
  84. subscribe();
  85. }
  86. }
  87. protected:
  88. virtual void onConnect() {
  89. subscribe();
  90. }
  91. private:
  92. JsonDeserializer<T> _jsonDeserializer;
  93. String _subTopic;
  94. void subscribe() {
  95. if (_subTopic.length() > 0) {
  96. MqttConnector<T>::_mqttClient->subscribe(_subTopic.c_str(), 2);
  97. }
  98. }
  99. void onMqttMessage(char* topic,
  100. char* payload,
  101. AsyncMqttClientMessageProperties properties,
  102. size_t len,
  103. size_t index,
  104. size_t total) {
  105. // we only care about the topic we are watching in this class
  106. if (strcmp(_subTopic.c_str(), topic)) {
  107. return;
  108. }
  109. // deserialize from string
  110. DynamicJsonDocument json(MAX_MESSAGE_SIZE);
  111. DeserializationError error = deserializeJson(json, payload, len);
  112. if (!error && json.is<JsonObject>()) {
  113. JsonObject jsonObject = json.as<JsonObject>();
  114. MqttConnector<T>::_statefulService->update(jsonObject, _jsonDeserializer, MQTT_ORIGIN_ID);
  115. }
  116. }
  117. };
  118. template <class T>
  119. class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
  120. public:
  121. MqttPubSub(JsonSerializer<T> jsonSerializer,
  122. JsonDeserializer<T> jsonDeserializer,
  123. StatefulService<T>* statefulService,
  124. AsyncMqttClient* mqttClient,
  125. String pubTopic = "",
  126. String subTopic = "") :
  127. MqttConnector<T>(statefulService, mqttClient),
  128. MqttPub<T>(jsonSerializer, statefulService, mqttClient, pubTopic),
  129. MqttSub<T>(jsonDeserializer, statefulService, mqttClient, subTopic) {
  130. }
  131. public:
  132. void configureTopics(String pubTopic, String subTopic) {
  133. MqttSub<T>::setSubTopic(subTopic);
  134. MqttPub<T>::setPubTopic(pubTopic);
  135. }
  136. protected:
  137. void onConnect() {
  138. MqttSub<T>::onConnect();
  139. MqttPub<T>::onConnect();
  140. }
  141. };
  142. #endif // end MqttPubSub