Sales email classification with AWS Comprehend

Richárd Rutai
Published in SnapSoft.io Blog
Mar 31, 2023

Introduction

Our team has created an email classification system that employs AWS Comprehend to categorize emails sent to sales employees in response to sales inquiries as either positive or negative. This allows them to focus their attention on emails that have the potential to generate sales leads while disregarding those that do not. To implement this system, we have built a set of Lambda functions using Node.js and the Serverless framework. These functions include a login function that uses OAuth2 to authenticate users and save their refresh tokens to DynamoDB, a logout function that removes tokens from DynamoDB, an email scraper function that retrieves emails from the authenticated user’s mailbox and sends them to Comprehend for classification, and a labeler function that applies labels to the emails in the users' mailboxes based on Comprehend’s classification results.

In this tutorial, we will see how to set up a Serverless framework project, how to use Gmail’s OAuth2 authentication and API to log in and access emails, how to set up and access DynamoDB with the Serverless framework and TypeScript, how to set up AWS Comprehend for classification, and how to run a serverless application offline.

1: Setting up a Serverless framework project

Let's start with setting up a Serverless Framework project. We chose Serverless instead of the more traditional Terraform implementation because it is specifically designed for building serverless applications, which is an ideal architecture for us.

First, we have to install the Serverless framework:

npm install -g serverless

After this, we can create a new directory for our project:

mkdir SEA-serverless-project 
cd SEA-serverless-project

Then, we can initialize our serverless project:

serverless create --template aws-nodejs --path SEA
cd SEA

After this, we can install the serverless-plugin-typescript plugin for TypeScript development. This plugin is ideal for offline development, since VSCode can hit breakpoints during debugging, but we should switch to another TypeScript plugin for our online Serverless stage: serverless-plugin-typescript may generate source files that exceed the AWS Lambda size limit. For the cloud deployment, we will use the serverless-esbuild plugin.

npm install -D serverless-plugin-typescript typescript

Finally, install the serverless-offline plugin for running the app offline:

npm install serverless-offline --save-dev

After this, all of our plugins should be installed.

Now, let's create a tsconfig.json file in the root of our project:

{
  "compilerOptions": {
    "target": "es2021",
    "strict": true,
    "preserveConstEnums": true,
    "noEmit": false,
    "sourceMap": true,
    "module": "commonjs",
    "moduleResolution": "node",
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "isolatedModules": true,
    "outDir": ".build",
    "types": ["node"]
  },
  "exclude": ["node_modules", "**/*.test.ts"]
}

And modify the serverless.yml file in the root of our project to include the serverless-offline plugin and enable TypeScript support:

service: myservice
plugins:
  - serverless-plugin-typescript
  - serverless-offline
provider:
  name: aws
  runtime: nodejs12.x
functions:
  hello:
    handler: handler.hello

Finally, create an src directory for our TypeScript code and write a Serverless function using TypeScript. For example, here’s a simple Lambda function that responds with a JSON payload:

import { APIGatewayProxyHandler } from 'aws-lambda';

export const hello: APIGatewayProxyHandler = async (event, context) => {
  return {
    statusCode: 200,
    body: JSON.stringify({
      message: 'Hello TypeScript!'
    })
  };
};

This function responds to HTTP requests with a JSON payload containing a “message” field. Note how we’re using TypeScript interfaces like APIGatewayProxyHandler to ensure type safety.

We can run our functions by calling

npx sls offline

and following the localhost links printed in the console.

We can, of course, also run npm init at this point.

Our serverless project running as intended

2: Setting up a Google Cloud Console project

Next, to implement our login flow, we have to set up a Google Cloud Console project. We will need a Google Cloud Console account for this step. We will create a Google Cloud Console project and enable the Gmail API, create an OAuth2 client ID and secret, and configure the OAuth2 consent screen.

2.1: Create a Google Cloud Console project and enable the Gmail API

First, create a new project in the Google Cloud Console. Once you’ve created the project, enable the Gmail API by navigating to the “APIs & Services” section of the console and clicking on the “Enable APIs and Services” button. Search for “Gmail API” and select it from the search results. Click the “Enable” button to enable the API for your project.

2.2: Create an OAuth2 client ID and secret

Next, we’ll create an OAuth2 client ID and secret for our application. Go to the “Credentials” section of the Google Cloud Console and click on the “Create credentials” button. Select “OAuth client ID” from the dropdown menu.

Select “Web application” as the application type and give it a name. For the “Authorized JavaScript origins” field, enter the URL of your application’s login page. For the “Authorized redirect URIs” field, enter the URL of your application’s redirect page. In this case, we just redirect back to the login page.

Click “Create” and your client ID and secret will be generated.

2.3: Configure the OAuth2 consent screen

Before we can use our OAuth2 client ID and secret, we need to configure the consent screen. Go to the “OAuth consent screen” tab in the Google Cloud Console and fill out the required fields. Make sure to select the “External” user type and add the Gmail API to the “Scopes for Google APIs” section.

3: The Login service

Now, we are ready to create the login/logout service. Create a TypeScript file called login-service.ts.

First, we’ll define the API endpoints for logging in and logging out:

async handleRequest(event: APIGatewayProxyEvent): Promise<APIGatewayProxyResult> {
  if (event.path === "/login") {
    if (!event?.queryStringParameters) return this.getRedirect();

    const code = event.queryStringParameters["code"];
    const error = event.queryStringParameters["error"];

    if (error) throw new Error(`Error during login: ${error}`);

    if (await this.login(code)) return ok('Credentials saved successfully');
  }

  if (event.path === "/logout") {
    if (!event?.queryStringParameters) return this.getRedirectLogout();

    const code = event.queryStringParameters["code"];
    const error = event.queryStringParameters["error"];

    if (error) throw new Error(`Error during logout: ${error}`);

    if (await this.logout(code)) return ok('Credentials deleted successfully');
  }

  return ok('No credentials were modified');
}

As we can see, we check the path of the incoming request to determine whether the user is logging in or logging out. In both cases, if the query string does not contain an authorization code, we redirect the user to the appropriate login page generated by OAuth2. Completing authentication on that page redirects the user to a predefined address, which we have set to this same endpoint. This redirect adds the code to the query string.

If the querystring has the codes, and the user is logging in, we’ll call the login function with the authorization code, which will exchange it for a refresh token and save it to our database. If they’re logging out, we’ll call the logout function with the authorization code, which will retrieve the account ID associated with the access token and delete the refresh token from our database.
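The branching described above can be condensed into a small pure function. This is only a sketch to make the decision logic easier to follow: the Action type and the decideAction name are hypothetical and not part of our service, and the real handler performs the redirect or token exchange instead of returning a description of it.

```typescript
// Hypothetical model of the routing decisions made in handleRequest.
type Query = Record<string, string | undefined> | null;
type Target = "login" | "logout";
type Action =
  | { kind: "redirect"; target: Target }
  | { kind: "exchange"; target: Target; code: string }
  | { kind: "error"; message: string }
  | { kind: "noop" };

function decideAction(path: string, query: Query): Action {
  const target: Target | null =
    path === "/login" ? "login" : path === "/logout" ? "logout" : null;
  if (!target) return { kind: "noop" };

  // First visit: there is no query string yet, so the user goes to the consent page.
  if (!query) return { kind: "redirect", target };

  // Google redirected back with an error (for example, the user denied access).
  const error = query["error"];
  if (error) return { kind: "error", message: `Error during ${target}: ${error}` };

  // Google redirected back with an authorization code that can be exchanged for tokens.
  const code = query["code"];
  if (!code) return { kind: "error", message: "Authorization code is missing" };
  return { kind: "exchange", target, code };
}

console.log(decideAction("/login", null));
console.log(decideAction("/logout", { code: "abc" }));
```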

Next, let’s define the getRedirect and getRedirectLogout functions, which generate the authorization URLs for Google OAuth2:

getRedirect(): APIGatewayProxyResult {
  return redirect(
    this.oauth2Client.generateAuthUrl({
      access_type: "offline",
      scope,
      include_granted_scopes: true,
      redirect_uri: process.env.REDIRECT_URI,
    })
  );
}

getRedirectLogout(): APIGatewayProxyResult {
  return redirect(
    this.oauth2Client.generateAuthUrl({
      access_type: "offline",
      scope,
      include_granted_scopes: true,
      redirect_uri: process.env.REDIRECT_URI_LOGOUT,
    })
  );
}

These functions use the injected oauth2Client to generate the authorization URLs, which include the redirect URIs that we specified earlier. The scope variable defines the Google API scopes that we want to request access to. In this example, we’re using the https://www.googleapis.com/auth/gmail.modify scope, which grants read and write access to the user’s mailbox, with the exception of permanently deleting messages.
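To see what such an authorization URL contains, here is a minimal sketch that assembles one by hand from the query parameters of Google's OAuth2 web-server flow. In the service itself we let generateAuthUrl do this; the buildAuthUrl helper, the client ID, and the localhost redirect URI below are placeholders for illustration only.

```typescript
// Illustration only: in the real service, oauth2Client.generateAuthUrl builds this URL.
const GOOGLE_AUTH_ENDPOINT = "https://accounts.google.com/o/oauth2/v2/auth";

function buildAuthUrl(clientId: string, redirectUri: string, scopes: string[]): string {
  const params = new URLSearchParams({
    client_id: clientId,
    redirect_uri: redirectUri,
    response_type: "code",          // ask for an authorization code
    access_type: "offline",         // ask for a refresh token as well
    include_granted_scopes: "true", // keep scopes the user granted earlier
    scope: scopes.join(" "),
  });
  return `${GOOGLE_AUTH_ENDPOINT}?${params.toString()}`;
}

// Placeholder values, not real credentials.
console.log(
  buildAuthUrl("my-client-id", "http://localhost:3000/dev/login", [
    "https://www.googleapis.com/auth/gmail.modify",
  ])
);
```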

Finally, we have the login and logout functions, which exchange the authorization code for an access token, retrieve the account ID associated with the token, and either save or delete the refresh token in our database:

async login(code: string | undefined): Promise<boolean> {
  const { tokens, accountId } = await this.getAccountIdAndTokens(code);

  if (!tokens.refresh_token || !accountId) return false;

  await this.tokenService.saveRefreshToken(tokens.refresh_token, accountId);

  return true;
}

async logout(code: string | undefined): Promise<boolean> {
  const accountId = await this.getAccountId(code);

  if (!accountId) return false;

  return await this.tokenService.deleteTokenByAccountId(accountId);
}

These methods first get the account ID (and, for login, the tokens) of the user associated with the authorization code, using the getAccountId or getAccountIdAndTokens method:

private async getAccountId(code: string | undefined) {
  if (!code) throw Error("Authorization code is missing from the url params");

  let tokens = (await this.oauth2Client.getToken({
    code,
    redirect_uri: process.env.REDIRECT_URI_LOGOUT,
  })).tokens;
  const gmailClient = await this.gmailClientFactory.createGmailClientByAccessToken(tokens);

  if (!gmailClient) {
    console.log("Gmail client wasn't created, no credentials will be saved");
  }

  const accountId = (await gmailClient?.users.getProfile({ userId: "me" }))?.data.emailAddress as string;
  return accountId;
}

private async getAccountIdAndTokens(code: string | undefined) {
  if (!code) throw Error("Authorization code is missing from the url params");

  let tokens = (await this.oauth2Client.getToken({ code, redirect_uri: process.env.REDIRECT_URI })).tokens;
  const gmailClient = await this.gmailClientFactory.createGmailClientByCredentials(tokens);

  if (!tokens.refresh_token) {
    console.log("Refresh token is missing, no credentials will be saved");
  }
  if (!gmailClient) {
    console.log("Gmail client wasn't created, no credentials will be saved");
  }

  const accountId = (await gmailClient?.users.getProfile({ userId: "me" }))?.data.emailAddress as string;
  return { tokens, accountId };
}

These methods use the injected oauth2Client to exchange the authorization code for tokens, and the injected gmailClientFactory to create a Gmail client, so that we can look up the Gmail account ID (the user’s email address).

The GmailClientFactory is a utility class that provides methods for creating instances of the Gmail API client based on different sets of credentials. It relies on the Google OAuth2 library to handle authentication and authorization:

export class GmailClientFactory {
  private oauth2Client: OAuth2Client;

  constructor(
    oauth2Client: OAuth2Client = new google.auth.OAuth2(
      process.env.CLIENT_ID,
      process.env.CLIENT_SECRET,
      process.env.REDIRECT_URI
    )
  ) {
    this.oauth2Client = oauth2Client;
  }

  async createGmailClientByRefreshToken(refreshToken: string): Promise<gmail_v1.Gmail> {
    this.oauth2Client.setCredentials({ refresh_token: refreshToken });
    await this.oauth2Client.refreshAccessToken();
    return google.gmail({ version: "v1", auth: this.oauth2Client });
  }

  async createGmailClientByCredentials(credentials: Credentials): Promise<gmail_v1.Gmail | null> {
    this.oauth2Client.setCredentials(credentials);
    if (!credentials.access_token) throw Error("Access token is missing from credentials");
    return credentials.refresh_token ? google.gmail({ version: "v1", auth: this.oauth2Client }) : null;
  }

  async createGmailClientByAccessToken(credentials: Credentials): Promise<gmail_v1.Gmail | null> {
    this.oauth2Client.setCredentials(credentials);
    if (!credentials.access_token) throw Error("Access token is missing from credentials");
    return google.gmail({ version: "v1", auth: this.oauth2Client });
  }
}

The class has three methods:

createGmailClientByRefreshToken: This method creates a Gmail client instance by providing a refresh token, which is used to generate a new access token for the user. The refresh token is saved in the user’s credentials object and can be used to generate a new access token when the current one expires.

createGmailClientByCredentials: This method creates a Gmail client instance using a credentials object that contains an access token and an optional refresh token. It throws an error if the access token is missing and returns null if the credentials contain no refresh token.

createGmailClientByAccessToken: This method creates a Gmail client instance using a credentials object that contains only an access token. This method does not save any tokens to the user’s credentials object.

Once we have the account ID (and the refresh token), we call the saveRefreshToken or deleteTokenByAccountId method of our injected RefreshTokenRepository instance to store the token in the database or remove it.

The RefreshTokenRepository interface is a basic repository that we will later implement to access the DynamoDB database. It looks like this:

export interface RefreshTokenRepository {
  deleteTokenByAccountId(accountId: string): Promise<boolean>;
  saveRefreshToken(refreshToken: string | null | undefined, accountId: string | null | undefined): Promise<void>;
  getRefreshTokens(): Promise<TokenItem[]>;
  getRefreshTokenByAccountId(account_id: string): Promise<string>;
}
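Because the login service only depends on this interface, it can be exercised without any database. The following is a minimal in-memory sketch that might be handy in unit tests. InMemoryTokenRepository is a hypothetical class, the TokenItem shape is an assumption, and the interface is repeated only so that the block compiles on its own.

```typescript
// Assumed shape of TokenItem; repeated here so the sketch is self-contained.
interface TokenItem {
  refresh_token: string;
  account_id: string;
}

interface RefreshTokenRepository {
  deleteTokenByAccountId(accountId: string): Promise<boolean>;
  saveRefreshToken(refreshToken: string | null | undefined, accountId: string | null | undefined): Promise<void>;
  getRefreshTokens(): Promise<TokenItem[]>;
  getRefreshTokenByAccountId(account_id: string): Promise<string>;
}

// Hypothetical test double: keeps tokens in a Map instead of a database.
class InMemoryTokenRepository implements RefreshTokenRepository {
  private tokens = new Map<string, string>(); // account_id -> refresh_token

  async deleteTokenByAccountId(accountId: string): Promise<boolean> {
    return this.tokens.delete(accountId); // true only if something was removed
  }

  async saveRefreshToken(refreshToken: string | null | undefined, accountId: string | null | undefined): Promise<void> {
    if (!refreshToken || !accountId) return; // nothing usable to store
    if (this.tokens.has(accountId)) return;  // assumption: keep the first token per account
    this.tokens.set(accountId, refreshToken);
  }

  async getRefreshTokens(): Promise<TokenItem[]> {
    return Array.from(this.tokens.entries()).map(([account_id, refresh_token]) => ({
      refresh_token,
      account_id,
    }));
  }

  async getRefreshTokenByAccountId(account_id: string): Promise<string> {
    return this.tokens.get(account_id) ?? "";
  }
}
```

An instance of this class could be passed to the login service in place of the DynamoDB-backed repository when running tests.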

With this code, we have implemented a basic authentication system using Google OAuth2. Users can authorize our application to access their Gmail account, and we save their refresh token to our database for later use. Users can also revoke our access to their accounts, which deletes their credentials from our database.

Finally, let’s create a function handler for our login function in a separate TypeScript file. It creates a login service instance and can be called as a serverless function. Note that we use environment variables to store our client ID and secret:

const oauth2Client = new google.auth.OAuth2(
  process.env.CLIENT_ID,
  process.env.CLIENT_SECRET
);
const DynamoDbClient = createDynamoDBClient();
const TokenService: RefreshTokenRepository = new DynamoDbTokenRepository(DynamoDbClient);
const gmailClientFactory: GmailClientFactory = new GmailClientFactory(oauth2Client);
const loginService: LoginService = new LoginService(gmailClientFactory, TokenService, oauth2Client);

export const handler = async (
  event: APIGatewayEvent,
  context: Context
): Promise<APIGatewayProxyResult> => {
  return await loginService.handleRequest(event);
};

And add this function to the serverless.yml file:

functions:
  login:
    handler: src/functions/login/index.handler
    timeout: 360
    events:
      - http:
          method: GET
          path: /login
      - http:
          method: GET
          path: /logout

Our OAuth2 sign in form

4: Setting up DynamoDB

From TypeScript, we can access the DynamoDB database with the DynamoDBClient class. To use it, we first need to install the @aws-sdk/client-dynamodb package via npm. Once installed, we can import the DynamoDBClient class from the @aws-sdk/client-dynamodb module in our TypeScript file.

Let's create an implementation of the RefreshTokenRepository interface that calls dynamoDB.

Create a new class called DynamoDbTokenRepository. Let's start with the constructor:

constructor(dynamoDB: DynamoDBClient) {
  this.dynamoDB = dynamoDB;
}

The constructor initializes the DynamoDBClient instance used to interact with the DynamoDB table.

private async getAllTokens(): Promise<Array<{ accountId: string, token: string }>> {
  const command: ScanCommand = new ScanCommand({ TableName: process.env.STAGE + "-tokens" });
  const result = await this.dynamoDB.send(command);

  if (!result?.Count || result.Count === 0) return [];
  if (!result?.Items) return [];

  return result.Items?.map(x => ({ accountId: x["account_id"].S ?? "", token: x["refresh_token"].S ?? "" }))
}

The getAllTokens method returns all the refresh tokens saved in the table. The method uses the ScanCommand class to perform a full scan of the table and returns an array of objects that contain both the accountId and token values. This approach works well enough with a small number of refresh tokens, but with bigger data sets, a more targeted lookup is advisable. Note that the table name we are using is generated from an environment variable, so we can support multiple deployed Serverless stages.
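One possible alternative to scanning for larger tables is a keyed lookup. The sketch below only builds the input object for such a query and makes a loud assumption: it presumes a global secondary index on account_id named account_id-index, which our table definition does not include. The AccountQueryInput type and buildAccountQuery helper are likewise hypothetical.

```typescript
// Plain object in the shape of a DynamoDB Query request.
interface AccountQueryInput {
  TableName: string;
  IndexName: string;
  KeyConditionExpression: string;
  ExpressionAttributeValues: Record<string, { S: string }>;
  Limit: number;
}

function buildAccountQuery(stage: string, accountId: string): AccountQueryInput {
  return {
    TableName: `${stage}-tokens`,
    IndexName: "account_id-index", // hypothetical GSI, not part of the table shown in this article
    KeyConditionExpression: "account_id = :accountId",
    ExpressionAttributeValues: { ":accountId": { S: accountId } },
    Limit: 1, // we expect at most one token per account
  };
}

console.log(buildAccountQuery("dev", "user@example.com"));
```

With such an index in place, the resulting object could be passed to a QueryCommand from @aws-sdk/client-dynamodb instead of issuing a ScanCommand, so that only the matching item is read.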

async getRefreshTokenByAccountId(account_id: string): Promise<string> {
  const result = await this.getAllTokens();

  const refreshToken = result.find((item) => item.accountId === account_id);

  return refreshToken?.token ?? "";
}

The getRefreshTokenByAccountId method calls the getAllTokens method to retrieve all the refresh tokens currently in the table. The method then searches for the refresh token associated with the provided account ID and returns it, or an empty string if it is not found.

async getRefreshTokens(): Promise<TokenItem[]> {
  const result = await this.getAllTokens();
  return result.map((row) => {
    return {
      refresh_token: row.token,
      account_id: row.accountId
    };
  });
}

The getRefreshTokens method calls the getAllTokens method to retrieve all the refresh tokens currently in the table. The method then returns an array of TokenItem objects containing the refresh token and account ID.

async saveRefreshToken(
  refreshToken: string,
  accountId: string
): Promise<void> {
  const records = await this.getAllTokens();
  if (records.some(x => x.accountId === accountId)) {
    console.warn(`The user with id ${accountId.substring(0,5)}**** already registered.`)
    return;
  }

  const command: PutItemCommand = new PutItemCommand({
    TableName: process.env.STAGE + "-tokens",
    Item: {
      refresh_token: { S: refreshToken },
      account_id: { S: accountId }
    }
  });
  await this.dynamoDB.send(command);
}

The saveRefreshToken method first calls the getAllTokens method to retrieve all the refresh tokens currently in the table. If the account ID is already associated with a refresh token, the method returns without saving the new token. Otherwise, a PutItemCommand is created to save the new refresh token and account ID to the DynamoDB table.

async deleteTokenByAccountId(accountId: string): Promise<boolean> {
  const token = await this.getRefreshTokenByAccountId(accountId);

  if (!token || token === '')
    return false;

  const command: DeleteItemCommand = new DeleteItemCommand({
    TableName: process.env.STAGE + "-tokens",
    Key: {
      refresh_token: { S: token }
    }
  });

  await this.dynamoDB.send(command);

  return true;
}

The deleteTokenByAccountId method first calls the getRefreshTokenByAccountId method to retrieve the refresh token associated with the account ID. If a token is found, a DeleteItemCommand is created to delete the token from the DynamoDB table.

With this class, we can access a DynamoDB table named after one of our Serverless stages, followed by “-tokens”. We could create such a table by hand in the AWS console and run the application against it. However, Serverless can create DynamoDB resources for us.

In the serverless.yml file, we can add the following lines:

resources:
  Resources:
    UsersTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:provider.stage}-tokens
        AttributeDefinitions:
          - AttributeName: refresh_token
            AttributeType: S
        KeySchema:
          - AttributeName: refresh_token
            KeyType: HASH
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1

When we use the ‘sls deploy’ command to deploy to AWS, or have the ‘serverless-dynamodb-local’ plugin installed and run ‘sls offline’ to start the application offline, the table with the specified name will be created in the cloud or in our local DynamoDB emulator, respectively.

5: Collecting and parsing messages using the Gmail API

Now that we can log users in, we can start collecting their messages for analysis. We need the plain-text content of their messages from the last 2 hours (as we will run our scraper every 2 hours). First, create a service called EmailCollectorService.

export class EmailCollectorService {
  private parser: EmailParser;
  private labelConfiguration: string[];
  private emailBlackList: string[];
  private userId = "me";

  constructor(
    parser: EmailParser,
    labelConfiguration: string[],
    emailBlackList: string[],
    userId: string = "me"
  ) {
    this.parser = parser;
    this.labelConfiguration = labelConfiguration;
    this.emailBlackList = emailBlackList;
    this.userId = userId;
  }

This is the constructor for the EmailCollectorService class. It takes in four parameters: an EmailParser instance, an array of label configurations and an array of email blacklists which are used to specify which emails we want to scrape, and an optional user ID (which defaults to “me” if not provided).

  private createLabelPattern(): string {
    return this.labelConfiguration.reduce(
      (accumulator: string, currentValue: string) =>
        accumulator + "-label:" + currentValue.toString().toUpperCase() + " ",
      ""
    );
  }

  private createBlacklistPattern(): string {
    return `-from:(${this.emailBlackList.reduce(
      (acc, v, i) =>
        `${acc}${v.toString().toUpperCase()}${this.emailBlackList.length - 1 === i ? '' : ' OR '}`,
      ""
    )})`;
  }

These two private methods, createLabelPattern and createBlacklistPattern, generate the label and blacklist patterns that will be used to filter the Gmail API query.
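To make the generated patterns concrete, here is a standalone sketch of the same two helpers, rewritten as free functions with map and join so they can be run in isolation. The label names and sender addresses are made-up sample values.

```typescript
// Standalone equivalents of createLabelPattern and createBlacklistPattern.
function createLabelPattern(labels: string[]): string {
  // One "-label:X " clause per label, each followed by a space.
  return labels.map((label) => `-label:${label.toUpperCase()} `).join("");
}

function createBlacklistPattern(senders: string[]): string {
  // A single "-from:(A OR B)" clause covering all blacklisted senders.
  return `-from:(${senders.map((sender) => sender.toUpperCase()).join(" OR ")})`;
}

console.log(createLabelPattern(["positive", "negative"]));
// prints "-label:POSITIVE -label:NEGATIVE "
console.log(createBlacklistPattern(["noreply@example.com", "news@example.com"]));
// prints "-from:(NOREPLY@EXAMPLE.COM OR NEWS@EXAMPLE.COM)"
```

Note that an empty blacklist yields "-from:()", so it may be worth leaving the clause out of the query in that case.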

  public async listMessages(
    gmailClient: gmail_v1.Gmail
  ): Promise<ParsedMessage[]> {
    const gmailApiQuery = `-in:chat -category:social -category:promotions -category:forums -category:reservations -category:purchases newer_than:2h ${this.createLabelPattern()} ${this.createBlacklistPattern()}`
    console.log(gmailApiQuery)
    return await firstValueFrom(
      from(
        gmailClient.users.messages.list({
          userId: this.userId,
          q: gmailApiQuery
        })
      ).pipe(
        switchMap((listResponse) =>
          from(listResponse.data.messages || [])
        ),
        filter((message) => !!message?.id),
        mergeMap((message) =>
          from(
            gmailClient!.users.messages.get({
              userId: this.userId,
              id: message.id!
            })
          )
        ),
        map((messageResponse) => {
          console.log(messageResponse);
          return {
            id: messageResponse.data.id ?? "",
            message: this.parser.parseMessage(messageResponse.data)
          };
        }),
        filter((message) => !!message?.id && !!message?.message),
        toArray()
      )
    );
  }

This method, listMessages, is responsible for fetching and parsing Gmail messages based on the provided label configurations and email blacklists. It returns a Promise that resolves to an array of ParsedMessage objects. The function takes a Gmail client object as an argument. This object is an instance of the Gmail class provided by the Google API client library for Node.js, and it is used to communicate with the Gmail API.

The function then builds a Gmail API query string using several methods: createLabelPattern, createBlacklistPattern, and some hard-coded values. The query string is constructed using the Gmail search syntax, which allows us to specify a variety of search criteria. The search criteria used in this function are:

-in:chat: exclude chat messages from the search results

-category:xxx: exclude messages automatically tagged by Gmail as promotions, social networks, etc

newer_than:2h: include only messages that are newer than 2 hours old

A label pattern based on the labelConfiguration array, to exclude emails already labeled by our solution

A blacklist pattern based on the emailBlackList array, to exclude a user-defined list of email addresses.

Once the query string is constructed, the function uses the Gmail API client to make a request to the Gmail API. Specifically, it calls the users.messages.list method with the following parameters:

userId: The ID of the Gmail user whose messages should be listed. This is set to the userId property of the EmailCollectorService class, which defaults to “me” (the authenticated user).

q: The Gmail search query string constructed earlier.

The response of users.messages.list contains an array of messages that match the query parameters, along with some metadata like the total number of messages that match the query, a token for retrieving the next page of messages, etc. The actual message content is not included in the response, only its ID. This means to get the content of the messages, we have to call users.messages.get for every id we just received.
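Our listMessages implementation reads only the first page of results, which is usually enough for a two-hour window. If a mailbox could return more, the next-page token can be followed in a loop. The sketch below shows the idea against a stubbed list function; ListFn, ListPage, and collectMessageIds are hypothetical names, and in the real service the list function would wrap gmailClient.users.messages.list and pass the token along as pageToken.

```typescript
// Minimal shapes mirroring the parts of the list response that we use.
interface MessageRef { id?: string | null }
interface ListPage { messages?: MessageRef[]; nextPageToken?: string | null }
type ListFn = (pageToken?: string) => Promise<ListPage>;

// Follows nextPageToken until the API stops returning one.
async function collectMessageIds(list: ListFn): Promise<string[]> {
  const ids: string[] = [];
  let pageToken: string | undefined;
  do {
    const page = await list(pageToken);
    for (const message of page.messages ?? []) {
      if (message.id) ids.push(message.id);
    }
    pageToken = page.nextPageToken ?? undefined;
  } while (pageToken);
  return ids;
}

// Stub standing in for the Gmail API: two pages of results.
const fakeList: ListFn = async (pageToken) =>
  pageToken === "page-2"
    ? { messages: [{ id: "c" }] }
    : { messages: [{ id: "a" }, { id: "b" }], nextPageToken: "page-2" };

collectMessageIds(fakeList).then((ids) => console.log(ids));
```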

The response of users.messages.get contains the full content of a single message, including its headers, body, attachments, etc. Here’s an example of what a users.messages.get response might look like:

{
  "id": "17a9cb876862b3e3",
  "threadId": "17a9cb876862b3e3",
  "labelIds": [
    "SENT",
    "INBOX",
    "IMPORTANT"
  ],
  "snippet": "Hello, this is a test email.",
  "payload": {
    "partId": "",
    "mimeType": "text/plain",
    "filename": "",
    "body": {
      "size": 28,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IGVtYWlsLg=="
    }
  }
}

Note that the actual content of the message is base64-encoded (the Gmail API uses the URL-safe base64url variant).
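As a quick check, the body from the example response can be decoded with Node's Buffer. This is a small sketch; decodeBody is a hypothetical helper, and it maps the URL-safe characters back to the standard alphabet before decoding so that it does not depend on how a particular Node version treats them.

```typescript
// Decodes a Gmail message body (base64url) into a UTF-8 string.
function decodeBody(data: string): string {
  // Map the URL-safe alphabet ("-" and "_") back to standard base64 ("+" and "/").
  const standard = data.replace(/-/g, "+").replace(/_/g, "/");
  return Buffer.from(standard, "base64").toString("utf8");
}

console.log(decodeBody("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IGVtYWlsLg=="));
// prints "Hello, this is a test email."
```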

In the listMessages function, we of course use the EmailParser class:

import { EOL } from "os";
import { convert } from "html-to-text";

export class EmailParser {
  public parseMessage(message: gmail_v1.Schema$Message): string {
    const parts = this.getMessageParts(message.payload);
    const plainText = this.createPlainText(parts);
    return this.removeReplies(plainText);
  }

  private getMessageParts(messagePart: gmail_v1.Schema$MessagePart | null | undefined): EmailPart[] {
    if (messagePart?.mimeType?.toLowerCase().startsWith("text")) {
      return messagePart.body?.data
        ? [{
            html: messagePart.mimeType?.toLowerCase().endsWith("html"),
            data: messagePart.body.data,
          }]
        : [];
    }
    if (messagePart?.mimeType?.toLowerCase().startsWith("multipart/alternative")) {
      // Several renditions of the same content: keep only the HTML one.
      let ret: EmailPart[] = [];
      messagePart.parts?.filter(part => part.mimeType?.toLowerCase() == "text/html").forEach(part => {
        ret = ret.concat(this.getMessageParts(part));
      });
      return ret;
    }
    if (messagePart?.mimeType?.toLowerCase().startsWith("multipart")) {
      let ret: EmailPart[] = [];
      messagePart.parts?.forEach(part => {
        ret = ret.concat(this.getMessageParts(part));
      });
      return ret;
    }
    return [];
  }

  private createPlainText(messageParts: EmailPart[]): string {
    return messageParts
      .map(part => {
        const decoded = Buffer.from(part.data, "base64").toString()
        return part.html ? convert(decoded, { wordwrap: false }) : decoded;
      })
      .reduce(((accumulator, currentValue) => accumulator + EOL + currentValue), "")
      .substring(EOL.length)
  }

  private removeReplies(plainText: string): string {
    let lines = this.normalizeLineEndings(plainText).split("\n").filter((line) => !line.startsWith(">"));
    const firstIndex = lines
      .map((line) => line.toLowerCase())
      .findIndex((line) =>
        line == "-- " ||
        line == "--" ||
        line.startsWith("-----original message-----") ||
        line.startsWith("________________________________") ||
        // Lines were split on "\n", so they never end with a newline themselves.
        (line.startsWith("on ") && line.endsWith("wrote:")) ||
        line.startsWith("from: ") ||
        line.startsWith("sent from my iphone") ||
        line.startsWith("sent from my blackberry")
      );
    if (firstIndex > 0) lines = lines.slice(0, firstIndex);
    return lines.reduce(((accumulator, currentValue) => accumulator + EOL + currentValue), "").substring(EOL.length);
  }

  private normalizeLineEndings = (str: string) => str.replace(/\r\n/g, "\n");
}

The EmailParser class is responsible for parsing and extracting relevant information from an email message. The parseMessage function takes a gmail_v1.Schema$Message object as input, and returns the plain text of the email message with any replies removed.

The getMessageParts() method of the EmailParser class is the main part of the email parsing logic. It takes in a messagePart parameter, which is an object representing a single part of an email message (such as the body, attachments, etc.). The function then checks the MIME type of the part to determine how to handle it.

If the MIME type starts with “text”, the function assumes that the part contains either plain text or HTML, and creates an EmailPart object representing that part.

If the MIME type starts with “multipart/alternative”, the function assumes that the part contains multiple versions of the same content in different formats (e.g. plain text and HTML), and recursively calls getMessageParts() on each of the sub-parts until it finds the desired format.

If the MIME type starts with “multipart”, the function assumes that the part contains multiple sub-parts (e.g. email attachments), and recursively calls getMessageParts() on each of the sub-parts.

If the MIME type is anything else (e.g. image/jpeg), the function ignores the part and moves on to the next one.

This recursive approach is necessary because emails can have complex structures with nested multipart parts, and each part can have its own set of sub-parts. By recursively calling getMessageParts() on each sub-part, the function can parse even the most complex email structures and extract the desired plain text and HTML content.
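The traversal is easier to follow on a small nested example. The sketch below reimplements the same idea on a simplified MimePart type (a hypothetical stand-in for the Gmail message part schema) and runs it on a message that contains a plain-text and HTML alternative plus a PDF attachment.

```typescript
// Simplified stand-in for a Gmail message part.
interface MimePart {
  mimeType: string;
  data?: string;
  parts?: MimePart[];
}

function collectTextParts(part: MimePart): MimePart[] {
  const type = part.mimeType.toLowerCase();
  // Leaf text part: keep it if it actually has content.
  if (type.startsWith("text")) return part.data ? [part] : [];
  const children = part.parts ?? [];
  // Alternatives carry the same content in several formats: keep only HTML.
  if (type.startsWith("multipart/alternative")) {
    return collectAll(children.filter((child) => child.mimeType.toLowerCase() === "text/html"));
  }
  // Any other multipart container: descend into every child.
  if (type.startsWith("multipart")) return collectAll(children);
  // Attachments and other non-text parts are ignored.
  return [];
}

function collectAll(parts: MimePart[]): MimePart[] {
  return parts.reduce<MimePart[]>((acc, part) => acc.concat(collectTextParts(part)), []);
}

const email: MimePart = {
  mimeType: "multipart/mixed",
  parts: [
    {
      mimeType: "multipart/alternative",
      parts: [
        { mimeType: "text/plain", data: "plain-body" },
        { mimeType: "text/html", data: "html-body" },
      ],
    },
    { mimeType: "application/pdf", data: "pdf-bytes" },
  ],
};

console.log(collectTextParts(email).map((part) => part.mimeType)); // prints [ 'text/html' ]
```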

The createPlainText function takes an array of EmailPart objects and returns the plain text of the email message. It first decodes the base64-encoded data of each part using Buffer.from(part.data, "base64").toString(). If the part is HTML, it converts the decoded text to plain text using the html-to-text package’s convert function. It then concatenates the plain text of all parts, separated by line breaks.

The removeReplies function takes the plain text of the email message as input and removes any reply text. It first splits the plain text into lines and drops quoted lines (those starting with “>”). It then looks for the first line that marks the start of a reply or a signature and discards that line and everything after it. We couldn’t find an elegant way to check if a line is a reply, but this list of strings should cover most of the use cases. Finally, the method concatenates the remaining lines into a single string.
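Here is a reduced, standalone sketch of the same reply-stripping idea, run on a sample message. It uses only a subset of the markers, joins lines with "\n" instead of the platform line ending, and the stripReplies name and the sample text are made up for illustration.

```typescript
// A few of the markers that typically introduce quoted replies or signatures.
const REPLY_MARKERS: Array<(line: string) => boolean> = [
  (line) => line === "--" || line === "-- ",
  (line) => line.startsWith("-----original message-----"),
  (line) => line.startsWith("on ") && line.endsWith("wrote:"),
  (line) => line.startsWith("from: "),
];

function stripReplies(text: string): string {
  const lines = text
    .replace(/\r\n/g, "\n")
    .split("\n")
    .filter((line) => !line.startsWith(">")); // drop quoted lines
  // Index of the first line that looks like the start of a reply or signature.
  const cut = lines.findIndex((line) =>
    REPLY_MARKERS.some((matches) => matches(line.toLowerCase()))
  );
  return (cut > 0 ? lines.slice(0, cut) : lines).join("\n");
}

const sample =
  "Sounds good, let's talk.\r\n\r\nOn Mon, Jan 2, 2023 Jane wrote:\r\n> Are you interested?";
console.log(JSON.stringify(stripReplies(sample)));
// prints "Sounds good, let's talk.\n"
```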

6: Setting up and calling Comprehend

AWS Comprehend is a natural language processing service offered by Amazon Web Services that allows us to extract insights from text using machine learning. One of its features is text classification, which can automatically classify text into user-defined categories, such as positive or negative, based on a given training data set. We will use it to decide if an answer to a sales inquiry is positive or negative.

To set up AWS Comprehend for text classification, follow these steps:

Collect and prepare your training data

This should consist of a set of text samples, each of which has been manually labeled as positive or negative. In our case, we used a CSV file with text samples from older answers to our sales team and their corresponding labels.

Some training data from our csv file

Create an AWS Comprehend custom classifier

To do this, go to the AWS Comprehend console, click on “Custom classification,” and follow the prompts to create a new classifier. You’ll need to specify the input data format (in our case, CSV), the name of the classifier, and the number of categories you want to classify the text into (2).

Upload your training data to the classifier

Once you’ve created the classifier, you can upload your training data using the console. The classifier will use this data to train its machine-learning model.

Test the classifier

Once the training is complete, you can test the classifier by providing it with some text and seeing what category it assigns to it.

When we first tested our classifier, we did so manually by providing it with some text samples and seeing what categories it assigned to them. We found that we needed to work a bit on our training data set to ensure that it gave us accurate results. Specifically, we had to make sure that our data set was balanced and representative of the texts we wanted to classify. We also had to remove some outliers.
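A quick way to spot an unbalanced data set before training is to count the samples per label. The sketch below assumes the training rows have already been loaded into objects; the Sample type, the label names, and the helper functions are illustrative rather than part of our pipeline.

```typescript
interface Sample {
  label: string;
  text: string;
}

// Counts how many samples each label has.
function labelCounts(samples: Sample[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const sample of samples) {
    counts[sample.label] = (counts[sample.label] ?? 0) + 1;
  }
  return counts;
}

// Ratio of the rarest to the most common label: 1 means perfectly balanced.
function balanceRatio(counts: Record<string, number>): number {
  const values = Object.keys(counts).map((label) => counts[label] ?? 0);
  if (values.length === 0) return 0;
  return Math.min(...values) / Math.max(...values);
}

const samples: Sample[] = [
  { label: "POSITIVE", text: "Yes, please send us an offer." },
  { label: "POSITIVE", text: "We would like to schedule a call." },
  { label: "POSITIVE", text: "Sounds interesting, tell me more." },
  { label: "NEGATIVE", text: "We are not interested, thank you." },
];

const counts = labelCounts(samples);
console.log(counts, balanceRatio(counts));
```

A ratio far below 1 suggests that one class dominates and that more samples of the rarer class may be needed.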

Our model seems to be working well

Set up the code that will call Comprehend

Calling this model from code is done through the Comprehend class, installed with the aws-sdk package, which can be instantiated with a simple constructor call.

Create a class called EmailAnalyzerService:

import { Comprehend } from "aws-sdk";

// ComprehendJobOptions and getS3EmailsFileName are defined elsewhere in the project.
export class EmailAnalyzerService {
  private comprehend: Comprehend;
  private options: ComprehendJobOptions;

  constructor(comprehend: Comprehend, options: ComprehendJobOptions) {
    this.comprehend = comprehend;
    this.options = options;
  }

  async startComprehendJob(accountId: string) {
    // Where Comprehend writes its results and where it reads the emails from
    const OutputS3Uri = `s3://${this.options.s3BucketUri}/data/${accountId}`;
    const InputS3Uri = `s3://${this.options.s3BucketUri}/${getS3EmailsFileName(accountId)}`;

    // Assuming aws-sdk v2: the request is only sent once .promise() is called
    await this.comprehend
      .startDocumentClassificationJob({
        DocumentClassifierArn: this.options.classifier,
        OutputDataConfig: { S3Uri: OutputS3Uri },
        InputDataConfig: {
          InputFormat: "ONE_DOC_PER_LINE",
          S3Uri: InputS3Uri
        },
        DataAccessRoleArn: this.options.dataAccessRoleArn
      })
      .promise();
  }
}

The startComprehendJob method starts a new Comprehend job to classify a list of emails associated with a specific account stored in an S3 bucket. It does this by calling the startDocumentClassificationJob method of the Comprehend service. The method takes a configuration object that specifies the location of the input and output data in an S3 bucket, the format of the input data, the ARN of the document classifier, and the ARN of the role that Comprehend uses to access the data.

The OutputS3Uri variable specifies the location in the S3 bucket where the output data will be stored. The InputS3Uri variable specifies the location of the input data in the S3 bucket. The DataAccessRoleArn property specifies the ARN of the role that Comprehend uses to access the data.

After we scrape the emails with the EmailCollectorService, we can put the resulting plaintext into S3 buckets, and call the emailAnalyzerService on those buckets.
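Because the job uses the ONE_DOC_PER_LINE input format, every email has to occupy exactly one line of the input file. The sketch below shows one way to guarantee that; toSingleLine and toOneDocPerLine are hypothetical helpers, and our actual cleaning and storage code is not shown here.

```typescript
// Collapse every run of whitespace (including line breaks) into one space,
// so that an email body fits on a single line.
function toSingleLine(emailBody: string): string {
  return emailBody.replace(/\s+/g, " ").trim();
}

// Join the flattened emails into the content of the input file.
// Bodies that end up empty are dropped.
function toOneDocPerLine(emailBodies: string[]): string {
  return emailBodies
    .map(toSingleLine)
    .filter((line) => line.length > 0)
    .join("\n");
}

const inputFile = toOneDocPerLine([
  "Hi,\r\n\r\nYes please.",
  "   ",
  "No\tthanks",
]);
console.log(inputFile);
```

Since dropping empty bodies shifts the line numbers, the same filtered list has to be used later when the results are matched back to the emails.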

Note that we use AWS Comprehend’s asynchronous mode, which means we only start a classification job and do not receive the result right away. To get the result, we trigger the last part of our application when Comprehend writes its output to the output S3 bucket.

The process of scraping the emails and calling Comprehend on them looks something like this:

export class EmailAnalyzerComprehendService {
  private emailCollectorService: EmailCollectorService;
  private refreshTokenRepository: RefreshTokenRepository;
  private gmailClientFactory: GmailClientFactory;
  private sentimentAnalysisFileStorage: SentimentAnalysisStorageInterface;
  private emailRepository: EmailRepository;
  private emailAnalyzerService: EmailAnalyzerService;
  private startComprehendJob: boolean;

  constructor(
    emailCollectorService: EmailCollectorService,
    refreshTokenRepository: RefreshTokenRepository,
    gmailClientFactory: GmailClientFactory,
    sentimentAnalysisFileStorage: SentimentAnalysisStorageInterface,
    emailRepository: EmailRepository,
    emailAnalyzerService: EmailAnalyzerService,
    startComprehendJob: boolean
  ) {
    this.emailCollectorService = emailCollectorService;
    this.refreshTokenRepository = refreshTokenRepository;
    this.gmailClientFactory = gmailClientFactory;
    this.sentimentAnalysisFileStorage = sentimentAnalysisFileStorage;
    this.emailRepository = emailRepository;
    this.emailAnalyzerService = emailAnalyzerService;
    this.startComprehendJob = startComprehendJob;
  }

  async startComprehend(): Promise<void> {
    // One stored refresh token per logged-in user
    const tokensItem = await this.refreshTokenRepository.getRefreshTokens();

    for (const item of tokensItem) {
      const gmailClient = await this.gmailClientFactory.createGmailClientByRefreshToken(item.refresh_token);

      // Get emails and clean their message body
      const rawEmails = await this.emailCollectorService.listMessages(gmailClient);
      console.log("Raw Emails");
      console.log(rawEmails);
      if (!rawEmails || rawEmails.length === 0) {
        continue;
      }

      const cleanedEmails = this.emailCollectorService.cleanRawEmails(rawEmails);
      console.log("Cleaned Emails");
      console.log(cleanedEmails);

      const accountId = item.account_id;
      // We have to store the emails in S3 as the Comprehend input
      await this.sentimentAnalysisFileStorage.storeInput(accountId, cleanedEmails);

      // Start the Comprehend job
      await this.emailAnalyzerService.startComprehendJob(accountId);
      console.log("Comprehend job started");
    }
  }
}

Note that we save the actual emails into a repository, so that when we process the Comprehend output, we can use that table to find the corresponding emails with the Gmail API.

7: Labeling the emails according to the output of Comprehend

Create a class called EmailLabelerService:

import { gmail_v1 } from "googleapis";

// EmailLabel and EmailLabelData are defined elsewhere in the project.
export class EmailLabelerService {
  private gmailClient: gmail_v1.Gmail;

  constructor(gmailClient: gmail_v1.Gmail) {
    this.gmailClient = gmailClient;
  }

  public async setupLabels(labelConfiguration: string[], userId: string = "me"): Promise<EmailLabel[]> {
    // Get the list of existing labels for the given user
    const result = await this.gmailClient.users.labels.list({ userId: userId });
    const existingLabels = result.data.labels ?? [];

    // Create any labels that do not already exist
    for (const value of labelConfiguration) {
      if (!existingLabels.some((label) => label.name === value)) {
        await this.gmailClient.users.labels.create({
          userId: userId,
          requestBody: {
            labelListVisibility: "labelShow",
            messageListVisibility: "show",
            name: value
          }
        });
      }
    }

    // Retrieve the label IDs and names for the configured labels
    const labelListResponse = await this.gmailClient.users.labels.list({ userId: userId });
    const labelInfo = labelListResponse.data.labels ?? [];

    return labelConfiguration.map((label: string) => {
      const match = labelInfo.find((value: gmail_v1.Schema$Label) => value.name === label);

      return {
        id: match!.id as string,
        name: label
      };
    });
  }

  public async labelEmail(emailLabelData: EmailLabelData): Promise<void> {
    // Add the specified labels to the given email message
    await this.gmailClient.users.messages.modify({
      userId: emailLabelData.userId,
      id: emailLabelData.emailId,
      requestBody: {
        addLabelIds: emailLabelData.labelIdList
      }
    });
  }

  public async labelEmailList(emailLabelDataList: EmailLabelData[]): Promise<void> {
    // Apply the specified labels to a list of email messages, one at a time
    for (const emailLabelData of emailLabelDataList) {
      await this.labelEmail(emailLabelData);
    }
  }
}

This class provides methods that use the Gmail API to create labels and apply them to email messages.

The setupLabels method takes an array of label names and creates them if they don’t already exist for the given user. It returns an array of EmailLabel objects, which contain the label ID and name for each configured label.

The labelEmail method takes an EmailLabelData object, which includes the email message ID, user ID, and an array of label IDs to apply to the message. It modifies the given email message to apply the specified labels.

The labelEmailList method takes an array of EmailLabelData objects and applies the specified labels to each email message in the array.

Finally, create a class called EmailAnalyserLabelerService:

export class EmailAnalyserLabelerService {
  private emailLabelService: EmailLabelerService;
  private sentimentAnalysisFileStorage: SentimentAnalysisStorageInterface;
  private emailRepository: EmailRepository;
  private sentimentAnalysisResultParser: SentimentAnalysisResultParser;

  constructor(
    emailLabelService: EmailLabelerService,
    sentimentAnalysisFileStorage: SentimentAnalysisStorageInterface,
    emailRepository: EmailRepository,
    sentimentAnalysisResultParser: SentimentAnalysisResultParser
  ) {
    this.emailLabelService = emailLabelService;
    this.sentimentAnalysisFileStorage = sentimentAnalysisFileStorage;
    this.emailRepository = emailRepository;
    this.sentimentAnalysisResultParser = sentimentAnalysisResultParser;
  }

  public async labelEmails(
    accountId: string,
    analysisOutputFilePath: string
  ) {
    // Names of the labels we apply in Gmail
    const labelConfiguration = ["NEGATIVE", "POSITIVE"];
    // Set up the email labels
    const createdLabels = await this.emailLabelService.setupLabels(labelConfiguration, "me");
    // Get the sentiment analysis INPUT from S3
    const analysisInputLines = await this.sentimentAnalysisFileStorage.getInputLines(accountId);
    // Get the sentiment analysis RESULT from S3
    const analysisResult = await this.sentimentAnalysisFileStorage.getResult(analysisOutputFilePath);
    // Get the corresponding emails that we saved earlier
    const processableEmailList = await this.emailRepository.getEmailsByAccountId(accountId);

    // Turn the analysis result into email labels
    const emailLabelDataList = this.sentimentAnalysisResultParser.parseResult(
      accountId,
      analysisResult,
      analysisInputLines,
      processableEmailList,
      createdLabels
    );

    // Label the emails
    await this.emailLabelService.labelEmailList(emailLabelDataList);

    // Delete the processed emails from DynamoDB
    await this.emailRepository.deleteEmailsByAccountId(processableEmailList);
  }
}

This service is called when Comprehend has finished a classification job. It sets up our custom labels in the user's Gmail account, gets the classification results from S3, pairs each line of the result with a specific email, and applies the corresponding label. At the end of the method, we delete the saved emails from our system.
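To give an idea of what pairing result lines with emails involves, here is a minimal sketch of the first step: reading the predictions and picking the highest-scoring class for each input line. It assumes the job output is a JSON Lines file in which every entry carries a Line index and a Classes array of Name and Score pairs, which is how we understand Comprehend's output for ONE_DOC_PER_LINE input. PredictionLine, TopPrediction, and parsePredictions are hypothetical names; this is not the code of our SentimentAnalysisResultParser.

```typescript
// Assumed shape of one entry in the job's output file (JSON Lines).
interface PredictionLine {
  Line: string; // index of the input line, serialized as a string
  Classes: { Name: string; Score: number }[];
}

interface TopPrediction {
  lineIndex: number;
  label: string;
  score: number;
}

// Parse the JSON Lines content and keep the best class for every entry.
function parsePredictions(jsonLines: string): TopPrediction[] {
  return jsonLines
    .split("\n")
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line) as PredictionLine)
    .filter((prediction) => prediction.Classes.length > 0)
    .map((prediction) => {
      const best = prediction.Classes.reduce((a, b) => (b.Score > a.Score ? b : a));
      return { lineIndex: Number(prediction.Line), label: best.Name, score: best.Score };
    });
}

const sample = [
  '{"File":"emails.txt","Line":"0","Classes":[{"Name":"POSITIVE","Score":0.87},{"Name":"NEGATIVE","Score":0.13}]}',
  '{"File":"emails.txt","Line":"1","Classes":[{"Name":"POSITIVE","Score":0.09},{"Name":"NEGATIVE","Score":0.91}]}',
].join("\n");

const parsed = parsePredictions(sample);
console.log(parsed);
```

The lineIndex can then be used to look up the matching input line and, through it, the stored email that should receive the label.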

With this ready, we only need to create two more Lambda functions (one for email scraping, set to run every 2 hours, and one for labeling the emails, triggered by the Comprehend output bucket), and we have a working Sales Email Analyzer.
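The labeling function needs to know which account a finished job belongs to. Since the output location is built as data/<accountId>, one option is to read the account ID back from the key of the object that triggered the function. The sketch below assumes that key layout; accountIdFromOutputKey is a hypothetical helper, and the rest of the key in the example is made up.

```typescript
// Extract the account ID from an object key such as
// "data/<accountId>/<job folder>/output/output.tar.gz".
// Returns null when the key does not match the expected layout.
function accountIdFromOutputKey(rawKey: string, prefix: string = "data/"): string | null {
  // Keys in S3 event notifications are URL-encoded, with spaces sent as "+".
  const key = decodeURIComponent(rawKey.replace(/\+/g, " "));
  if (!key.startsWith(prefix)) {
    return null;
  }
  const [accountId] = key.slice(prefix.length).split("/");
  return accountId ? accountId : null;
}

console.log(accountIdFromOutputKey("data/user%40example.com/some-job/output/output.tar.gz"));
console.log(accountIdFromOutputKey("emails/user%40example.com.txt"));
```

In the handler, the raw key would come from the S3 event record that triggered the function, and the full decoded key can be passed on as the analysis output file path.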

8: A brief note on offline debugging

Developing serverless applications can be complicated if we cannot debug our solution locally. Fortunately, Serverless has a wide array of plugins that make our life easier. We already talked about serverless-plugin-typescript, which makes it possible for our code to stop on breakpoints. The serverless-dynamodb-local plugin is an easy way to run a local DynamoDB emulator, and the serverless-s3-local plugin does the same for S3. We can keep an alternative serverless.yml file (called serverless-local.yml, for example) and pass it to sls calls with the -c parameter. If, in addition, the factory functions that instantiate our S3 and DynamoDB clients point to these offline resources instead of the cloud, we can use the serverless-offline plugin to run our whole application (sans Comprehend) locally, which makes testing and debugging much easier.
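A factory function of this kind could decide on the endpoints roughly as sketched below. The sketch relies on the IS_OFFLINE environment variable that serverless-offline sets. The port numbers are what we believe to be the defaults of the two emulator plugins, and the LOCAL_DYNAMODB_ENDPOINT and LOCAL_S3_ENDPOINT variables, the ClientEndpoints type, and resolveEndpoints are hypothetical names used only for illustration.

```typescript
// Endpoint settings to hand over to the SDK client constructors.
interface ClientEndpoints {
  dynamoDbEndpoint?: string;
  s3Endpoint?: string;
  s3ForcePathStyle?: boolean;
}

// Decide where the clients should point, based on the environment.
function resolveEndpoints(env: Record<string, string | undefined>): ClientEndpoints {
  // In the cloud we return nothing and let the SDK resolve the real endpoints.
  if (env["IS_OFFLINE"] !== "true") {
    return {};
  }
  return {
    // Assumed default ports of the local emulators; override them if needed.
    dynamoDbEndpoint: env["LOCAL_DYNAMODB_ENDPOINT"] ?? "http://localhost:8000",
    s3Endpoint: env["LOCAL_S3_ENDPOINT"] ?? "http://localhost:4569",
    // Local S3 emulators usually expect path-style bucket addressing.
    s3ForcePathStyle: true,
  };
}

console.log(resolveEndpoints({ IS_OFFLINE: "true" }));
console.log(resolveEndpoints({}));
```

Passing the environment in as a parameter keeps the function easy to test; in a Lambda handler it would simply be called with process.env, and the returned values would go into the endpoint options of the DynamoDB and S3 clients.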

About the author

Richard Rutai is a Software Engineer at SnapSoft Ltd. He works mostly as a backend developer, increasingly with AWS. After hours, he makes silly little video games. Connect with Richard on LinkedIn.
