Building a Central Authentication Server with Rust, PostgreSQL, Kafka and gRPC

Adeoye Adefemi
10 min read · Nov 7, 2023
Image generated with gencraft.com

TL;DR!

The entire code implementation of this tutorial is available at https://github.com/opeolluwa/martus/tree/master/auth

In the following sections, I’ll explain how a central authentication server can be built with Rust, PostgreSQL, Apache Kafka and gRPC. I’ll start with an overview of the technologies, then cover the motivation for the project, the challenges and possible improvements.

This guide assumes some experience with the Rust programming language.

Introduction

Rust is a high-performance systems programming language aimed at empowering everyone to build reliable and efficient software.

gRPC is a modern, open-source, high-performance Remote Procedure Call (RPC) framework developed at Google that can run in any environment. It uses Protocol Buffers for its service definitions.

Protocol Buffers (protobuf) are a language-neutral, extensible mechanism for serializing structured data. As of this writing, proto3, which we’ll be using, is the most prevalent version.

Apache Kafka is an open-source, distributed event-streaming platform for building mission-critical applications. It can be configured for many use cases; in this context, it will serve as a message broker, much like RabbitMQ or Redis.

PostgreSQL is an open-source object-relational database management system.

To keep the tutorial basic, I won’t cover installing these technologies.

Motivation

Let’s face it: why build an authentication server with so many technologies?

Allow me to digress a bit.

I’m building school management software that has several distinct entities — admins, staff, teachers and students, to mention a few. Each of these entities needs an authentication mechanism.

The sequence of authentication and authorization is as follows: requests from the client application (web or mobile) come into the system through a reverse proxy server, which I have already built here.

Afterwards, the relevant service sends the authentication request to the central authentication server over gRPC. On success, that service generates an authorization token and returns it to the client application through the proxy server.

Simply put, the central auth server authenticates each entity, and the owning service authorizes the user on subsequent requests.

martus auth sequence diagram

Implementation

We’ll begin by creating a new Rust application:

cargo new auth
cd auth

Next, we’ll add the dependencies to Cargo.toml:

[package]
edition = "2021"
name = "auth"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.75"
bcrypt = "0.15.0"
dotenv = "0.15.0"
jsonwebtoken = "9.1.0"
kafka = "0.10.0"
otp_generator = "0.1.0"
prost = "0.12.1"
serde = {version = "1.0.190", features = ["derive"]}
serde_json = "1.0.108"
sqlx = {version = "0.7.2", features = ["postgres", "tls-rustls", "uuid", "migrate", "chrono", "runtime-tokio"]}
tokio = {version = "1.33.0", features = ["macros", "rt-multi-thread"]}
tonic = "0.10.2"
tower-http = {version = "0.4.4", features = ["trace"]}
tracing = "0.1.40"
tracing-subscriber = {version = "0.3.17", features = ["env-filter"]}
uuid = {version = "1.5.0", features = [
"serde",
"v4",
"fast-rng",
"macro-diagnostics",
]}

[build-dependencies]
tonic-build = "0.10.2"

Next, I’ll install sqlx-cli to interact with my PostgreSQL database. Note that I already have a local PostgreSQL instance running before creating the new database table.

cargo install sqlx-cli
touch .env
echo "DATABASE_URL=postgres://<username>:<password>@localhost/martus_auth" >> .env
sqlx migrate add create_user_information_table

The step above creates a file like migrations/20231107205101_create_user_information_table.sql. Fill the file with the following:

-- Add migration script here
CREATE TABLE
user_information (
id uuid PRIMARY KEY NOT NULL,
email VARCHAR UNIQUE NOT NULL,
password VARCHAR NOT NULL,
is_verified BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMP NOT NULL DEFAULT NOW (),
updated_at TIMESTAMP NOT NULL DEFAULT NOW ()
);
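If you’re following along, the database itself can be created and the migration applied with sqlx-cli (this assumes the DATABASE_URL from the .env file above; alternatively, migrations can be run from the application, as sketched later):

sqlx database create
sqlx migrate run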

Next, we’ll create the protobuf file. Note that I’ve trimmed it to a few lines for the sake of brevity.

mkdir proto
touch proto/auth.proto
echo "syntax = \"proto3\";
package martus_auth;


service Auth {
// see the grpc server status
rpc HealthCheck (HealthCheckRequest) returns (HealthCheckResponse);
rpc SignUp (SignupRequest) returns (SignUpResponse);
rpc Login (LoginRequest) returns (LoginResponse);
rpc Logout (LogoutRequest) returns (LogoutResponse);
rpc RefreshToken (RefreshTokenRequest) returns (RefreshTokenResponse);
rpc VerifyToken (VerifyTokenRequest) returns (VerifyTokenResponse);
rpc VerifyEmail (VerifyEmailRequest) returns (VerifyEmailResponse);
rpc ForgotPassword (ForgotPasswordRequest) returns (ForgotPasswordResponse);
rpc ResetPassword (ResetPasswordRequest) returns (ResetPasswordResponse);
rpc ChangePassword (ChangePasswordRequest) returns (ChangePasswordResponse);


}



// the health check is an empty object
message HealthCheckRequest {
}


// the health check response
message HealthCheckResponse {
string status = 1;
string message = 2;
}

// the signup request
message SignupRequest {
string email = 1;
string password = 2;
}

// the signup response
message SignUpResponse {
bool success = 1;
string message = 2;
string token = 3;
}


// the login request
message LoginRequest {
string email = 1;
string password = 2;
}

// the login response
message LoginResponse {
bool success = 1;
string message = 2;
string token = 3;
}

// ... remaining messages trimmed for brevity
" >> proto/auth.proto

Finally, we can move on to the logic layer. Just before that, we need a build script that generates the Rust equivalent of the protobuf file; it will create a src/martus_auth.rs file. Due to the size of the generated code, I’ll skip its contents, as well as the subsequent database migration files.

touch build.rs
echo "use std::env;
use std::path::PathBuf;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let proto_file = \"./proto/auth.proto\";
let out_dir = PathBuf::from(env::var(\"OUT_DIR\").unwrap());

tonic_build::configure()
.protoc_arg(\"--experimental_allow_proto3_optional\") // for older systems
// .build_client(false) // uncomment to skip generating client code (it is generated by default)
.build_server(true)
.file_descriptor_set_path(out_dir.join(\"store_descriptor.bin\"))
.out_dir(\"./src\")
.compile(&[proto_file], &[\"proto\"])?;

Ok(())
}" >> build.rs

Let’s fill our src/main.rs with the following

use martus_auth::auth_server::AuthServer;
use std::net::SocketAddr;
use tonic::transport::Server;

pub mod constants;
pub mod database;
mod grpc_server;
pub mod jwt;
pub mod mailer;
pub mod martus_auth;
pub mod otp;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
//parse env
dotenv::dotenv().ok();

//enable logging
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();

let server_address = SocketAddr::from(([0, 0, 0, 0], 5001));
let server = grpc_server::GrpcServer::default();

//TODO: run migrations
// sqlx::migrate!("./migrations")
// .run(&database_pool_connection)
// .await?;

tracing::info!(message = "Starting server.", %server_address);

// run the gRPC server
Server::builder()
.trace_fn(|_| tracing::info_span!("martus_auth_server"))
.add_service(AuthServer::new(server))
.serve(server_address)
.await?;

Ok(())
}
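For completeness, the commented-out migration TODO above could be wired up roughly like this (a sketch that assumes the Database::conn() helper defined later in database.rs):

// inside main(), before building the gRPC server
let database_pool_connection = database::Database::conn().await;
sqlx::migrate!("./migrations")
    .run(&database_pool_connection)
    .await?;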

Let’s create the modules we imported above:

touch src/constants.rs src/jwt.rs src/database.rs src/grpc_server.rs src/otp.rs src/mailer.rs

The mailer module encapsulates Kafka. It takes our data and ships it off to a message queue; at the other end of the queue, another application fetches the data and sends the emails.

// src/mailer.rs


use anyhow::{Ok, Result};
use kafka::producer::{Producer, Record, RequiredAcks, DEFAULT_ACK_TIMEOUT_MILLIS};
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::Debug;
use std::time::Duration;

use crate::constants::EMAIL_QUEUE;
/// the mailer is a tiny layer over Apache Kafka
/// it allows pub/sub communication between the auth service and the email service: it receives an email payload and adds it to the message broker
/// the email service on the other end takes the email, feeds the data into a template, then sends it to the user

#[derive(Debug, Serialize, Deserialize)]
pub struct Mailer<T> {
pub email: String,
pub subject: String,
pub template: String,
pub data: T,
}

#[derive(Debug)]
pub enum EmailTemplate {
Signup,
Welcome,
PasswordReset,
ForgottenPassword,
}

// for each email template, return a tuple of strings containing the template name and the subject
impl EmailTemplate {
pub fn get_template(&self) -> (&str, &str) {
match self {
EmailTemplate::Signup => ("sign-up", "Welcome to Martus"),
EmailTemplate::Welcome => ("welcome", "Welcome to Martus"),
EmailTemplate::PasswordReset => ("password-update", "Martus Password Reset"),
EmailTemplate::ForgottenPassword => ("forgotten-password", "Password Reset"),
}
}
}

/// the email builder enforces the correct positional arguments for the mailer constructor
pub struct MailBuilder<'a>(pub &'a str, pub EmailTemplate);

impl<'a, T: Serialize + Debug + Deserialize<'a>> Mailer<T> {
pub fn new(email: &'a str, template: EmailTemplate, data: T) -> Self {
let (email_template, email_subject) = template.get_template();

let mailer = Mailer {
data,
email: email.to_string(),
subject: email_subject.to_string(),
template: email_template.to_string(),
};
mailer
}

pub async fn send(&self) -> Result<()> {
let timeout = DEFAULT_ACK_TIMEOUT_MILLIS;
let acks = RequiredAcks::One;
let kafka_host = env::var("KAFKA_HOST")?;
let payload = serde_json::to_string(self).unwrap();
let mut producer = Producer::from_hosts(vec![kafka_host.to_owned()])
.with_ack_timeout(Duration::from_millis(timeout))
.with_required_acks(acks)
.create()
.unwrap();
let record = Record::from_value(EMAIL_QUEUE, payload.as_bytes());
// add the email payload to the queue
if let Err(error) = producer.send(&record) {
println!("error sending email: {error:?}");
} else {
println!("email sent");
}
Ok(())
}
}
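For illustration, a signup email payload serialized onto the EMAIL_QUEUE topic would look roughly like this (the address and OTP value are hypothetical):

{
  "email": "user@example.com",
  "subject": "Welcome to Martus",
  "template": "sign-up",
  "data": { "otp": "492031" }
}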

Similarly, the database.rs file makes it possible to interact with the database:

use anyhow::{Ok, Result};
use bcrypt::{hash, DEFAULT_COST};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use uuid::Uuid;

use crate::{jwt::Jwt, otp::Otp};
/// the Database struct is reusable outside this module,
/// especially for sharing the database connection
/// with other modules
#[derive(Debug)]
pub struct Database {}

impl Database {
pub async fn conn() -> Pool<Postgres> {
let database_connection_url = std::env::var("DATABASE_URL")
.unwrap_or("postgres://opeolluwa:thunderstorm@localhost/martus_auth".to_string());

PgPoolOptions::new()
.max_connections(5)
.connect(&database_connection_url)
.await
.expect("error creating connection")
}
}

#[derive(Debug, Serialize, Deserialize, sqlx::FromRow, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserInformation {
pub id: Uuid,
pub otp_id: Option<Uuid>,
pub email: String,
pub password: String,
pub is_verified: bool,
}

#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct BlacklistedJwt {
pub id: Uuid,
pub token: String,
}

pub struct UserInformationBuilder<'a>(&'a str, &'a str);
impl<'a> UserInformationBuilder<'a> {
pub fn new(email: &'a str, password: &'a str) -> Self {
Self(email, password)
}
}

impl UserInformation {
pub async fn new(user: UserInformationBuilder<'_>) -> Result<Self> {
let database_pool_connection = Database::conn().await;

let id = Uuid::new_v4();
let email = user.0;
let password = hash(user.1, DEFAULT_COST)?;

let new_user = sqlx::query_as::<_, UserInformation>(
r#"
INSERT INTO user_information (id, email, password)
VALUES ($1, $2, $3)
RETURNING *
"#,
)
.bind(id)
.bind(email)
.bind(password)
.fetch_one(&database_pool_connection)
.await?;

Ok(new_user)
}

pub async fn validate_password<'a>(&self, password: &'a str) -> Result<bool> {
let database_pool_connection = Database::conn().await;
let user = sqlx::query_as::<_, UserInformation>(
r#"
SELECT * FROM user_information WHERE email=$1
"#,
)
.bind(&self.email)
.fetch_one(&database_pool_connection)
.await?;

Ok(bcrypt::verify(password, &user.password)?)
}

// update password
pub async fn change_password<'a>(email: &'a str, password: &'a str) -> Result<UserInformation> {
let database_pool_connection = Database::conn().await;
let user = sqlx::query_as::<_, UserInformation>(
r#"
UPDATE user_information SET password=$1 WHERE email=$2
RETURNING *
"#,
)
// $1 is the new password, $2 is the email
.bind(password)
.bind(email)
.fetch_one(&database_pool_connection)
.await?;

Ok(user)
}

// set verified
pub async fn set_verified<'a>(email: &'a str) -> Result<bool> {
let database_pool_connection = Database::conn().await;
let user = sqlx::query_as::<_, UserInformation>(
r#"
UPDATE user_information SET is_verified=true WHERE email=$1
RETURNING *
"#,
)
.bind(email)
.fetch_one(&database_pool_connection)
.await
.ok();

Ok(user.is_some())
}

// get a user record
pub async fn fetch<'a>(email: &'a str) -> Result<Self> {
let database_pool_connection = Database::conn().await;
let user = sqlx::query_as::<_, UserInformation>(
r#"
SELECT * FROM user_information WHERE email=$1
"#,
)
.bind(email)
.fetch_one(&database_pool_connection)
.await?;

Ok(user)
}
// creds_exists
pub async fn creds_exists<'a>(email: &'a str) -> Result<bool> {
let database_pool_connection = Database::conn().await;
let user = sqlx::query_as::<_, UserInformation>(
r#"
SELECT * FROM user_information WHERE email=$1
"#,
)
.bind(email)
.fetch_one(&database_pool_connection)
.await
.ok();

Ok(user.is_some())
}

pub async fn logout<'a>(token: &'a str) {
Jwt::blacklist(token).await;
}

// link user and otp
pub async fn gen_otp(&self, validity: i64) -> anyhow::Result<Otp> {
let otp = Otp::new(validity).await?;
let database_pool_connection = Database::conn().await;
let _ = sqlx::query_as::<_, UserInformation>(
r#"
UPDATE user_information SET otp_id=$1 WHERE email=$2
RETURNING *
"#,
)
.bind(otp.id)
.bind(&self.email)
.fetch_one(&database_pool_connection)
.await
.ok();
Ok(otp)
}
}

Finally, src/grpc_server.rs implements the business logic. For brevity, I’ve cut it down to a few handlers:

use crate::{
database::{UserInformation, UserInformationBuilder},
jwt::{Claim, Jwt},
mailer::{EmailTemplate, Mailer},
martus_auth::{
auth_server::Auth, ChangePasswordRequest, ChangePasswordResponse, ForgotPasswordRequest,
ForgotPasswordResponse, HealthCheckRequest, HealthCheckResponse, LoginRequest,
LoginResponse, LogoutRequest, LogoutResponse, RefreshTokenRequest, RefreshTokenResponse,
ResetPasswordRequest, ResetPasswordResponse, SignUpResponse, SignupRequest,
VerifyEmailRequest, VerifyEmailResponse, VerifyTokenRequest, VerifyTokenResponse,
}, otp::Otp,
};
use serde_json::json;
use tonic::Response;

// impl
#[derive(Debug, Default)]
pub struct GrpcServer {}

#[tonic::async_trait]
impl Auth for GrpcServer {
// the health check rpc
async fn health_check(
&self,
_request: tonic::Request<HealthCheckRequest>,
) -> std::result::Result<tonic::Response<HealthCheckResponse>, tonic::Status> {
// build the response
let response = HealthCheckResponse {
status: "Ok".to_string(),
message: "Service up and running".to_string(),
};
Ok(Response::new(response))
}

async fn sign_up(
&self,
request: tonic::Request<SignupRequest>,
) -> std::result::Result<tonic::Response<SignUpResponse>, tonic::Status> {
// get the payload
let payload = request.into_inner();

//see if creds exist
let account_exist = UserInformation::creds_exists(&payload.email).await.unwrap();
if account_exist {
return Err(tonic::Status::already_exists("email already exists"));
}

// create the user
let new_user = UserInformationBuilder::new(&payload.email, &payload.password);
let user: Result<UserInformation, anyhow::Error> = UserInformation::new(new_user).await;

if user.is_err() {
return Err(tonic::Status::internal("error creating user"));
}

let user = user.unwrap();
let Otp { otp, .. } = user.gen_otp(5).await.unwrap();

// send a verification email to the user
let _ = Mailer::new(
&payload.email,
EmailTemplate::Signup,
json!({
"otp": otp
}),
)
.send()
.await
.unwrap();

let claim = Claim {
id: user.id.to_string(),
email: user.email,
};
let jwt = Jwt::new(claim).sign().await;
let response = SignUpResponse {
success: true,
message: "User created successfully".to_string(),
token: jwt.unwrap(),
};

Ok(Response::new(response))
}

async fn login(
&self,
request: tonic::Request<LoginRequest>,
) -> std::result::Result<tonic::Response<LoginResponse>, tonic::Status> {
let LoginRequest { email, password } = request.into_inner();

// validate the user credentials
let user = UserInformation::fetch(&email).await;
if user.is_err() {
return Err(tonic::Status::not_found(
"no user with the provided credentials was found",
));
}

let user = user.unwrap();
let Some(is_correct_password) = user.validate_password(&password).await.ok() else {
return Err(tonic::Status::invalid_argument(
"invalid username or password",
));
};

if !is_correct_password {
return Err(tonic::Status::invalid_argument(
"invalid username or password",
));
}

// sign the JWT
let claim = Claim {
id: user.id.to_string(),
email: user.email,
};
let jwt = Jwt::new(claim).sign().await;
let response = LoginResponse {
success: true,
message: "user successfully logged in".to_string(),
token: jwt.unwrap(),
};

Ok(Response::new(response))
}


}

Lastly, the otp and jwt modules:

// otp.rs

use crate::database::Database;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use uuid::Uuid;

#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
#[serde(rename_all = "camelCase")]
pub struct Otp {
pub id: Uuid,
pub exp: i64,
pub otp: String,
}

impl Otp {
pub async fn new(validity: i64) -> Result<Self> {
let database_pool_connection = Database::conn().await;
let record_id = Uuid::new_v4();

let flags = otp_generator::Flags {
digits: true,
..Default::default()
};
let otp = otp_generator::generate(6, &flags).unwrap();
let now = Utc::now().timestamp();
// validity is in minutes; Utc::now().timestamp() is in seconds
let exp = now + validity * 60;
let otp = sqlx::query_as::<_, Otp>(
r#"
INSERT INTO one_time_passwords (id, exp, otp)
VALUES ($1, $2, $3) RETURNING *
"#,
)
.bind(record_id)
.bind(exp)
.bind(otp)
.fetch_one(&database_pool_connection)
.await?;

Ok(otp)
}
}
//jwt.rs

use anyhow::Result;
use jsonwebtoken::{decode, encode, Algorithm, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
use std::env;
use uuid::Uuid;

use crate::database::{BlacklistedJwt, Database};
/// the jwt module provides helpers and mechanisms
/// for signing and verifying JSON Web Tokens
/// it uses the HS512 algorithm
#[derive(Debug, Serialize, Deserialize)]
pub struct Jwt {
pub claim: Claim,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Claim {
/// the user id, a UUID
pub id: String,
/// the user email
pub email: String,
}

impl Jwt {
/// take new a new user credential and build the claims
pub fn new(claim: Claim) -> Self {
let claim = Claim {
email: claim.email,
id: claim.id.to_string(),
};
Self { claim }
}

pub async fn sign(&self) -> Result<String> {
let jwt_secret = env::var("JWT_SECRET")?;
let algorithm = Algorithm::HS512;
let header = Header::new(algorithm);
// sign only the claim itself, so decode::<Claim>() below gets the same shape back
let jwt_token = encode(
&header,
&self.claim,
&EncodingKey::from_secret(jwt_secret.as_bytes()),
);
Ok(jwt_token?)
}

// decode the jwt
pub fn decode(token: &str) -> Result<Claim> {
let jwt_secret = env::var("JWT_SECRET")?;
// the claim carries no exp field, so only the signature and algorithm are validated
let mut validation = Validation::new(Algorithm::HS512);
validation.set_required_spec_claims::<&str>(&[]);
validation.validate_exp = false;
let token = decode::<Claim>(
token,
&DecodingKey::from_secret(jwt_secret.as_ref()),
&validation,
)?;
Ok(token.claims)
}

pub async fn blacklist<'a>(token: &'a str) {
let database_pool_connection = Database::conn().await;
let record_id = Uuid::new_v4();
let _ = sqlx::query_as::<_, BlacklistedJwt>(
r#"
INSERT INTO blacklisted_jwt (id, token) VALUES ($1,$2) RETURNING *
"#,
)
.bind(record_id)
.bind(token)
.fetch_one(&database_pool_connection)
.await;
}
}

// tests omitted for brevity
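The queries above also reference an otp_id column on user_information, a one_time_passwords table and a blacklisted_jwt table, whose migration files I skipped. As a rough sketch of their shape (my assumption, inferred from the structs and queries; check the repository for the actual migrations):

-- assumed shape, inferred from the structs and queries above
ALTER TABLE user_information ADD COLUMN otp_id uuid;

CREATE TABLE
one_time_passwords (
id uuid PRIMARY KEY NOT NULL,
exp BIGINT NOT NULL,
otp VARCHAR NOT NULL
);

CREATE TABLE
blacklisted_jwt (
id uuid PRIMARY KEY NOT NULL,
token VARCHAR NOT NULL
);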

That was quite a handful. To get the most out of it, you might want to check out the full project here.
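Before wrapping up, here is a rough sketch of how another service might call the auth server over gRPC. It assumes that service compiles the same proto/auth.proto with tonic-build (client stubs are generated by default), depends on tokio and tonic, and that the credentials shown are placeholders:

use martus_auth::auth_client::AuthClient;
use martus_auth::{HealthCheckRequest, LoginRequest};

pub mod martus_auth; // generated from proto/auth.proto by tonic-build

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // connect to the central auth server started earlier
    let mut client = AuthClient::connect("http://127.0.0.1:5001").await?;

    // quick smoke test against the HealthCheck rpc
    let health = client.health_check(HealthCheckRequest {}).await?;
    println!("health: {:?}", health.into_inner());

    // authenticate a user and collect the signed token
    let login = client
        .login(LoginRequest {
            email: "user@example.com".to_string(),
            password: "super-secret".to_string(),
        })
        .await?;
    println!("token: {}", login.into_inner().token);

    Ok(())
}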

Challenges

The major challenge I had was setting up Kafka locally; luckily, I got it working after a few retries. I also believe the codebase can be made more modular over time.

There you have it, I’ll see you in the next one!
