Building a Chat Application With Angular and Spring RSocket

Hantsy
The Startup
Published in
5 min readJul 20, 2020

--

In this post, we will use RSocket protocol to reimplement the chat application.

If you have missed the former posts about implementing the chat application, there is a checklist.

RSocket is a binary protocol for use on byte stream transports, such as TCP, WebSocket, RCP etc.

RSocket embraces ReactiveStreams semantics, and Spring provides excellent RSocket support through the existing messaging infrastructure. I have introduced RSocket in my former posts, check here.

In this post, we will use WebSocket as transport protocol which is good for web application. RSocket defines 4 interaction modes, we will use fire-and-forget to send a message to the server side, and request/streams to retrieve messages as an infinite stream from the server.

Firstly let’s create the server application. Generate a project skeleton using Spring Initializr.

  • Project type: Gradle
  • Language: Kotlin
  • Spring Boot version :2.4.0M1
  • Project Metadata/Java: 14
  • Dependencies: Reactive Web, RSocket

Hit the Generate button to download the generated archive, and extract it into your local disk.

Make sure you have installed the latest JDK 14 (AdoptOpenJDK is highly recommended), then import the source codes in your IDEs. eg. Intellij IDEA, and start to implement the server side.

We also skip the discussion of Reactor’s Sink implementation here.

Create a Message document definition and a Repository for it.

interface MessageRepository : ReactiveMongoRepository<Message, String> {
@Tailable
fun getMessagesBy(): Flux<Message>
}
@Document(collection = "messages")
data class Message(@Id var id: String? = null, var body: String, var sentAt: Instant = Instant.now())

Create a @Controller to handle messages.

@Controller
class MessageController(private val messages: MessageRepository) {
@MessageMapping("send")
fun hello(p: String) = this.messages.save(Message(body = p, sentAt = Instant.now())).log().then()
@MessageMapping("messages")
fun messageStream(): Flux<Message> = this.messages.getMessagesBy().log()
}

The send route accept a String based message body return a Mono<Void>, which will handle the fire-and-forget mode from client. The messages route accept a null payload and return Flux<Message>, which will as the handler of request-stream mode.

If you are new to the Spring RSocket , you may be confused how @Controller and MessageMapping are mapped to the types of the original RSocket message handlers. Spring hides the complexity of RSocket protocol itself , and reuse the existing messaging infrastructure to handle RSocket messages. Remember, compare the incoming payload and outgoing message type with 4 interaction mode definitions in the official RSocket website, you can determine which interaction mode it is mapped to.

Configure RSocket to use websocket transport in the application.properties file.

# a mapping path is defined
spring.rsocket.server.mapping-path=/rsocket
# websocket is chosen as a transport
spring.rsocket.server.transport=websocket

Start a MongoDB service as follows.

docker-compose up mongodb

You have to prepare a capped messages collection, check this post for more details.

Run the following command to start the server side application.

./gradlew bootRun

Write a small integration test to verify if it works.

@SpringBootTest
class RSocketServerApplicationTests {
@Autowired
lateinit var rSocketRequester: RSocketRequester;
@Test
fun contextLoads() {
val verifier= rSocketRequester.route("messages")
.retrieveFlux(Message::class.java)
.log()
.`as` { StepVerifier.create(it) }
.consumeNextWith { it -> assertThat(it.body).isEqualTo("test message") }
.consumeNextWith { it -> assertThat(it.body).isEqualTo("test message2") }
.thenCancel()
.verifyLater()
rSocketRequester.route("send").data("test message").send().then().block()
rSocketRequester.route("send").data("test message2").send().then().block()
verifier.verify(Duration.ofSeconds(5))
}
@TestConfiguration
class TestConfig {
@Bean
fun rSocketRequester(builder: RSocketRequester.Builder) = builder.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.connectWebSocket(URI.create("ws://localhost:8080/rsocket")).block()
}
}

In the above codes, use a test specific @TestConfiguration to define a RSocketRequester bean, which is a helper to communicate with the server side.

Let’s move to the frontend application.

Create a new Angular project, add two dependencies: roscket-core, rsocket-websocket-client.

npm install roscket-core rsocket-websocket-client

Next fill the following codes in the app.component.ts file. I've spent some time on making this work with my backend, the article RSocket With Spring Boot + JS: Zero to Hero from Domenico Sibilio is very helpful.

export class AppComponent implements OnInit, OnDestroy {  title = 'client';
message = '';
messages: any[];
client: RSocketClient;
sub = new Subject();
ngOnInit(): void {
this.messages = [];
// Create an instance of a client
this.client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
// ms btw sending keepalive to server
keepAlive: 60000,
// ms timeout if no keepalive response
lifetime: 180000,
// format of `data`
dataMimeType: 'application/json',
// format of `metadata`
metadataMimeType: 'message/x.rsocket.routing.v0',
},
transport: new RSocketWebSocketClient({
url: 'ws://localhost:8080/rsocket'
}),
});
// Open the connection
this.client.connect().subscribe({
onComplete: (socket: RSocket) => {
// socket provides the rsocket interactions fire/forget, request/response,
// request/stream, etc as well as methods to close the socket.
socket
.requestStream({
data: null, // null is a must if it does not include a message payload, else the Spring server side will not be matched.
metadata: String.fromCharCode('messages'.length) + 'messages'
})
.subscribe({
onComplete: () => console.log('complete'),
onError: error => {
console.log("Connection has been closed due to:: " + error);
},
onNext: payload => {
console.log(payload);
this.addMessage(payload.data);
},
onSubscribe: subscription => {
subscription.request(1000000);
},
});
this.sub.subscribe({
next: (data) => {
socket.fireAndForget({
data: data,
metadata: String.fromCharCode('send'.length) + 'send',
});
}
})
},
onError: error => {
console.log("Connection has been refused due to:: " + error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
}
});
}
addMessage(newMessage: any) {
console.log("add message:" + JSON.stringify(newMessage))
this.messages = [...this.messages, newMessage];
}
ngOnDestroy(): void {
this.sub.unsubscribe();
if (this.client) {
this.client.close();
}
}
sendMessage() {
console.log("sending message:" + this.message);
this.sub.next(this.message);
this.message = '';
}
}

Reuse the template file we uesd in the former posts.

<div fxFlex>
<p *ngFor="let m of messages">
{{m|json}}
</p>
</div>
<div>
<form fxLayout="row baseline" #messageForm="ngForm" (ngSubmit)="sendMessage()">
<mat-form-field fxFlex>
<input name="message" fxFill matInput #messageCtrl="ngModel" [(ngModel)]="message" required />
<mat-error fxLayoutAlign="start" *ngIf="messageCtrl.hasError('required')">
Message body can not be empty.
</mat-error>
</mat-form-field>
<div>
<button mat-button mat-icon-button type="submit" [disabled]="messageForm.invalid || messageForm.pending">
<mat-icon>send</mat-icon>
</button>
</div>
</form>
</div>

Next run the client application.

npm run start

Open two browser windows(or two different browsers), type some messages in each window and experience it.

I found a weired issue may be caused by the JSON Searializer encode/decode in the roscket-js, described it in rsocket-js issues#93, if you have an idea to overcome this, please comment on this issue.

Get the complete codes from my github.

--

--

Hantsy
The Startup

Self-employed technical consultant, solution architect and full-stack developer, open source contributor, freelancer and remote worker