Browse Source

Make StatefulService buffer size configurable (#118)

Introduce DEFAULT_BUFFER_SIZE for StatefulService related classes
Add configurable buffer sizes for StatefulService related classes
Remove redundant function from HttpEndpoint
master
rjwats 4 years ago
committed by GitHub
parent
commit
bcb1098402
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      lib/framework/FSPersistence.h
  2. 58
      lib/framework/HttpEndpoint.h
  3. 31
      lib/framework/MqttPubSub.h
  4. 4
      lib/framework/StatefulService.h
  5. 82
      lib/framework/WebSocketTxRx.h

29
lib/framework/FSPersistence.h

@ -6,8 +6,6 @@
#include <JsonDeserializer.h>
#include <FS.h>
#define MAX_FILE_SIZE 1024
template <class T>
class FSPersistence {
public:
@ -15,12 +13,14 @@ class FSPersistence {
JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
FS* fs,
char const* filePath) :
char const* filePath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
_jsonSerializer(jsonSerializer),
_jsonDeserializer(jsonDeserializer),
_statefulService(statefulService),
_fs(fs),
_filePath(filePath),
_bufferSize(bufferSize),
_updateHandlerId(0) {
enableUpdateHandler();
}
@ -29,15 +29,13 @@ class FSPersistence {
File settingsFile = _fs->open(_filePath, "r");
if (settingsFile) {
if (settingsFile.size() <= MAX_FILE_SIZE) {
DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE);
DeserializationError error = deserializeJson(jsonDocument, settingsFile);
if (error == DeserializationError::Ok && jsonDocument.is<JsonObject>()) {
JsonObject jsonObject = jsonDocument.as<JsonObject>();
_statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer);
settingsFile.close();
return;
}
DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize);
DeserializationError error = deserializeJson(jsonDocument, settingsFile);
if (error == DeserializationError::Ok && jsonDocument.is<JsonObject>()) {
JsonObject jsonObject = jsonDocument.as<JsonObject>();
_statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer);
settingsFile.close();
return;
}
settingsFile.close();
}
@ -49,7 +47,7 @@ class FSPersistence {
bool writeToFS() {
// create and populate a new json object
DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE);
DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize);
JsonObject jsonObject = jsonDocument.to<JsonObject>();
_statefulService->read(jsonObject, _jsonSerializer);
@ -84,15 +82,16 @@ class FSPersistence {
JsonSerializer<T> _jsonSerializer;
JsonDeserializer<T> _jsonDeserializer;
StatefulService<T>* _statefulService;
FS* _fs;
FS* _fs;
char const* _filePath;
size_t _bufferSize;
update_handler_id_t _updateHandlerId;
protected:
// We assume the deserializer supplies sensible defaults if an empty object
// is supplied, this virtual function allows that to be changed.
virtual void applyDefaults() {
DynamicJsonDocument jsonDocument = DynamicJsonDocument(MAX_FILE_SIZE);
DynamicJsonDocument jsonDocument = DynamicJsonDocument(_bufferSize);
JsonObject jsonObject = jsonDocument.as<JsonObject>();
_statefulService->updateWithoutPropagation(jsonObject, _jsonDeserializer);
}

58
lib/framework/HttpEndpoint.h

@ -11,7 +11,6 @@
#include <JsonSerializer.h>
#include <JsonDeserializer.h>
#define MAX_CONTENT_LENGTH 1024
#define HTTP_ENDPOINT_ORIGIN_ID "http"
template <class T>
@ -22,8 +21,9 @@ class HttpGetEndpoint {
AsyncWebServer* server,
const String& servicePath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
_jsonSerializer(jsonSerializer), _statefulService(statefulService) {
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
_jsonSerializer(jsonSerializer), _statefulService(statefulService), _bufferSize(bufferSize) {
server->on(servicePath.c_str(),
HTTP_GET,
securityManager->wrapRequest(std::bind(&HttpGetEndpoint::fetchSettings, this, std::placeholders::_1),
@ -33,17 +33,19 @@ class HttpGetEndpoint {
HttpGetEndpoint(JsonSerializer<T> jsonSerializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
const String& servicePath) :
_jsonSerializer(jsonSerializer), _statefulService(statefulService) {
const String& servicePath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
_jsonSerializer(jsonSerializer), _statefulService(statefulService), _bufferSize(bufferSize) {
server->on(servicePath.c_str(), HTTP_GET, std::bind(&HttpGetEndpoint::fetchSettings, this, std::placeholders::_1));
}
protected:
JsonSerializer<T> _jsonSerializer;
StatefulService<T>* _statefulService;
size_t _bufferSize;
void fetchSettings(AsyncWebServerRequest* request) {
AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH);
AsyncJsonResponse* response = new AsyncJsonResponse(false, _bufferSize);
JsonObject jsonObject = response->getRoot().to<JsonObject>();
_statefulService->read(jsonObject, _jsonSerializer);
@ -61,7 +63,8 @@ class HttpPostEndpoint {
AsyncWebServer* server,
const String& servicePath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
_jsonSerializer(jsonSerializer),
_jsonDeserializer(jsonDeserializer),
_statefulService(statefulService),
@ -69,9 +72,10 @@ class HttpPostEndpoint {
servicePath,
securityManager->wrapCallback(
std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2),
authenticationPredicate)) {
authenticationPredicate),
bufferSize),
_bufferSize(bufferSize) {
_updateHandler.setMethod(HTTP_POST);
_updateHandler.setMaxContentLength(MAX_CONTENT_LENGTH);
server->addHandler(&_updateHandler);
}
@ -79,14 +83,16 @@ class HttpPostEndpoint {
JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
const String& servicePath) :
const String& servicePath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
_jsonSerializer(jsonSerializer),
_jsonDeserializer(jsonDeserializer),
_statefulService(statefulService),
_updateHandler(servicePath,
std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2)) {
std::bind(&HttpPostEndpoint::updateSettings, this, std::placeholders::_1, std::placeholders::_2),
bufferSize),
_bufferSize(bufferSize) {
_updateHandler.setMethod(HTTP_POST);
_updateHandler.setMaxContentLength(MAX_CONTENT_LENGTH);
server->addHandler(&_updateHandler);
}
@ -95,19 +101,11 @@ class HttpPostEndpoint {
JsonDeserializer<T> _jsonDeserializer;
StatefulService<T>* _statefulService;
AsyncCallbackJsonWebHandler _updateHandler;
void fetchSettings(AsyncWebServerRequest* request) {
AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH);
JsonObject jsonObject = response->getRoot().to<JsonObject>();
_statefulService->read(jsonObject, _jsonSerializer);
response->setLength();
request->send(response);
}
size_t _bufferSize;
void updateSettings(AsyncWebServerRequest* request, JsonVariant& json) {
if (json.is<JsonObject>()) {
AsyncJsonResponse* response = new AsyncJsonResponse(false, MAX_CONTENT_LENGTH);
AsyncJsonResponse* response = new AsyncJsonResponse(false, _bufferSize);
// use callback to update the settings once the response is complete
request->onDisconnect([this]() { _statefulService->callUpdateHandlers(HTTP_ENDPOINT_ORIGIN_ID); });
@ -138,29 +136,33 @@ class HttpEndpoint : public HttpGetEndpoint<T>, public HttpPostEndpoint<T> {
AsyncWebServer* server,
const String& servicePath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
HttpGetEndpoint<T>(jsonSerializer,
statefulService,
server,
servicePath,
securityManager,
authenticationPredicate),
authenticationPredicate,
bufferSize),
HttpPostEndpoint<T>(jsonSerializer,
jsonDeserializer,
statefulService,
server,
servicePath,
securityManager,
authenticationPredicate) {
authenticationPredicate,
bufferSize) {
}
HttpEndpoint(JsonSerializer<T> jsonSerializer,
JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
const String& servicePath) :
HttpGetEndpoint<T>(jsonSerializer, statefulService, server, servicePath),
HttpPostEndpoint<T>(jsonSerializer, jsonDeserializer, statefulService, server, servicePath) {
const String& servicePath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
HttpGetEndpoint<T>(jsonSerializer, statefulService, server, servicePath, bufferSize),
HttpPostEndpoint<T>(jsonSerializer, jsonDeserializer, statefulService, server, servicePath, bufferSize) {
}
};

31
lib/framework/MqttPubSub.h

@ -6,7 +6,6 @@
#include <JsonDeserializer.h>
#include <AsyncMqttClient.h>
#define MAX_MESSAGE_SIZE 1024
#define MQTT_ORIGIN_ID "mqtt"
template <class T>
@ -14,9 +13,10 @@ class MqttConnector {
protected:
StatefulService<T>* _statefulService;
AsyncMqttClient* _mqttClient;
size_t _bufferSize;
MqttConnector(StatefulService<T>* statefulService, AsyncMqttClient* mqttClient) :
_statefulService(statefulService), _mqttClient(mqttClient) {
MqttConnector(StatefulService<T>* statefulService, AsyncMqttClient* mqttClient, size_t bufferSize) :
_statefulService(statefulService), _mqttClient(mqttClient), _bufferSize(bufferSize) {
_mqttClient->onConnect(std::bind(&MqttConnector::onConnect, this));
}
@ -34,8 +34,9 @@ class MqttPub : virtual public MqttConnector<T> {
MqttPub(JsonSerializer<T> jsonSerializer,
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const String& pubTopic = "") :
MqttConnector<T>(statefulService, mqttClient), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) {
const String& pubTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize), _jsonSerializer(jsonSerializer), _pubTopic(pubTopic) {
MqttConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { publish(); }, false);
}
@ -56,7 +57,7 @@ class MqttPub : virtual public MqttConnector<T> {
void publish() {
if (_pubTopic.length() > 0 && MqttConnector<T>::_mqttClient->connected()) {
// serialize to json doc
DynamicJsonDocument json(MAX_MESSAGE_SIZE);
DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
JsonObject jsonObject = json.to<JsonObject>();
MqttConnector<T>::_statefulService->read(jsonObject, _jsonSerializer);
@ -76,8 +77,11 @@ class MqttSub : virtual public MqttConnector<T> {
MqttSub(JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const String& subTopic = "") :
MqttConnector<T>(statefulService, mqttClient), _jsonDeserializer(jsonDeserializer), _subTopic(subTopic) {
const String& subTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize),
_jsonDeserializer(jsonDeserializer),
_subTopic(subTopic) {
MqttConnector<T>::_mqttClient->onMessage(std::bind(&MqttSub::onMqttMessage,
this,
std::placeholders::_1,
@ -127,7 +131,7 @@ class MqttSub : virtual public MqttConnector<T> {
}
// deserialize from string
DynamicJsonDocument json(MAX_MESSAGE_SIZE);
DynamicJsonDocument json(MqttConnector<T>::_bufferSize);
DeserializationError error = deserializeJson(json, payload, len);
if (!error && json.is<JsonObject>()) {
JsonObject jsonObject = json.as<JsonObject>();
@ -144,10 +148,11 @@ class MqttPubSub : public MqttPub<T>, public MqttSub<T> {
StatefulService<T>* statefulService,
AsyncMqttClient* mqttClient,
const String& pubTopic = "",
const String& subTopic = "") :
MqttConnector<T>(statefulService, mqttClient),
MqttPub<T>(jsonSerializer, statefulService, mqttClient, pubTopic),
MqttSub<T>(jsonDeserializer, statefulService, mqttClient, subTopic) {
const String& subTopic = "",
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
MqttConnector<T>(statefulService, mqttClient, bufferSize),
MqttPub<T>(jsonSerializer, statefulService, mqttClient, pubTopic, bufferSize),
MqttSub<T>(jsonDeserializer, statefulService, mqttClient, subTopic, bufferSize) {
}
public:

4
lib/framework/StatefulService.h

@ -12,6 +12,10 @@
#include <freertos/semphr.h>
#endif
#ifndef DEFAULT_BUFFER_SIZE
#define DEFAULT_BUFFER_SIZE 1024
#endif
typedef size_t update_handler_id_t;
typedef std::function<void(const String& originId)> StateUpdateCallback;

82
lib/framework/WebSocketTxRx.h

@ -6,7 +6,6 @@
#include <JsonDeserializer.h>
#include <ESPAsyncWebServer.h>
#define WEB_SOCKET_MSG_SIZE 1024
#define WEB_SOCKET_CLIENT_ID_MSG_SIZE 128
#define WEB_SOCKET_ORIGIN "websocket"
@ -18,13 +17,15 @@ class WebSocketConnector {
StatefulService<T>* _statefulService;
AsyncWebServer* _server;
AsyncWebSocket _webSocket;
size_t _bufferSize;
WebSocketConnector(StatefulService<T>* statefulService,
AsyncWebServer* server,
char const* webSocketPath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
_statefulService(statefulService), _server(server), _webSocket(webSocketPath) {
AuthenticationPredicate authenticationPredicate,
size_t bufferSize) :
_statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) {
_webSocket.setFilter(securityManager->filterRequest(authenticationPredicate));
_webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent,
this,
@ -38,8 +39,11 @@ class WebSocketConnector {
_server->on(webSocketPath, HTTP_GET, std::bind(&WebSocketConnector::forbidden, this, std::placeholders::_1));
}
WebSocketConnector(StatefulService<T>* statefulService, AsyncWebServer* server, char const* webSocketPath) :
_statefulService(statefulService), _server(server), _webSocket(webSocketPath) {
WebSocketConnector(StatefulService<T>* statefulService,
AsyncWebServer* server,
char const* webSocketPath,
size_t bufferSize) :
_statefulService(statefulService), _server(server), _webSocket(webSocketPath), _bufferSize(bufferSize) {
_webSocket.onEvent(std::bind(&WebSocketConnector::onWSEvent,
this,
std::placeholders::_1,
@ -76,8 +80,14 @@ class WebSocketTx : virtual public WebSocketConnector<T> {
AsyncWebServer* server,
char const* webSocketPath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
WebSocketConnector<T>(statefulService, server, webSocketPath, securityManager, authenticationPredicate),
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService,
server,
webSocketPath,
securityManager,
authenticationPredicate,
bufferSize),
_jsonSerializer(jsonSerializer) {
WebSocketConnector<T>::_statefulService->addUpdateHandler(
[&](const String& originId) { transmitData(nullptr, originId); }, false);
@ -86,10 +96,11 @@ class WebSocketTx : virtual public WebSocketConnector<T> {
WebSocketTx(JsonSerializer<T> jsonSerializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
char const* webSocketPath) :
WebSocketConnector<T>(statefulService, server, webSocketPath), _jsonSerializer(jsonSerializer) {
WebSocketConnector<T>::_statefulService->addUpdateHandler(
[&](const String& originId) { transmitData(nullptr, originId); }, false);
char const* webSocketPath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize), _jsonSerializer(jsonSerializer) {
WebSocketConnector<T>::_statefulService->addUpdateHandler([&](const String& originId) { transmitData(nullptr, originId); },
false);
}
protected:
@ -130,7 +141,7 @@ class WebSocketTx : virtual public WebSocketConnector<T> {
* simplifies the client and the server implementation but may not be sufficent for all use-cases.
*/
void transmitData(AsyncWebSocketClient* client, const String& originId) {
DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_MSG_SIZE);
DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector<T>::_bufferSize);
JsonObject root = jsonDocument.to<JsonObject>();
root["type"] = "payload";
root["origin_id"] = originId;
@ -158,16 +169,23 @@ class WebSocketRx : virtual public WebSocketConnector<T> {
AsyncWebServer* server,
char const* webSocketPath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
WebSocketConnector<T>(statefulService, server, webSocketPath, securityManager, authenticationPredicate),
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService,
server,
webSocketPath,
securityManager,
authenticationPredicate,
bufferSize),
_jsonDeserializer(jsonDeserializer) {
}
WebSocketRx(JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
char const* webSocketPath) :
WebSocketConnector<T>(statefulService, server, webSocketPath), _jsonDeserializer(jsonDeserializer) {
char const* webSocketPath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize), _jsonDeserializer(jsonDeserializer) {
}
protected:
@ -181,7 +199,7 @@ class WebSocketRx : virtual public WebSocketConnector<T> {
AwsFrameInfo* info = (AwsFrameInfo*)arg;
if (info->final && info->index == 0 && info->len == len) {
if (info->opcode == WS_TEXT) {
DynamicJsonDocument jsonDocument = DynamicJsonDocument(WEB_SOCKET_MSG_SIZE);
DynamicJsonDocument jsonDocument = DynamicJsonDocument(WebSocketConnector<T>::_bufferSize);
DeserializationError error = deserializeJson(jsonDocument, (char*)data);
if (!error && jsonDocument.is<JsonObject>()) {
JsonObject jsonObject = jsonDocument.as<JsonObject>();
@ -206,25 +224,39 @@ class WebSocketTxRx : public WebSocketTx<T>, public WebSocketRx<T> {
AsyncWebServer* server,
char const* webSocketPath,
SecurityManager* securityManager,
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN) :
WebSocketConnector<T>(statefulService, server, webSocketPath, securityManager, authenticationPredicate),
WebSocketTx<T>(jsonSerializer, statefulService, server, webSocketPath, securityManager, authenticationPredicate),
AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_ADMIN,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService,
server,
webSocketPath,
securityManager,
authenticationPredicate,
bufferSize),
WebSocketTx<T>(jsonSerializer,
statefulService,
server,
webSocketPath,
securityManager,
authenticationPredicate,
bufferSize),
WebSocketRx<T>(jsonDeserializer,
statefulService,
server,
webSocketPath,
securityManager,
authenticationPredicate) {
authenticationPredicate,
bufferSize) {
}
WebSocketTxRx(JsonSerializer<T> jsonSerializer,
JsonDeserializer<T> jsonDeserializer,
StatefulService<T>* statefulService,
AsyncWebServer* server,
char const* webSocketPath) :
WebSocketConnector<T>(statefulService, server, webSocketPath),
WebSocketTx<T>(jsonSerializer, statefulService, server, webSocketPath),
WebSocketRx<T>(jsonDeserializer, statefulService, server, webSocketPath) {
char const* webSocketPath,
size_t bufferSize = DEFAULT_BUFFER_SIZE) :
WebSocketConnector<T>(statefulService, server, webSocketPath, bufferSize),
WebSocketTx<T>(jsonSerializer, statefulService, server, webSocketPath, bufferSize),
WebSocketRx<T>(jsonDeserializer, statefulService, server, webSocketPath, bufferSize) {
}
protected:

Loading…
Cancel
Save