From bcb1098402120dc794398769fdb9bcede2b2ea22 Mon Sep 17 00:00:00 2001 From: rjwats Date: Fri, 22 May 2020 19:26:12 +0100 Subject: [PATCH] Make StatefulService buffer size configurable (#118) Introduce DEFAULT_BUFFER_SIZE for StatefulService related classes Add configurable buffer sizes for StatefulService related classes Remove redundant function from HttpEndpoint --- lib/framework/FSPersistence.h | 29 ++++++------ lib/framework/HttpEndpoint.h | 58 ++++++++++++----------- lib/framework/MqttPubSub.h | 31 +++++++------ lib/framework/StatefulService.h | 4 ++ lib/framework/WebSocketTxRx.h | 82 +++++++++++++++++++++++---------- 5 files changed, 123 insertions(+), 81 deletions(-) diff --git a/lib/framework/FSPersistence.h b/lib/framework/FSPersistence.h index 19b7ae8..915ed18 100644 --- a/lib/framework/FSPersistence.h +++ b/lib/framework/FSPersistence.h @@ -6,8 +6,6 @@ #include #include -#define MAX_FILE_SIZE 1024 - template class FSPersistence { public: @@ -15,12 +13,14 @@ class FSPersistence { JsonDeserializer jsonDeserializer, StatefulService* statefulService, FS* fs, - char const* filePath) : + char const* filePath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : _jsonSerializer(jsonSerializer), _jsonDeserializer(jsonDeserializer), _statefulService(statefulService), _fs(fs), _filePath(filePath), + _bufferSize(bufferSize), _updateHandlerId(0) { enableUpdateHandler(); } @@ -29,15 +29,13 @@ class FSPersistence { File settingsFile = _fs->open(_filePath, "r"); if (settingsFile) { - if (settingsFile.size() <= MAX_FILE_SIZE) { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE); - DeserializationError error = deserializeJson(jsonDocument, settingsFile); - if (error == DeserializationError::Ok && jsonDocument.is()) { - JsonObject jsonObject = jsonDocument.as(); - _statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer); - settingsFile.close(); - return; - } + DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize); + DeserializationError error = deserializeJson(jsonDocument, settingsFile); + if (error == DeserializationError::Ok && jsonDocument.is()) { + JsonObject jsonObject = jsonDocument.as(); + _statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer); + settingsFile.close(); + return; } settingsFile.close(); } @@ -49,7 +47,7 @@ class FSPersistence { bool writeToFS() { // create and populate a new json object - DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE); + DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize); JsonObject jsonObject = jsonDocument.to(); _statefulService->read(jsonObject, _jsonSerializer); @@ -84,15 +82,16 @@ class FSPersistence { JsonSerializer _jsonSerializer; JsonDeserializer _jsonDeserializer; StatefulService* _statefulService; - FS* _fs; + FS* _fs; char const* _filePath; + size_t _bufferSize; update_handler_id_t _updateHandlerId; protected: // We assume the deserializer supplies sensible defaults if an empty object // is supplied, this virtual function allows that to be changed. virtual void applyDefaults() { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE); + DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize); JsonObject jsonObject = jsonDocument.as(); _statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer); } diff --git a/lib/framework/HttpEndpoint.h b/lib/framework/HttpEndpoint.h index 6842963..2e747e2 100644 --- a/lib/framework/HttpEndpoint.h +++ b/lib/framework/HttpEndpoint.h @@ -11,7 +11,6 @@ #include #include -#define MAX_CONTENT_LENGTH 1024 #define HTTP_ENDPOINT_ORIGIN_ID "http" template @@ -22,8 +21,9 @@ class HttpGetEndpoint { AsyncWebServer* server, const String& servicePath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : - _jsonSerializer(jsonSerializer), _statefulService(statefulService) { + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + _jsonSerializer(jsonSerializer), _statefulService(statefulService), _bufferSize(bufferSize) { server->on(servicePath.c_str(), HTTP_GET, securityManager->wrapRequest(std::bind(&HttpGetEndpoint::fetchSettings, this, std::placeholders::_1), @@ -33,17 +33,19 @@ class HttpGetEndpoint { HttpGetEndpoint(JsonSerializer jsonSerializer, StatefulService* statefulService, AsyncWebServer* server, - const String& servicePath) : - _jsonSerializer(jsonSerializer), _statefulService(statefulService) { + const String& servicePath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + _jsonSerializer(jsonSerializer), _statefulService(statefulService), _bufferSize(bufferSize) { server->on(servicePath.c_str(), HTTP_GET, std::bind(&HttpGetEndpoint::fetchSettings, this, std::placeholders::_1)); } protected: JsonSerializer _jsonSerializer; StatefulService* _statefulService; + size_t _bufferSize; void fetchSettings(AsyncWebServerRequest* request) { - AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH); + AsyncJsonResponse* response = new AsyncJsonResponse(false, _bufferSize); JsonObject jsonObject = response->getRoot().to(); _statefulService->read(jsonObject, _jsonSerializer); @@ -61,7 +63,8 @@ class HttpPostEndpoint { AsyncWebServer* server, const String& servicePath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : _jsonSerializer(jsonSerializer), _jsonDeserializer(jsonDeserializer), _statefulService(statefulService), @@ -69,9 +72,10 @@ class HttpPostEndpoint { servicePath, securityManager->wrapCallback( std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2), - authenticationPredicate)) { + authenticationPredicate), + bufferSize), + _bufferSize(bufferSize) { _updateHandler.setMethod(HTTP_POST); - _updateHandler.setMaxContentLength(MAX_CONTENT_LENGTH); server->addHandler(&_updateHandler); } @@ -79,14 +83,16 @@ class HttpPostEndpoint { JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncWebServer* server, - const String& servicePath) : + const String& servicePath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : _jsonSerializer(jsonSerializer), _jsonDeserializer(jsonDeserializer), _statefulService(statefulService), _updateHandler(servicePath, - std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2)) { + std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2), + bufferSize), + _bufferSize(bufferSize) { _updateHandler.setMethod(HTTP_POST); - _updateHandler.setMaxContentLength(MAX_CONTENT_LENGTH); server->addHandler(&_updateHandler); } @@ -95,19 +101,11 @@ class HttpPostEndpoint { JsonDeserializer _jsonDeserializer; StatefulService* _statefulService; AsyncCallbackJsonWebHandler _updateHandler; - - void fetchSettings(AsyncWebServerRequest* request) { - AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH); - JsonObject jsonObject = response->getRoot().to(); - _statefulService->read(jsonObject, _jsonSerializer); - - response->setLength(); - request->send(response); - } + size_t _bufferSize; void updateSettings(AsyncWebServerRequest* request, JsonVariant& json) { if (json.is()) { - AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH); + AsyncJsonResponse* response = new AsyncJsonResponse(false, _bufferSize); // use callback to update the settings once the response is complete request->onDisconnect([this]() { _statefulService->callUpdateHandlers(HTTP_ENDPOINT_ORIGIN_ID); }); @@ -138,29 +136,33 @@ class HttpEndpoint : public HttpGetEndpoint, public HttpPostEndpoint { AsyncWebServer* server, const String& servicePath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : HttpGetEndpoint(jsonSerializer, statefulService, server, servicePath, securityManager, - authenticationPredicate), + authenticationPredicate, + bufferSize), HttpPostEndpoint(jsonSerializer, jsonDeserializer, statefulService, server, servicePath, securityManager, - authenticationPredicate) { + authenticationPredicate, + bufferSize) { } HttpEndpoint(JsonSerializer jsonSerializer, JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncWebServer* server, - const String& servicePath) : - HttpGetEndpoint(jsonSerializer, statefulService, server, servicePath), - HttpPostEndpoint(jsonSerializer, jsonDeserializer, statefulService, server, servicePath) { + const String& servicePath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + HttpGetEndpoint(jsonSerializer, statefulService, server, servicePath, bufferSize), + HttpPostEndpoint(jsonSerializer, jsonDeserializer, statefulService, server, servicePath, bufferSize) { } }; diff --git a/lib/framework/MqttPubSub.h b/lib/framework/MqttPubSub.h index 937063f..959a582 100644 --- a/lib/framework/MqttPubSub.h +++ b/lib/framework/MqttPubSub.h @@ -6,7 +6,6 @@ #include #include -#define MAX_MESSAGE_SIZE 1024 #define MQTT_ORIGIN_ID "mqtt" template @@ -14,9 +13,10 @@ class MqttConnector { protected: StatefulService* _statefulService; AsyncMqttClient* _mqttClient; + size_t _bufferSize; - MqttConnector(StatefulService* statefulService, AsyncMqttClient* mqttClient) : - _statefulService(statefulService), _mqttClient(mqttClient) { + MqttConnector(StatefulService* statefulService, AsyncMqttClient* mqttClient, size_t bufferSize) : + _statefulService(statefulService), _mqttClient(mqttClient), _bufferSize(bufferSize) { _mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this)); } @@ -34,8 +34,9 @@ class MqttPub : virtual public MqttConnector { MqttPub(JsonSerializer jsonSerializer, StatefulService* statefulService, AsyncMqttClient* mqttClient, - const String& pubTopic = "") : - MqttConnector(statefulService, mqttClient), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) { + const String& pubTopic = "", + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + MqttConnector(statefulService, mqttClient, bufferSize), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) { MqttConnector::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false); } @@ -56,7 +57,7 @@ class MqttPub : virtual public MqttConnector { void publish() { if (_pubTopic.length() > 0 && MqttConnector::_mqttClient->connected()) { // serialize to json doc - DynamicJsonDocument json(MAX_MESSAGE_SIZE); + DynamicJsonDocument json(MqttConnector::_bufferSize); JsonObject jsonObject = json.to(); MqttConnector::_statefulService->read(jsonObject, _jsonSerializer); @@ -76,8 +77,11 @@ class MqttSub : virtual public MqttConnector { MqttSub(JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncMqttClient* mqttClient, - const String& subTopic = "") : - MqttConnector(statefulService, mqttClient), _jsonDeserializer(jsonDeserializer), _subTopic(subTopic) { + const String& subTopic = "", + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + MqttConnector(statefulService, mqttClient, bufferSize), + _jsonDeserializer(jsonDeserializer), + _subTopic(subTopic) { MqttConnector::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage, this, std::placeholders::_1, @@ -127,7 +131,7 @@ class MqttSub : virtual public MqttConnector { } // deserialize from string - DynamicJsonDocument json(MAX_MESSAGE_SIZE); + DynamicJsonDocument json(MqttConnector::_bufferSize); DeserializationError error = deserializeJson(json, payload, len); if (!error && json.is()) { JsonObject jsonObject = json.as(); @@ -144,10 +148,11 @@ class MqttPubSub : public MqttPub, public MqttSub { StatefulService* statefulService, AsyncMqttClient* mqttClient, const String& pubTopic = "", - const String& subTopic = "") : - MqttConnector(statefulService, mqttClient), - MqttPub(jsonSerializer, statefulService, mqttClient, pubTopic), - MqttSub(jsonDeserializer, statefulService, mqttClient, subTopic) { + const String& subTopic = "", + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + MqttConnector(statefulService, mqttClient, bufferSize), + MqttPub(jsonSerializer, statefulService, mqttClient, pubTopic, bufferSize), + MqttSub(jsonDeserializer, statefulService, mqttClient, subTopic, bufferSize) { } public: diff --git a/lib/framework/StatefulService.h b/lib/framework/StatefulService.h index cc2e063..e29e6fc 100644 --- a/lib/framework/StatefulService.h +++ b/lib/framework/StatefulService.h @@ -12,6 +12,10 @@ #include #endif +#ifndef DEFAULT_BUFFER_SIZE +#define DEFAULT_BUFFER_SIZE 1024 +#endif + typedef size_t update_handler_id_t; typedef std::function StateUpdateCallback; diff --git a/lib/framework/WebSocketTxRx.h b/lib/framework/WebSocketTxRx.h index c0a45dc..ee5bb70 100644 --- a/lib/framework/WebSocketTxRx.h +++ b/lib/framework/WebSocketTxRx.h @@ -6,7 +6,6 @@ #include #include -#define WEB_SOCKET_MSG_SIZE 1024 #define WEB_SOCKET_CLIENT_ID_MSG_SIZE 128 #define WEB_SOCKET_ORIGIN "websocket" @@ -18,13 +17,15 @@ class WebSocketConnector { StatefulService* _statefulService; AsyncWebServer* _server; AsyncWebSocket _webSocket; + size_t _bufferSize; WebSocketConnector(StatefulService* statefulService, AsyncWebServer* server, char const* webSocketPath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : - _statefulService(statefulService), _server(server), _webSocket(webSocketPath) { + AuthenticationPredicate authenticationPredicate, + size_t bufferSize) : + _statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) { _webSocket.setFilter(securityManager->filterRequest(authenticationPredicate)); _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent, this, @@ -38,8 +39,11 @@ class WebSocketConnector { _server->on(webSocketPath, HTTP_GET, std::bind(&WebSocketConnector::forbidden, this, std::placeholders::_1)); } - WebSocketConnector(StatefulService* statefulService, AsyncWebServer* server, char const* webSocketPath) : - _statefulService(statefulService), _server(server), _webSocket(webSocketPath) { + WebSocketConnector(StatefulService* statefulService, + AsyncWebServer* server, + char const* webSocketPath, + size_t bufferSize) : + _statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) { _webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent, this, std::placeholders::_1, @@ -76,8 +80,14 @@ class WebSocketTx : virtual public WebSocketConnector { AsyncWebServer* server, char const* webSocketPath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : - WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate), + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, + server, + webSocketPath, + securityManager, + authenticationPredicate, + bufferSize), _jsonSerializer(jsonSerializer) { WebSocketConnector::_statefulService->addUpdateHandler( [&](const String& originId) { transmitData(nullptr, originId); }, false); @@ -86,10 +96,11 @@ class WebSocketTx : virtual public WebSocketConnector { WebSocketTx(JsonSerializer jsonSerializer, StatefulService* statefulService, AsyncWebServer* server, - char const* webSocketPath) : - WebSocketConnector(statefulService, server, webSocketPath), _jsonSerializer(jsonSerializer) { - WebSocketConnector::_statefulService->addUpdateHandler( - [&](const String& originId) { transmitData(nullptr, originId); }, false); + char const* webSocketPath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, server, webSocketPath, bufferSize), _jsonSerializer(jsonSerializer) { + WebSocketConnector::_statefulService->addUpdateHandler([&](const String& originId) { transmitData(nullptr, originId); }, + false); } protected: @@ -130,7 +141,7 @@ class WebSocketTx : virtual public WebSocketConnector { * simplifies the client and the server implementation but may not be sufficent for all use-cases. */ void transmitData(AsyncWebSocketClient* client, const String& originId) { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_MSG_SIZE); + DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector::_bufferSize); JsonObject root = jsonDocument.to(); root["type"] = "payload"; root["origin_id"] = originId; @@ -158,16 +169,23 @@ class WebSocketRx : virtual public WebSocketConnector { AsyncWebServer* server, char const* webSocketPath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : - WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate), + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, + server, + webSocketPath, + securityManager, + authenticationPredicate, + bufferSize), _jsonDeserializer(jsonDeserializer) { } WebSocketRx(JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncWebServer* server, - char const* webSocketPath) : - WebSocketConnector(statefulService, server, webSocketPath), _jsonDeserializer(jsonDeserializer) { + char const* webSocketPath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, server, webSocketPath, bufferSize), _jsonDeserializer(jsonDeserializer) { } protected: @@ -181,7 +199,7 @@ class WebSocketRx : virtual public WebSocketConnector { AwsFrameInfo* info = (AwsFrameInfo*)arg; if (info->final && info->index == 0 && info->len == len) { if (info->opcode == WS_TEXT) { - DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_MSG_SIZE); + DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector::_bufferSize); DeserializationError error = deserializeJson(jsonDocument, (char*)data); if (!error && jsonDocument.is()) { JsonObject jsonObject = jsonDocument.as(); @@ -206,25 +224,39 @@ class WebSocketTxRx : public WebSocketTx, public WebSocketRx { AsyncWebServer* server, char const* webSocketPath, SecurityManager* securityManager, - AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) : - WebSocketConnector(statefulService, server, webSocketPath, securityManager, authenticationPredicate), - WebSocketTx(jsonSerializer, statefulService, server, webSocketPath, securityManager, authenticationPredicate), + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, + server, + webSocketPath, + securityManager, + authenticationPredicate, + bufferSize), + WebSocketTx(jsonSerializer, + statefulService, + server, + webSocketPath, + securityManager, + authenticationPredicate, + bufferSize), WebSocketRx(jsonDeserializer, statefulService, server, webSocketPath, securityManager, - authenticationPredicate) { + authenticationPredicate, + bufferSize) { } WebSocketTxRx(JsonSerializer jsonSerializer, JsonDeserializer jsonDeserializer, StatefulService* statefulService, AsyncWebServer* server, - char const* webSocketPath) : - WebSocketConnector(statefulService, server, webSocketPath), - WebSocketTx(jsonSerializer, statefulService, server, webSocketPath), - WebSocketRx(jsonDeserializer, statefulService, server, webSocketPath) { + char const* webSocketPath, + size_t bufferSize = DEFAULT_BUFFER_SIZE) : + WebSocketConnector(statefulService, server, webSocketPath, bufferSize), + WebSocketTx(jsonSerializer, statefulService, server, webSocketPath, bufferSize), + WebSocketRx(jsonDeserializer, statefulService, server, webSocketPath, bufferSize) { } protected: