From b8e5dcd8d0eae3b2dbb59f163c36eef3199323b6 Mon Sep 17 00:00:00 2001 From: Rohan Sircar Date: Thu, 27 Aug 2020 18:09:51 +0530 Subject: [PATCH] Added actor rsocket conn controller Some refactorings Added counterpart of existing rsocket connection controller that is based on kotlin coroutines and actor model --- frontend/src/main.ts | 29 ++++--- .../demo/controller/HomeRestController.kt | 9 -- .../example/demo/controller/RsocketActor.kt | 34 ++++++++ ...ient.kt => RsocketConnectionController.kt} | 38 ++++++-- .../RsocketConnectionControllerAsync.kt | 86 +++++++++++++++++++ .../demo/controller/RsocketDemoController.kt | 20 +++++ 6 files changed, 189 insertions(+), 27 deletions(-) create mode 100644 src/main/kotlin/com/example/demo/controller/RsocketActor.kt rename src/main/kotlin/com/example/demo/controller/{RsocketClient.kt => RsocketConnectionController.kt} (63%) create mode 100644 src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt create mode 100644 src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt diff --git a/frontend/src/main.ts b/frontend/src/main.ts index 7f73617..7146a4a 100644 --- a/frontend/src/main.ts +++ b/frontend/src/main.ts @@ -11,14 +11,13 @@ import { WellKnownMimeType, APPLICATION_JSON, } from "rsocket-core"; +// MESSAGE_RSOCKET_AUTHENTICATION("message/x.rsocket.authentication.v0", (byte) 0x7C) import { EchoResponder } from "./EchoResponder"; import { every } from "rsocket-flowable"; import RSocketTcpClient from "rsocket-tcp-client"; import RSocketWebSocketClient from "rsocket-websocket-client"; const maxRSocketRequestN = 2147483647; -const host = "127.0.0.1"; -const port = 7000; const keepAlive = 60000; const lifetime = 180000; const dataMimeType = "application/octet-stream"; @@ -28,16 +27,17 @@ const address = { host: "localhost", port: 7000 }; const messageReceiver = (payload) => { //do what you want to do with received message - if ((payload.metadata as string).slice(1) == "user.queue.reply") console.log("YES"); + if ((payload.metadata as string).slice(1) == "user.queue.reply") + console.log("YES"); else console.log("No"); - console.log((payload.metadata as string).slice(1)) + console.log((payload.metadata as string).slice(1)); console.log(payload); }; const responder = new EchoResponder(messageReceiver); function getClientTransport(host: string, port: number) { return new RSocketWebSocketClient({ - url: "ws://localhost:7000/client-id", + url: `ws://${host}:${port}/client-id`, }); } @@ -47,7 +47,7 @@ interface Message { message: string; } -const client = new RSocketClient({ +const client = new RSocketClient({ // send/receive JSON objects instead of strings/buffers serializers: { data: JsonSerializer, @@ -56,8 +56,17 @@ const client = new RSocketClient({ setup: { //for connection mapping on server payload: { - data: "1234", - metadata: String.fromCharCode("client-id".length) + "client-id", + data: { id: 1234, name: "John" }, + // metadata: encodeAndAddWellKnownMetadata( + // encodeAndAddCustomMetadata( + // Buffer.alloc(0), + // "message/x.rsocket.authentication.v0", + // Buffer.from("Hello World") + // ), + // MESSAGE_RSOCKET_ROUTING, + // Buffer.from(String.fromCharCode("client-id2".length) + "client-id2") + // ), + metadata: String.fromCharCode("client-id2".length) + "client-id2" }, // ms btw sending keepalive to server keepAlive: 60000, @@ -75,7 +84,7 @@ const client = new RSocketClient({ transport: getClientTransport(address.host, address.port), }); const route = "user.queue.reply"; -const sendRoute = "private.news"; +const sendRoute = "private.news.2"; client.connect().subscribe({ onComplete: (rSocket) => { every(1000).subscribe({ @@ -110,7 +119,7 @@ client.connect().subscribe({ // }); rSocket.fireAndForget({ - data: { toUser: "4567", fromUser: "1234", message: "testHello" }, + data: { toUser: "4567", fromUser: "1234", message: "testHello" }, metadata: String.fromCharCode(sendRoute.length) + sendRoute, }); // S APPLICATION_JSON.string diff --git a/src/main/kotlin/com/example/demo/controller/HomeRestController.kt b/src/main/kotlin/com/example/demo/controller/HomeRestController.kt index d9c9c4b..adf137a 100644 --- a/src/main/kotlin/com/example/demo/controller/HomeRestController.kt +++ b/src/main/kotlin/com/example/demo/controller/HomeRestController.kt @@ -61,15 +61,6 @@ class HomeRestController(@Autowired @Lazy private val userService: UserService) userService.getUserMessages(userName) } - @MessageMapping("messages.findAll") - suspend fun all() = coroutineScope { - userService.getAllMessages() - } - - @MessageMapping("users.{name}") - suspend fun getUser(@DestinationVariable name: String) = coroutineScope { - userService.getUserByName(name) - } } data class MyData(val id: Int, val name2: String) \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/controller/RsocketActor.kt b/src/main/kotlin/com/example/demo/controller/RsocketActor.kt new file mode 100644 index 0000000..115533e --- /dev/null +++ b/src/main/kotlin/com/example/demo/controller/RsocketActor.kt @@ -0,0 +1,34 @@ +package com.example.demo.controller + +import io.vavr.collection.HashMap +import io.vavr.collection.Map +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ObsoleteCoroutinesApi +import kotlinx.coroutines.channels.actor +import org.springframework.messaging.rsocket.RSocketRequester + +// Message types for counterActor +sealed class Message +data class AddRequester(val requester: RSocketRequester, val clientId: String) : Message() +data class RemoveRequester(val clientId: String) : Message() +data class GetRequesters(val response: CompletableDeferred>) : Message() + +// This function launches a new rsocket actor +@ObsoleteCoroutinesApi // Actor API is planned to be overhauled so this annotation is required for now +fun CoroutineScope.rsocketActor() = actor { + var requesterMap: Map = HashMap.empty() // actor state + for (msg in channel) { // iterate over incoming messages + when (msg) { + is AddRequester -> { + println("Actor adding requester") + requesterMap = requesterMap.put(msg.clientId, msg.requester) + } + is RemoveRequester -> { + println("Actor removing requester") + requesterMap = requesterMap.remove(msg.clientId) + } + is GetRequesters -> msg.response.complete(requesterMap) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/example/demo/controller/RsocketClient.kt b/src/main/kotlin/com/example/demo/controller/RsocketConnectionController.kt similarity index 63% rename from src/main/kotlin/com/example/demo/controller/RsocketClient.kt rename to src/main/kotlin/com/example/demo/controller/RsocketConnectionController.kt index 329eebb..519ac3c 100644 --- a/src/main/kotlin/com/example/demo/controller/RsocketClient.kt +++ b/src/main/kotlin/com/example/demo/controller/RsocketConnectionController.kt @@ -1,16 +1,19 @@ package com.example.demo.controller import com.example.demo.model.Message +import com.example.demo.model.User import io.vavr.collection.HashMap import io.vavr.collection.Map import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import org.slf4j.LoggerFactory import org.springframework.messaging.handler.annotation.MessageExceptionHandler import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.messaging.handler.annotation.Payload import org.springframework.messaging.rsocket.RSocketRequester import org.springframework.messaging.rsocket.annotation.ConnectMapping import org.springframework.stereotype.Controller -import org.springframework.util.MimeType @Controller class RSocketConnectionController { @@ -37,17 +40,18 @@ class RSocketConnectionController { } @ConnectMapping("client-id") - fun onConnect(rSocketRequester: RSocketRequester, clientId: String) { - val clientIdFixed = clientId.replace("\"", "") //check why the serializer adds " to strings + fun onConnect(rSocketRequester: RSocketRequester, @Payload user: User) { + // val clientIdFixed = clientId.replace("\"", "") //check why the serializer adds " to strings // rSocketRequester.rsocket().dispose() //to reject connection + val clientId = user.id.toString() rSocketRequester .rsocket() .onClose() .subscribe(null, null, { - log.info("{} just disconnected", clientIdFixed) - removeRequester(clientIdFixed) + log.info("{} just disconnected", clientId) + removeRequester(clientId) }) - addRequester(rSocketRequester, clientIdFixed) + addRequester(rSocketRequester, clientId) } @MessageMapping("private.news") @@ -55,19 +59,37 @@ class RSocketConnectionController { getRequesterMap() .filterKeys { key -> key == message.toUser || key == message.fromUser } .values() - .forEach { requester -> sendMessage(requester, message) } + .forEach { requester -> + run { + println("Sending message") + sendMessage(requester, message) + } + } } @MessageExceptionHandler suspend fun handleException(ex: IllegalArgumentException): String { delay(10) - return "${ex.message} handled" + return "${ex.message} handled" + } + + @MessageMapping("echo-stream-async") + suspend fun echoStreamAsync(payload: String): Flow { + delay(10) + var i = 0 + return flow { + while (true) { + delay(10) + emit("$payload ${i++}") + } + } } private fun sendMessage(requester: RSocketRequester, message: Message) = requester .route("user.queue.reply") +// .metadata("test", MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)) .data(message) .send() .subscribe() diff --git a/src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt b/src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt new file mode 100644 index 0000000..d0a73a5 --- /dev/null +++ b/src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt @@ -0,0 +1,86 @@ +package com.example.demo.controller + +import com.example.demo.model.Message +import com.example.demo.model.User +import kotlinx.coroutines.* +import org.slf4j.LoggerFactory +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.messaging.handler.annotation.Payload +import org.springframework.messaging.rsocket.RSocketRequester +import org.springframework.messaging.rsocket.annotation.ConnectMapping +import org.springframework.stereotype.Controller +import io.vavr.collection.Map +import org.springframework.messaging.handler.annotation.MessageExceptionHandler + +@Controller +class RsocketConnectionControllerAsync : CoroutineScope { + private val log = LoggerFactory.getLogger(RsocketConnectionControllerAsync::class.java) + private val job = Job() + override val coroutineContext = Dispatchers.Unconfined + job + + @ObsoleteCoroutinesApi + private val myRsocketActor = rsocketActor() + + @ObsoleteCoroutinesApi + suspend fun addRequester(rSocketRequester: RSocketRequester, clientId: String) { + log.info("adding requester {}", clientId) + myRsocketActor.send(AddRequester(rSocketRequester, clientId)) + } + + @ObsoleteCoroutinesApi + suspend fun removeRequester(clientId: String) { + log.info("removing requester {}", clientId) + myRsocketActor.send(RemoveRequester(clientId)) + } + + @ObsoleteCoroutinesApi + @ConnectMapping(value = ["client-id2"]) + fun onConnect( + rSocketRequester: RSocketRequester, + @Payload user: User + ) { + launch(coroutineContext) { + val clientId = user.id.toString() + addRequester(rSocketRequester, clientId) + rSocketRequester + .rsocket() + .onClose() + .subscribe(null, null, { + log.info("{} just disconnected", clientId) + launch(coroutineContext) { removeRequester(clientId) } + }) + + } + } + + @ObsoleteCoroutinesApi + @MessageMapping("private.news.2") + fun privateNews(message: Message) { + launch { + val res = CompletableDeferred>() + myRsocketActor.send(GetRequesters(res)) + res + .await() + .filterKeys { key -> key == message.toUser || key == message.fromUser } + .values() + .forEach { requester -> + println("Sending message") + sendMessage(requester, message) + } + } + } + + private fun sendMessage(requester: RSocketRequester, message: Message) = + requester + .route("user.queue.reply") + .data(message) + .send() + .subscribe() + + @MessageExceptionHandler + suspend fun handleException(ex: IllegalArgumentException): String { + delay(10) + return "${ex.message} handled" + } + +} diff --git a/src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt b/src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt new file mode 100644 index 0000000..b313460 --- /dev/null +++ b/src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt @@ -0,0 +1,20 @@ +package com.example.demo.controller + +import com.example.demo.service.UserService +import kotlinx.coroutines.coroutineScope +import org.springframework.messaging.handler.annotation.DestinationVariable +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.stereotype.Component + +@Component +class RsocketDemoController(private val userService: UserService) { + @MessageMapping("messages.findAll") + suspend fun all() = coroutineScope { + userService.getAllMessages() + } + + @MessageMapping("users.{name}") + suspend fun getUser(@DestinationVariable name: String) = coroutineScope { + userService.getUserByName(name) + } +} \ No newline at end of file