Fork of the excellent esp8266-react - https://github.com/rjwats/esp8266-react
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

274 lines
11 KiB

  1. #ifndef WebSocketTxRx_h
  2. #define WebSocketTxRx_h
  3. #include <StatefulService.h>
  4. #include <JsonSerializer.h>
  5. #include <JsonDeserializer.h>
  6. #include <ESPAsyncWebServer.h>
  7. #define WEB_SOCKET_CLIENT_ID_MSG_SIZE 128
  8. #define WEB_SOCKET_ORIGIN "websocket"
  9. #define WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX "websocket:"
  10. template <class T>
  11. class WebSocketConnector {
  12. protected:
  13. StatefulService<T>* _statefulService;
  14. AsyncWebServer* _server;
  15. AsyncWebSocket _webSocket;
  16. size_t _bufferSize;
  17. WebSocketConnector(StatefulService<T>* statefulService,
  18. AsyncWebServer* server,
  19. char const* webSocketPath,
  20. SecurityManager* securityManager,
  21. AuthenticationPredicate authenticationPredicate,
  22. size_t bufferSize) :
  23. _statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) {
  24. _webSocket.setFilter(securityManager->filterRequest(authenticationPredicate));
  25. _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent,
  26. this,
  27. std::placeholders::_1,
  28. std::placeholders::_2,
  29. std::placeholders::_3,
  30. std::placeholders::_4,
  31. std::placeholders::_5,
  32. std::placeholders::_6));
  33. _server->addHandler(&_webSocket);
  34. _server->on(webSocketPath, HTTP_GET, std::bind(&WebSocketConnector::forbidden, this, std::placeholders::_1));
  35. }
  36. WebSocketConnector(StatefulService<T>* statefulService,
  37. AsyncWebServer* server,
  38. char const* webSocketPath,
  39. size_t bufferSize) :
  40. _statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) {
  41. _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent,
  42. this,
  43. std::placeholders::_1,
  44. std::placeholders::_2,
  45. std::placeholders::_3,
  46. std::placeholders::_4,
  47. std::placeholders::_5,
  48. std::placeholders::_6));
  49. _server->addHandler(&_webSocket);
  50. }
  51. virtual void onWSEvent(AsyncWebSocket* server,
  52. AsyncWebSocketClient* client,
  53. AwsEventType type,
  54. void* arg,
  55. uint8_t* data,
  56. size_t len) = 0;
  57. String clientId(AsyncWebSocketClient* client) {
  58. return WEB_SOCKET_ORIGIN_CLIENT_ID_PREFIX + String(client->id());
  59. }
  60. private:
  61. void forbidden(AsyncWebServerRequest* request) {
  62. request->send(403);
  63. }
  64. };
  65. template <class T>
  66. class WebSocketTx : virtual public WebSocketConnector<T> {
  67. public:
  68. WebSocketTx(JsonSerializer<T> jsonSerializer,
  69. StatefulService<T>* statefulService,
  70. AsyncWebServer* server,
  71. char const* webSocketPath,
  72. SecurityManager* securityManager,
  73. AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
  74. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  75. WebSocketConnector<T>(statefulService,
  76. server,
  77. webSocketPath,
  78. securityManager,
  79. authenticationPredicate,
  80. bufferSize),
  81. _jsonSerializer(jsonSerializer) {
  82. WebSocketConnector<T>::_statefulService->addUpdateHandler(
  83. [&](const String& originId) { transmitData(nullptr, originId); }, false);
  84. }
  85. WebSocketTx(JsonSerializer<T> jsonSerializer,
  86. StatefulService<T>* statefulService,
  87. AsyncWebServer* server,
  88. char const* webSocketPath,
  89. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  90. WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize), _jsonSerializer(jsonSerializer) {
  91. WebSocketConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { transmitData(nullptr, originId); },
  92. false);
  93. }
  94. protected:
  95. virtual void onWSEvent(AsyncWebSocket* server,
  96. AsyncWebSocketClient* client,
  97. AwsEventType type,
  98. void* arg,
  99. uint8_t* data,
  100. size_t len) {
  101. if (type == WS_EVT_CONNECT) {
  102. // when a client connects, we transmit it's id and the current payload
  103. transmitId(client);
  104. transmitData(client, WEB_SOCKET_ORIGIN);
  105. }
  106. }
  107. private:
  108. JsonSerializer<T> _jsonSerializer;
  109. void transmitId(AsyncWebSocketClient* client) {
  110. DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_CLIENT_ID_MSG_SIZE);
  111. JsonObject root = jsonDocument.to<JsonObject>();
  112. root["type"] = "id";
  113. root["id"] = WebSocketConnector<T>::clientId(client);
  114. size_t len = measureJson(jsonDocument);
  115. AsyncWebSocketMessageBuffer* buffer = WebSocketConnector<T>::_webSocket.makeBuffer(len);
  116. if (buffer) {
  117. serializeJson(jsonDocument, (char*)buffer->get(), len + 1);
  118. client->text(buffer);
  119. }
  120. }
  121. /**
  122. * Broadcasts the payload to the destination, if provided. Otherwise broadcasts to all clients except the origin, if
  123. * specified.
  124. *
  125. * Original implementation sent clients their own IDs so they could ignore updates they initiated. This approach
  126. * simplifies the client and the server implementation but may not be sufficent for all use-cases.
  127. */
  128. void transmitData(AsyncWebSocketClient* client, const String& originId) {
  129. DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector<T>::_bufferSize);
  130. JsonObject root = jsonDocument.to<JsonObject>();
  131. root["type"] = "payload";
  132. root["origin_id"] = originId;
  133. JsonObject payload = root.createNestedObject("payload");
  134. WebSocketConnector<T>::_statefulService->read(payload, _jsonSerializer);
  135. size_t len = measureJson(jsonDocument);
  136. AsyncWebSocketMessageBuffer* buffer = WebSocketConnector<T>::_webSocket.makeBuffer(len);
  137. if (buffer) {
  138. serializeJson(jsonDocument, (char*)buffer->get(), len + 1);
  139. if (client) {
  140. client->text(buffer);
  141. } else {
  142. WebSocketConnector<T>::_webSocket.textAll(buffer);
  143. }
  144. }
  145. }
  146. };
  147. template <class T>
  148. class WebSocketRx : virtual public WebSocketConnector<T> {
  149. public:
  150. WebSocketRx(JsonDeserializer<T> jsonDeserializer,
  151. StatefulService<T>* statefulService,
  152. AsyncWebServer* server,
  153. char const* webSocketPath,
  154. SecurityManager* securityManager,
  155. AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
  156. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  157. WebSocketConnector<T>(statefulService,
  158. server,
  159. webSocketPath,
  160. securityManager,
  161. authenticationPredicate,
  162. bufferSize),
  163. _jsonDeserializer(jsonDeserializer) {
  164. }
  165. WebSocketRx(JsonDeserializer<T> jsonDeserializer,
  166. StatefulService<T>* statefulService,
  167. AsyncWebServer* server,
  168. char const* webSocketPath,
  169. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  170. WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize), _jsonDeserializer(jsonDeserializer) {
  171. }
  172. protected:
  173. virtual void onWSEvent(AsyncWebSocket* server,
  174. AsyncWebSocketClient* client,
  175. AwsEventType type,
  176. void* arg,
  177. uint8_t* data,
  178. size_t len) {
  179. if (type == WS_EVT_DATA) {
  180. AwsFrameInfo* info = (AwsFrameInfo*)arg;
  181. if (info->final && info->index == 0 && info->len == len) {
  182. if (info->opcode == WS_TEXT) {
  183. DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector<T>::_bufferSize);
  184. DeserializationError error = deserializeJson(jsonDocument, (char*)data);
  185. if (!error && jsonDocument.is<JsonObject>()) {
  186. JsonObject jsonObject = jsonDocument.as<JsonObject>();
  187. WebSocketConnector<T>::_statefulService->update(
  188. jsonObject, _jsonDeserializer, WebSocketConnector<T>::clientId(client));
  189. }
  190. }
  191. }
  192. }
  193. }
  194. private:
  195. JsonDeserializer<T> _jsonDeserializer;
  196. };
  197. template <class T>
  198. class WebSocketTxRx : public WebSocketTx<T>, public WebSocketRx<T> {
  199. public:
  200. WebSocketTxRx(JsonSerializer<T> jsonSerializer,
  201. JsonDeserializer<T> jsonDeserializer,
  202. StatefulService<T>* statefulService,
  203. AsyncWebServer* server,
  204. char const* webSocketPath,
  205. SecurityManager* securityManager,
  206. AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
  207. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  208. WebSocketConnector<T>(statefulService,
  209. server,
  210. webSocketPath,
  211. securityManager,
  212. authenticationPredicate,
  213. bufferSize),
  214. WebSocketTx<T>(jsonSerializer,
  215. statefulService,
  216. server,
  217. webSocketPath,
  218. securityManager,
  219. authenticationPredicate,
  220. bufferSize),
  221. WebSocketRx<T>(jsonDeserializer,
  222. statefulService,
  223. server,
  224. webSocketPath,
  225. securityManager,
  226. authenticationPredicate,
  227. bufferSize) {
  228. }
  229. WebSocketTxRx(JsonSerializer<T> jsonSerializer,
  230. JsonDeserializer<T> jsonDeserializer,
  231. StatefulService<T>* statefulService,
  232. AsyncWebServer* server,
  233. char const* webSocketPath,
  234. size_t bufferSize = DEFAULT_BUFFER_SIZE) :
  235. WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize),
  236. WebSocketTx<T>(jsonSerializer, statefulService, server, webSocketPath, bufferSize),
  237. WebSocketRx<T>(jsonDeserializer, statefulService, server, webSocketPath, bufferSize) {
  238. }
  239. protected:
  240. void onWSEvent(AsyncWebSocket* server,
  241. AsyncWebSocketClient* client,
  242. AwsEventType type,
  243. void* arg,
  244. uint8_t* data,
  245. size_t len) {
  246. WebSocketRx<T>::onWSEvent(server, client, type, arg, data, len);
  247. WebSocketTx<T>::onWSEvent(server, client, type, arg, data, len);
  248. }
  249. };
  250. #endif