package com.example.demo.controller import com.example.demo.model.Message import io.vavr.collection.HashMap import io.vavr.collection.Map import kotlinx.coroutines.delay import org.slf4j.LoggerFactory import org.springframework.messaging.handler.annotation.MessageExceptionHandler import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.rsocket.RSocketRequester import org.springframework.messaging.rsocket.annotation.ConnectMapping import org.springframework.stereotype.Controller import org.springframework.util.MimeType @Controller class RSocketConnectionController { private val log = LoggerFactory.getLogger(RSocketConnectionController::class.java) private var requesterMap: Map = HashMap.empty() @Synchronized private fun getRequesterMap(): Map { return requesterMap } @Synchronized private fun addRequester(rSocketRequester: RSocketRequester, clientId: String) { log.info("adding requester {}", clientId) requesterMap = requesterMap.put(clientId, rSocketRequester) } @Synchronized private fun removeRequester(clientId: String) { log.info("removing requester {}", clientId) requesterMap = requesterMap.remove(clientId) } @ConnectMapping("client-id") fun onConnect(rSocketRequester: RSocketRequester, clientId: String) { val clientIdFixed = clientId.replace("\"", "") //check why the serializer adds " to strings // rSocketRequester.rsocket().dispose() //to reject connection rSocketRequester .rsocket() .onClose() .subscribe(null, null, { log.info("{} just disconnected", clientIdFixed) removeRequester(clientIdFixed) }) addRequester(rSocketRequester, clientIdFixed) } @MessageMapping("private.news") fun privateNews(message: Message, rSocketRequesterParam: RSocketRequester) { getRequesterMap() .filterKeys { key -> key == message.toUser || key == message.fromUser } .values() .forEach { requester -> sendMessage(requester, message) } } @MessageExceptionHandler suspend fun handleException(ex: IllegalArgumentException): String { delay(10) return "${ex.message} handled" } private fun sendMessage(requester: RSocketRequester, message: Message) = requester .route("user.queue.reply") .data(message) .send() .subscribe() }