Browse Source

Added actor rsocket conn controller

Some refactorings
Added counterpart of existing rsocket connection controller that is
based on kotlin coroutines and actor model
master
Rohan Sircar 4 years ago
parent
commit
b8e5dcd8d0
  1. 29
      frontend/src/main.ts
  2. 9
      src/main/kotlin/com/example/demo/controller/HomeRestController.kt
  3. 34
      src/main/kotlin/com/example/demo/controller/RsocketActor.kt
  4. 38
      src/main/kotlin/com/example/demo/controller/RsocketConnectionController.kt
  5. 86
      src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt
  6. 20
      src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt

29
frontend/src/main.ts

@ -11,14 +11,13 @@ import {
WellKnownMimeType, WellKnownMimeType,
APPLICATION_JSON, APPLICATION_JSON,
} from "rsocket-core"; } from "rsocket-core";
// MESSAGE_RSOCKET_AUTHENTICATION("message/x.rsocket.authentication.v0", (byte) 0x7C)
import { EchoResponder } from "./EchoResponder"; import { EchoResponder } from "./EchoResponder";
import { every } from "rsocket-flowable"; import { every } from "rsocket-flowable";
import RSocketTcpClient from "rsocket-tcp-client"; import RSocketTcpClient from "rsocket-tcp-client";
import RSocketWebSocketClient from "rsocket-websocket-client"; import RSocketWebSocketClient from "rsocket-websocket-client";
const maxRSocketRequestN = 2147483647; const maxRSocketRequestN = 2147483647;
const host = "127.0.0.1";
const port = 7000;
const keepAlive = 60000; const keepAlive = 60000;
const lifetime = 180000; const lifetime = 180000;
const dataMimeType = "application/octet-stream"; const dataMimeType = "application/octet-stream";
@ -28,16 +27,17 @@ const address = { host: "localhost", port: 7000 };
const messageReceiver = (payload) => { const messageReceiver = (payload) => {
//do what you want to do with received message //do what you want to do with received message
if ((payload.metadata as string).slice(1) == "user.queue.reply") console.log("YES");
if ((payload.metadata as string).slice(1) == "user.queue.reply")
console.log("YES");
else console.log("No"); else console.log("No");
console.log((payload.metadata as string).slice(1))
console.log((payload.metadata as string).slice(1));
console.log(payload); console.log(payload);
}; };
const responder = new EchoResponder(messageReceiver); const responder = new EchoResponder(messageReceiver);
function getClientTransport(host: string, port: number) { function getClientTransport(host: string, port: number) {
return new RSocketWebSocketClient({ return new RSocketWebSocketClient({
url: "ws://localhost:7000/client-id",
url: `ws://${host}:${port}/client-id`,
}); });
} }
@ -47,7 +47,7 @@ interface Message {
message: string; message: string;
} }
const client = new RSocketClient({
const client = new RSocketClient<Object, Encodable>({
// send/receive JSON objects instead of strings/buffers // send/receive JSON objects instead of strings/buffers
serializers: { serializers: {
data: JsonSerializer, data: JsonSerializer,
@ -56,8 +56,17 @@ const client = new RSocketClient({
setup: { setup: {
//for connection mapping on server //for connection mapping on server
payload: { payload: {
data: "1234",
metadata: String.fromCharCode("client-id".length) + "client-id",
data: { id: 1234, name: "John" },
// metadata: encodeAndAddWellKnownMetadata(
// encodeAndAddCustomMetadata(
// Buffer.alloc(0),
// "message/x.rsocket.authentication.v0",
// Buffer.from("Hello World")
// ),
// MESSAGE_RSOCKET_ROUTING,
// Buffer.from(String.fromCharCode("client-id2".length) + "client-id2")
// ),
metadata: String.fromCharCode("client-id2".length) + "client-id2"
}, },
// ms btw sending keepalive to server // ms btw sending keepalive to server
keepAlive: 60000, keepAlive: 60000,
@ -75,7 +84,7 @@ const client = new RSocketClient({
transport: getClientTransport(address.host, address.port), transport: getClientTransport(address.host, address.port),
}); });
const route = "user.queue.reply"; const route = "user.queue.reply";
const sendRoute = "private.news";
const sendRoute = "private.news.2";
client.connect().subscribe({ client.connect().subscribe({
onComplete: (rSocket) => { onComplete: (rSocket) => {
every(1000).subscribe({ every(1000).subscribe({
@ -110,7 +119,7 @@ client.connect().subscribe({
// }); // });
rSocket.fireAndForget({ rSocket.fireAndForget({
data: { toUser: "4567", fromUser: "1234", message: "testHello" },
data: <Message>{ toUser: "4567", fromUser: "1234", message: "testHello" },
metadata: String.fromCharCode(sendRoute.length) + sendRoute, metadata: String.fromCharCode(sendRoute.length) + sendRoute,
}); });
// S APPLICATION_JSON.string // S APPLICATION_JSON.string

9
src/main/kotlin/com/example/demo/controller/HomeRestController.kt

@ -61,15 +61,6 @@ class HomeRestController(@Autowired @Lazy private val userService: UserService)
userService.getUserMessages(userName) userService.getUserMessages(userName)
} }
@MessageMapping("messages.findAll")
suspend fun all() = coroutineScope {
userService.getAllMessages()
}
@MessageMapping("users.{name}")
suspend fun getUser(@DestinationVariable name: String) = coroutineScope {
userService.getUserByName(name)
}
} }
data class MyData(val id: Int, val name2: String) data class MyData(val id: Int, val name2: String)

34
src/main/kotlin/com/example/demo/controller/RsocketActor.kt

@ -0,0 +1,34 @@
package com.example.demo.controller
import io.vavr.collection.HashMap
import io.vavr.collection.Map
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import org.springframework.messaging.rsocket.RSocketRequester
// Message types for counterActor
sealed class Message
data class AddRequester(val requester: RSocketRequester, val clientId: String) : Message()
data class RemoveRequester(val clientId: String) : Message()
data class GetRequesters(val response: CompletableDeferred<Map<String, RSocketRequester>>) : Message()
// This function launches a new rsocket actor
@ObsoleteCoroutinesApi // Actor API is planned to be overhauled so this annotation is required for now
fun CoroutineScope.rsocketActor() = actor<Message> {
var requesterMap: Map<String, RSocketRequester> = HashMap.empty() // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is AddRequester -> {
println("Actor adding requester")
requesterMap = requesterMap.put(msg.clientId, msg.requester)
}
is RemoveRequester -> {
println("Actor removing requester")
requesterMap = requesterMap.remove(msg.clientId)
}
is GetRequesters -> msg.response.complete(requesterMap)
}
}
}

38
src/main/kotlin/com/example/demo/controller/RsocketClient.kt → src/main/kotlin/com/example/demo/controller/RsocketConnectionController.kt

@ -1,16 +1,19 @@
package com.example.demo.controller package com.example.demo.controller
import com.example.demo.model.Message import com.example.demo.model.Message
import com.example.demo.model.User
import io.vavr.collection.HashMap import io.vavr.collection.HashMap
import io.vavr.collection.Map import io.vavr.collection.Map
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.messaging.handler.annotation.MessageExceptionHandler import org.springframework.messaging.handler.annotation.MessageExceptionHandler
import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.messaging.rsocket.RSocketRequester import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.annotation.ConnectMapping import org.springframework.messaging.rsocket.annotation.ConnectMapping
import org.springframework.stereotype.Controller import org.springframework.stereotype.Controller
import org.springframework.util.MimeType
@Controller @Controller
class RSocketConnectionController { class RSocketConnectionController {
@ -37,17 +40,18 @@ class RSocketConnectionController {
} }
@ConnectMapping("client-id") @ConnectMapping("client-id")
fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
val clientIdFixed = clientId.replace("\"", "") //check why the serializer adds " to strings
fun onConnect(rSocketRequester: RSocketRequester, @Payload user: User) {
// val clientIdFixed = clientId.replace("\"", "") //check why the serializer adds " to strings
// rSocketRequester.rsocket().dispose() //to reject connection // rSocketRequester.rsocket().dispose() //to reject connection
val clientId = user.id.toString()
rSocketRequester rSocketRequester
.rsocket() .rsocket()
.onClose() .onClose()
.subscribe(null, null, { .subscribe(null, null, {
log.info("{} just disconnected", clientIdFixed)
removeRequester(clientIdFixed)
log.info("{} just disconnected", clientId)
removeRequester(clientId)
}) })
addRequester(rSocketRequester, clientIdFixed)
addRequester(rSocketRequester, clientId)
} }
@MessageMapping("private.news") @MessageMapping("private.news")
@ -55,19 +59,37 @@ class RSocketConnectionController {
getRequesterMap() getRequesterMap()
.filterKeys { key -> key == message.toUser || key == message.fromUser } .filterKeys { key -> key == message.toUser || key == message.fromUser }
.values() .values()
.forEach { requester -> sendMessage(requester, message) }
.forEach { requester ->
run {
println("Sending message")
sendMessage(requester, message)
}
}
} }
@MessageExceptionHandler @MessageExceptionHandler
suspend fun handleException(ex: IllegalArgumentException): String { suspend fun handleException(ex: IllegalArgumentException): String {
delay(10) delay(10)
return "${ex.message} handled"
return "${ex.message} handled"
}
@MessageMapping("echo-stream-async")
suspend fun echoStreamAsync(payload: String): Flow<String> {
delay(10)
var i = 0
return flow {
while (true) {
delay(10)
emit("$payload ${i++}")
}
}
} }
private fun sendMessage(requester: RSocketRequester, message: Message) = private fun sendMessage(requester: RSocketRequester, message: Message) =
requester requester
.route("user.queue.reply") .route("user.queue.reply")
// .metadata("test", MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string))
.data(message) .data(message)
.send() .send()
.subscribe() .subscribe()

86
src/main/kotlin/com/example/demo/controller/RsocketConnectionControllerAsync.kt

@ -0,0 +1,86 @@
package com.example.demo.controller
import com.example.demo.model.Message
import com.example.demo.model.User
import kotlinx.coroutines.*
import org.slf4j.LoggerFactory
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.messaging.rsocket.RSocketRequester
import org.springframework.messaging.rsocket.annotation.ConnectMapping
import org.springframework.stereotype.Controller
import io.vavr.collection.Map
import org.springframework.messaging.handler.annotation.MessageExceptionHandler
@Controller
class RsocketConnectionControllerAsync : CoroutineScope {
private val log = LoggerFactory.getLogger(RsocketConnectionControllerAsync::class.java)
private val job = Job()
override val coroutineContext = Dispatchers.Unconfined + job
@ObsoleteCoroutinesApi
private val myRsocketActor = rsocketActor()
@ObsoleteCoroutinesApi
suspend fun addRequester(rSocketRequester: RSocketRequester, clientId: String) {
log.info("adding requester {}", clientId)
myRsocketActor.send(AddRequester(rSocketRequester, clientId))
}
@ObsoleteCoroutinesApi
suspend fun removeRequester(clientId: String) {
log.info("removing requester {}", clientId)
myRsocketActor.send(RemoveRequester(clientId))
}
@ObsoleteCoroutinesApi
@ConnectMapping(value = ["client-id2"])
fun onConnect(
rSocketRequester: RSocketRequester,
@Payload user: User
) {
launch(coroutineContext) {
val clientId = user.id.toString()
addRequester(rSocketRequester, clientId)
rSocketRequester
.rsocket()
.onClose()
.subscribe(null, null, {
log.info("{} just disconnected", clientId)
launch(coroutineContext) { removeRequester(clientId) }
})
}
}
@ObsoleteCoroutinesApi
@MessageMapping("private.news.2")
fun privateNews(message: Message) {
launch {
val res = CompletableDeferred<Map<String, RSocketRequester>>()
myRsocketActor.send(GetRequesters(res))
res
.await()
.filterKeys { key -> key == message.toUser || key == message.fromUser }
.values()
.forEach { requester ->
println("Sending message")
sendMessage(requester, message)
}
}
}
private fun sendMessage(requester: RSocketRequester, message: Message) =
requester
.route("user.queue.reply")
.data(message)
.send()
.subscribe()
@MessageExceptionHandler
suspend fun handleException(ex: IllegalArgumentException): String {
delay(10)
return "${ex.message} handled"
}
}

20
src/main/kotlin/com/example/demo/controller/RsocketDemoController.kt

@ -0,0 +1,20 @@
package com.example.demo.controller
import com.example.demo.service.UserService
import kotlinx.coroutines.coroutineScope
import org.springframework.messaging.handler.annotation.DestinationVariable
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.stereotype.Component
@Component
class RsocketDemoController(private val userService: UserService) {
@MessageMapping("messages.findAll")
suspend fun all() = coroutineScope {
userService.getAllMessages()
}
@MessageMapping("users.{name}")
suspend fun getUser(@DestinationVariable name: String) = coroutineScope {
userService.getUserByName(name)
}
}
Loading…
Cancel
Save