DB2: Tokio Mini Redis — Data IO (Part 2)

Arjun Sunil Kumar
Distributed Systems Engineering
4 min read · May 1, 2022

The last time we looked into mini-redis, we covered command.rs, server.rs, etc. Today, we will continue with the files we left out.

connection.rs

Usage: In the Listener implementation, we create a new Handler for every incoming client, and we pass the Connection into that Handler.

Struct:

Functions:

Init:

> pub fn new(socket: TcpStream) -> Connection {

Read:

> pub async fn read_frame(&mut self) -> crate::Result<Option<Frame>>

> fn parse_frame(&mut self) -> crate::Result<Option<Frame>>

Write:

> pub async fn write_frame(&mut self, frame: &Frame) -> io::Result<()>

> async fn write_value(&mut self, frame: &Frame) -> io::Result<()>

> async fn write_decimal(&mut self, val: u64) -> io::Result<()>

  • Write
https://doc.rust-lang.org/std/io/struct.BufWriter.html
  • Read
https://docs.rs/bytes/0.4.12/bytes/struct.BytesMut.html

Let's see each function one by one.

  • new()
  • read

into()

  • If the buffer already holds a complete frame, return that frame using parse_frame.
  • Otherwise, use read_buf to read more data from the socket into the buffer, so that a later call to parse_frame can return it as a frame.
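The parse-or-read-more loop described above can be sketched with the standard library alone. This is only an illustration under loud assumptions: `LineConnection` is a hypothetical stand-in for Connection, frames are simplified to newline-terminated lines instead of the Redis protocol, and the reads are blocking rather than async.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical stand-in for mini-redis's Connection: a byte source plus a
/// growable read buffer. Frames are newline-terminated lines here, which is
/// a simplification of the real protocol.
struct LineConnection<R: Read> {
    stream: R,
    buffer: Vec<u8>,
}

impl<R: Read> LineConnection<R> {
    fn new(stream: R) -> Self {
        LineConnection { stream, buffer: Vec::new() }
    }

    /// Try to cut one complete frame off the front of the buffer.
    /// Returns None when the buffer does not yet hold a full frame.
    fn parse_frame(&mut self) -> Option<String> {
        let end = self.buffer.iter().position(|&b| b == b'\n')?;
        let line: Vec<u8> = self.buffer.drain(..=end).collect();
        Some(String::from_utf8_lossy(&line[..end]).into_owned())
    }

    /// Parse if possible, otherwise read more bytes and retry.
    /// Ok(None) means the peer closed the stream with nothing left over.
    fn read_frame(&mut self) -> io::Result<Option<String>> {
        loop {
            if let Some(frame) = self.parse_frame() {
                return Ok(Some(frame));
            }
            let mut chunk = [0u8; 4]; // tiny on purpose, to force several reads
            let n = self.stream.read(&mut chunk)?;
            if n == 0 {
                return if self.buffer.is_empty() {
                    Ok(None)
                } else {
                    Err(io::Error::new(
                        io::ErrorKind::ConnectionReset,
                        "stream closed in the middle of a frame",
                    ))
                };
            }
            self.buffer.extend_from_slice(&chunk[..n]);
        }
    }
}

fn main() -> io::Result<()> {
    // A Cursor over bytes plays the role of the socket.
    let mut conn = LineConnection::new(Cursor::new(b"PING\nGET key\n".to_vec()));
    while let Some(frame) = conn.read_frame()? {
        println!("frame: {}", frame);
    }
    Ok(())
}
```

The 4-byte chunk makes it visible that a frame may arrive across several reads, which is the reason the buffer sits between the socket and the parser.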

Cursor: (Iterator Pattern)
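As a rough illustration of why a cursor behaves like an iterator over the buffer, here is a sketch using `std::io::Cursor`. The helper names `get_u8` and `peek_u8` are hypothetical and written for this example; it only shows the idea of a position that advances on reads and can be rewound before the real parse.

```rust
use std::io::{Cursor, Read};

/// Read one byte and advance the cursor; None at the end of the input.
fn get_u8(src: &mut Cursor<&[u8]>) -> Option<u8> {
    let mut byte = [0u8; 1];
    match src.read(&mut byte) {
        Ok(1) => Some(byte[0]),
        _ => None,
    }
}

/// Look at the next byte without advancing the cursor.
fn peek_u8(src: &Cursor<&[u8]>) -> Option<u8> {
    let pos = src.position() as usize;
    src.get_ref().get(pos).copied()
}

fn main() {
    let data: &[u8] = b"+OK\r\n";
    let mut cur = Cursor::new(data);

    // Peeking leaves the position untouched.
    assert_eq!(peek_u8(&cur), Some(b'+'));
    assert_eq!(cur.position(), 0);

    // Reading moves the position forward, like calling next() on an iterator.
    assert_eq!(get_u8(&mut cur), Some(b'+'));
    assert_eq!(cur.position(), 1);

    // Rewinding lets the same bytes be walked again, for example to first
    // check that a frame is complete and then parse it from the start.
    cur.set_position(0);
    assert_eq!(get_u8(&mut cur), Some(b'+'));
}
```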

  • write

Usage:

It is used to write frames from:

  • the client
  • the commands

frame.rs

Usage of write! (macro) in rust
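For readers new to the macro, here is a small sketch of the two common targets of `write!`: a `fmt::Formatter` inside a `Display` implementation, and any `io::Write` sink such as a `Vec<u8>`. The `Reply` type and `encode_decimal` are hypothetical names made up for this example, not the types in frame.rs.

```rust
use std::fmt;
use std::io::Write;

/// Hypothetical, simplified reply type used only for this example.
enum Reply {
    Simple(String),
    Integer(u64),
    Null,
}

impl fmt::Display for Reply {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // write! formats its arguments straight into the formatter.
        match self {
            Reply::Simple(s) => write!(f, "{}", s),
            Reply::Integer(n) => write!(f, "{}", n),
            Reply::Null => write!(f, "(nil)"),
        }
    }
}

/// write! also works on any io::Write target, here a byte vector.
fn encode_decimal(val: u64) -> Vec<u8> {
    let mut buf = Vec::new();
    write!(&mut buf, "{}\r\n", val).expect("writing to a Vec cannot fail");
    buf
}

fn main() {
    println!("{}", Reply::Simple("OK".to_string()));
    println!("{}", Reply::Integer(7));
    println!("{}", Reply::Null);
    println!("{:?}", encode_decimal(42));
}
```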

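The into() calls work because frame.rs implements the matching from() conversions (the From trait); Rust provides into() automatically for every From implementation.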
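A minimal sketch of that From/Into relationship follows. `FrameError` and `check` are hypothetical names chosen for the example; the point is only that implementing `From` is what makes `.into()` compile at the call site.

```rust
/// Hypothetical error type for this example.
#[derive(Debug, PartialEq)]
enum FrameError {
    Incomplete,
    Other(String),
}

// Implementing From gives us Into for free.
impl From<String> for FrameError {
    fn from(msg: String) -> FrameError {
        FrameError::Other(msg)
    }
}

impl From<&str> for FrameError {
    fn from(msg: &str) -> FrameError {
        FrameError::Other(msg.to_string())
    }
}

/// Accept only input whose first byte is '+'.
fn check(input: &[u8]) -> Result<(), FrameError> {
    match input.first() {
        None => Err(FrameError::Incomplete),
        Some(b'+') => Ok(()),
        // format! yields a String; into() turns it into a FrameError
        // through the From<String> impl above.
        Some(other) => Err(format!("invalid frame type byte `{}`", other).into()),
    }
}

fn main() {
    let from_str: FrameError = "protocol error".into();
    println!("{:?}", from_str);
    println!("{:?}", check(b"+OK"));
    println!("{:?}", check(b"?"));
}
```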

'a usage

SO Answer:

https://stackoverflow.com/questions/47640550/what-is-a-in-rust-language
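In short, `'a` is a lifetime parameter: it tells the compiler that a returned reference borrows from a particular input. A tiny sketch, with a hypothetical `get_line` helper written for this example:

```rust
/// Return the bytes before the first "\r\n", borrowed from `src`.
/// The lifetime 'a says: the returned slice lives no longer than `src`.
fn get_line<'a>(src: &'a [u8]) -> Option<&'a [u8]> {
    let end = src
        .windows(2)
        .position(|w| w[0] == b'\r' && w[1] == b'\n')?;
    Some(&src[..end])
}

fn main() {
    let input = b"+OK\r\nrest".to_vec();
    // `line` borrows from `input`, so `input` must outlive it.
    let line = get_line(&input);
    println!("{:?}", line);
}
```

No bytes are copied here; the result is a view into the original buffer, which is why the compiler needs the lifetime to tie the two together.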

That’s pretty much it for frame.rs

I am not going into the depths of all the syntax at present. For now, I am trying to get an overall, holistic understanding of the code.

db.rs

DbDropGuard → Db → Shared → State → Entry

// Similar to DLock #lock()
self.shared.state.lock().unwrap();
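To make the nesting and the lock() call concrete, here is a reduced sketch of the same shape using only the standard library. It is an assumption-heavy simplification: values are plain Strings, and the Entry and DbDropGuard layers (along with anything related to expiry or pub/sub) are left out.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Simplified state: just a key/value map.
struct State {
    entries: HashMap<String, String>,
}

/// The mutex guards the state shared between all handles.
struct Shared {
    state: Mutex<State>,
}

/// Cheap-to-clone handle; every clone points at the same Shared.
#[derive(Clone)]
struct Db {
    shared: Arc<Shared>,
}

impl Db {
    fn new() -> Db {
        let state = State { entries: HashMap::new() };
        Db { shared: Arc::new(Shared { state: Mutex::new(state) }) }
    }

    fn get(&self, key: &str) -> Option<String> {
        // The lock is held until `state` goes out of scope.
        let state = self.shared.state.lock().unwrap();
        state.entries.get(key).cloned()
    }

    fn set(&self, key: String, value: String) {
        let mut state = self.shared.state.lock().unwrap();
        state.entries.insert(key, value);
    }
}

fn main() {
    let db = Db::new();
    let writer = db.clone();
    thread::spawn(move || writer.set("hello".to_string(), "world".to_string()))
        .join()
        .unwrap();
    println!("{:?}", db.get("hello"));
}
```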

The details are pretty much covered in the code comments.

parse.rs

Reads the tokens of a frame one by one and parses each into the data type the command expects.
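A rough sketch of that idea: walk over the parts of a frame and convert each one on demand. The `Token` type and the error strings are hypothetical simplifications for this example; the real parser works on Frame values and has its own error type.

```rust
use std::vec;

/// Hypothetical, simplified token type standing in for frame parts.
#[derive(Debug)]
enum Token {
    Text(String),
    Int(u64),
}

/// Cursor-like wrapper that hands out one token at a time.
struct Parse {
    parts: vec::IntoIter<Token>,
}

impl Parse {
    fn new(tokens: Vec<Token>) -> Parse {
        Parse { parts: tokens.into_iter() }
    }

    /// Next token as a string, or an error if it is missing or not text.
    fn next_string(&mut self) -> Result<String, String> {
        match self.parts.next() {
            Some(Token::Text(s)) => Ok(s),
            Some(other) => Err(format!("expected text, got {:?}", other)),
            None => Err("end of stream".to_string()),
        }
    }

    /// Next token as an integer; text tokens are parsed as decimal.
    fn next_int(&mut self) -> Result<u64, String> {
        match self.parts.next() {
            Some(Token::Int(n)) => Ok(n),
            Some(Token::Text(s)) => s
                .parse::<u64>()
                .map_err(|_| format!("invalid number: {}", s)),
            None => Err("end of stream".to_string()),
        }
    }
}

fn main() {
    let mut parse = Parse::new(vec![
        Token::Text("EXPIRE".to_string()),
        Token::Text("key".to_string()),
        Token::Text("10".to_string()),
    ]);
    println!("{:?}", parse.next_string());
    println!("{:?}", parse.next_string());
    println!("{:?}", parse.next_int());
}
```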

shutdown.rs

Flow:

Waiting for Ctrl + C can be thought of as a thread that waits for the keyboard signal.

bin/server.rs

From bin/server.rs, we call the run() function in server.rs.

tokio::select!

Let's see how the tokio::select! macro works.

The tokio::select! macro allows waiting on multiple async computations and returns when a single computation completes.

https://tokio.rs/tokio/tutorial/select

Here too, when the shutdown signal completes first, the program begins the shutdown.
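To build intuition for "return when the first computation completes" without pulling in tokio, here is a std-only analogue that uses threads and channels. It is not how tokio::select! is implemented (select! polls futures on one task and drops the losing branches); the function name `run_until_first` and both channels are made up for this sketch.

```rust
use std::sync::mpsc;
use std::thread;

/// Race a long-running "server" branch against a "shutdown" branch and
/// return the name of whichever reports first.
fn run_until_first() -> &'static str {
    let (done_tx, done_rx) = mpsc::channel::<&'static str>();
    let (stop_tx, stop_rx) = mpsc::channel::<()>();

    // "Server" branch: keeps running until it is told to stop. recv()
    // returns an error once stop_tx is dropped, which is its exit cue.
    let server_done = done_tx.clone();
    let server = thread::spawn(move || {
        let _ = stop_rx.recv();
        let _ = server_done.send("server");
    });

    // "Shutdown" branch: stands in for the Ctrl + C signal and fires at once.
    let signal = thread::spawn(move || {
        let _ = done_tx.send("shutdown");
    });

    // Like select!, carry on with whichever branch reports first.
    let first = done_rx.recv().expect("at least one branch reports");

    // Dropping the sender tells the server branch to wind down.
    drop(stop_tx);
    server.join().unwrap();
    signal.join().unwrap();
    first
}

fn main() {
    println!("first to finish: {}", run_until_first());
}
```

In this sketch the server branch cannot report before the stop channel is closed, so the shutdown branch always wins the race.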

server.rs
Changing the fields in the Server Listener.
  • Calling drop(notify_shutdown) sends out the shutdown broadcast. The Observer pattern is used here.

Every Handler created for a client will have a Shutdown object.

server.rs — Handler constructor()

In Handler.run(), we check whether the server shutdown signal has been received before executing the operation.

server.rs — Handler run()

The run() function calls the apply() function, which receives the shutdown object.

server.rs — Handler run() → cmd.apply()

The shutdown object is used by the Subscribe command. It lets apply() return Ok(()) in case the server receives a shutdown signal.

subscribe.rs — apply()

Conclusion:

We covered most of the mini-redis by this session. This database is composed of

  • commands : mod :{get, ping, publish, set, subscribe}
  • Server daemon: server, shutdown
  • State: db
  • Block Datatypes + Parser: frame, parse
  • Socket IO (Buffer): connection, buffer
  • Client: cli, client, blocking_client, buffer.

In the next session, we will cover the client side of things. Until then, have a blast!

Found it Interesting?

Please show your support by 👏.


Arjun Sunil Kumar
Distributed Systems Engineering

Writes on Database Kernel, Distributed Systems, Cloud Technology, Data Engineering & SDE Paradigm. github.com/arjunsk