Building a data engineering project. Part 2 — delivering base functionality

Kirill Zaitsev
9 min read · Nov 30, 2021

We want to develop a tool that watches for specific words in what people tweet in real time. We took the first step by defining our goal and drafting a blueprint of the system's design; now it is time to implement it. For a quick reference, here is an outline of the series:

We finished part 1 of the project with the following UML diagram in mind:

UML diagram of the application

This time I invite you to discuss an implementation of the above diagram. Here is our plan for this part of the series:

  1. Tweets variety and how we deal with it
  2. Parallelized indexing
  3. Somebody said "tokenization"?
  4. Serving the app in a client-server way
  5. Summary and takeaways

Let's move on to the first step.

Tweets variety and how we deal with it

As a quick recap, text tokenization converts words in a given language to integers, with the option of mapping the integers back to words.

We are working with tweets written in English. When faced with a tweet, we need to know to which tokens each of its words maps. We can create such a map ourselves by parsing many text files or searching for a prepared vocabulary. The requirements for it are the following:

  1. it should be comprehensive
  2. reflect modern language (Shakespeare won't do because we would get unusually many unknown words)
  3. easy to parse

After some searching, I found that the vocabulary accompanying the IMDb reviews dataset is a good match.
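To make the idea of a two-way vocabulary concrete, here is a minimal Python sketch. The word list, the `<unk>` placeholder for unknown words, and the function names are illustrative assumptions; they are not the format of the IMDb vocabulary file or the project's actual Vocab class.

```python
from typing import List

# Toy vocabulary; in the real project the words would come from a vocabulary file.
WORDS = ["<unk>", "the", "war", "president", "ocean"]

# Forward map (word -> id) and backward map (id -> word).
word_to_id = {word: idx for idx, word in enumerate(WORDS)}
id_to_word = {idx: word for word, idx in word_to_id.items()}

UNKNOWN_ID = word_to_id["<unk>"]


def encode(text: str) -> List[int]:
    """Map each whitespace-separated word to its id; unknown words share one id."""
    return [word_to_id.get(word, UNKNOWN_ID) for word in text.lower().split()]


def decode(ids: List[int]) -> str:
    """Map ids back to words."""
    return " ".join(id_to_word[i] for i in ids)


ids = encode("The president mentioned Shakespeare")
print(ids)          # [1, 3, 0, 0]
print(decode(ids))  # the president <unk> <unk>
```

The more words end up as `<unk>`, the less useful the index becomes, which is why the vocabulary has to be comprehensive and modern.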

Parallelized indexing

Converting streaming tweets to vectors of tokens that can be fetched on demand is the core of the system. In other words, the indexer converts a set of words to a group of integers accessible down the pipeline.

Requirements for such an indexer:

  • has a static, globally accessible vocab
  • scales to a lot of words (a real-time stream of tweets)
  • responsible only for obtaining input tweets and updating the inverted index
  • implements a mechanism to talk to the tokenization service

To achieve this, the code relies on two essential concepts: parallel processing and low-level network communication.

Each Indexer instance is a temporary object created to process a small set of tweets. We send tweets asynchronously to the tokenization server. Results are used to atomically update the inverted index map shared between Indexers.

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

// FileItem, IndexerTask and Vocab are project classes from the accompanying repository.
public class Indexer extends Thread {
    // Inverted index shared between all Indexer instances: token id -> documents.
    private final ConcurrentHashMap<Integer, ArrayList<String>> map;
    private final ArrayList<FileItem> files2Id;
    private Socket tokenizerSocket;
    private DataOutputStream tokenizerOut;
    private DataInputStream tokenizerIn;
    private final Logger logger = Logger.getLogger(Indexer.class.getName());
    private final Vocab vocab = new Vocab();

    Indexer(ConcurrentHashMap<Integer, ArrayList<String>> wordToDoc, IndexerTask task) throws IOException {
        this.map = wordToDoc;
        this.files2Id = task.getFiles2Id();
    }

    public void initSocket() throws IOException {
        tokenizerSocket = new Socket("localhost", 11030);
    }

    public void initDataStreams() throws IOException {
        tokenizerOut = new DataOutputStream(tokenizerSocket.getOutputStream());
        tokenizerIn = new DataInputStream(tokenizerSocket.getInputStream());
    }

    @Override
    public void run() {
        try {
            initSocket();
            initDataStreams();
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        // Each file is processed in an asynchronous task; failures are logged, not rethrown.
        this.files2Id.stream()
                .map((fi) ->
                        CompletableFuture.supplyAsync(() -> this.processFileItem(fi))
                                .exceptionally(this::getFaultyFileItem))
                .forEach(CompletableFuture::join);
        this.logger.info("Processed " + this.files2Id.size() + " tweets");
    }

    protected Boolean getFaultyFileItem(Throwable throwable) {
        this.logger.warning("Something failed when processing file\n" + throwable);
        return true;
    }

    private boolean processFileItem(FileItem fi) {
        String fileName = fi.name;

        // Send the tweet text to the tokenization server.
        String fileContents;
        try {
            fileContents = new String(Files.readAllBytes(Paths.get(fileName)), StandardCharsets.UTF_8);
            tokenizerOut.writeUTF(fileContents);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        // Read the response: the number of tokens, then the tokens themselves.
        int[] tokenizedText;
        try {
            int numTokens = tokenizerIn.readUnsignedShort();
            if (numTokens == 0) {
                logger.warning("Failed to tokenize text. Encoding's incompatible");
                return true;
            }
            tokenizedText = new int[numTokens];
            for (int i = 0; i < numTokens; ++i) {
                tokenizedText[i] = tokenizerIn.readInt();
            }
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

        updateIndex(tokenizedText, fileName);
        // Lock System.out so that the debug output of one Indexer stays in one piece.
        synchronized (System.out) {
            System.out.println("Working with document " + fileName);
            for (int t : tokenizedText) {
                System.out.print(this.vocab.getIdToWordMap().get(t) + " ");
            }
            System.out.println(fi);
        }
        return true;
    }

    private void updateIndex(int[] tokenizedText, String fileName) {
        // compute() runs the whole lambda atomically per key, so concurrent
        // writers never modify the same ArrayList at the same time.
        Arrays.stream(tokenizedText).parallel().forEach(wordId ->
                map.compute(wordId, (k, docs) -> {
                    ArrayList<String> list = (docs == null) ? new ArrayList<>() : docs;
                    list.add(fileName);
                    return list;
                }));
    }
}

What is this code block starting with synchronized(System.out)? Debugging a program that runs code in parallel can be non-trivial. In our case, multiple instances of the Indexer class run simultaneously, and to tell which one is doing what, we print to stdout. One way to keep that output readable is a synchronized block. When a thread reaches the line synchronized(System.out), System.out is locked for every other thread that synchronizes on it until this particular Indexer finishes printing its results. Hence, there is no race over who prints first, which would otherwise lead to garbled logs.
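The same idea can be sketched in Python, where a `threading.Lock` plays the role of the synchronized block. The names `print_lock` and `report`, and the sample words, are made up for this illustration; the output goes to an in-memory buffer so that the grouping is easy to inspect.

```python
import io
import threading
from typing import List

print_lock = threading.Lock()


def report(out, file_name: str, words: List[str]) -> None:
    """Write a two-line report as one uninterrupted block."""
    with print_lock:  # plays the role of synchronized(System.out)
        out.write(f"Working with document {file_name}\n")
        out.write(" ".join(words) + "\n")


buffer = io.StringIO()
threads = [
    threading.Thread(
        target=report,
        args=(buffer, f"tweet_{i}.txt", ["some", "words", str(i)]),
    )
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(buffer.getvalue(), end="")
```

The order of the reports may differ between runs, but the two lines of each report always stay together.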

But the Indexer doesn't convert words to tokens itself. Instead, its job is to update the inverted index with the piece of text that the IndexingController handed to this Indexer instance.

Wondering who does this conversion? Our next step is to look at another service and a critical system component — a tokenization server.

Somebody said "tokenization"?

Python is a simple yet powerful language, rich in language processing libraries such as NLTK, spaCy, etc. This Python server will "do one thing and do it well": convert tweets to their vectorized representation. However, to serve the many requests that we expect from Indexers, we need to handle them concurrently. For this purpose, I will use asynchronous Python programming, a common choice for client-server applications:

import asyncio
import logging
from typing import List

from normalizer import Normalizer
from tokenizer import Tokenizer

TOKENIZER_HOST = 'localhost'
TOKENIZER_PORT = 11030
STOP_PHRASE = '_quit_'
tokenizer = Tokenizer()
normalizer = Normalizer()

LOGGER = logging.getLogger(__name__)


def convert_request_to_response(request: str) -> List[int]:
    """Normalize the text, split it into tokens and map the tokens to ids."""
    response = normalizer.normalize(request)
    print(f"normalized {response}")
    response = tokenizer.tokenize(response)
    print(f"tokenized {response}")
    response = tokenizer.convert_tokens_to_ids(response)
    print(f"converted to ids: {response}")
    return response


def send_response(writer, response: List[int]) -> None:
    """Send the number of tokens (2 bytes), then every token id (4 bytes each)."""
    writer.write(len(response).to_bytes(2, byteorder='big'))
    LOGGER.info(f'Send from tokenizer: {response}')
    for token_id in response:
        writer.write(token_id.to_bytes(4, byteorder='big'))


async def handle_client(reader, writer):
    request = None
    print("handling client..")
    while request != STOP_PHRASE:
        try:
            # Java's writeUTF prefixes the text with its length in 2 bytes.
            # readexactly waits for the full amount; read(n) may return fewer bytes.
            header = await reader.readexactly(2)
            length_of_message = int.from_bytes(header, byteorder='big')
            LOGGER.info(f'Received {length_of_message} bytes')
            request = await reader.readexactly(length_of_message)
            print("read all bytes")
        except asyncio.IncompleteReadError:
            # The client closed the connection.
            break

        try:
            request = request.decode('utf-8')
        except UnicodeDecodeError:
            # writeUTF emits modified UTF-8, which encodes emojis as surrogate
            # pairs that a strict UTF-8 decoder rejects.
            LOGGER.info("Emojis in tweets are not yet supported")
            writer.write((0).to_bytes(2, byteorder='big'))
            await writer.drain()
            continue

        response = convert_request_to_response(request)
        send_response(writer, response)
        await writer.drain()
    writer.close()


async def run_server():
    server = await asyncio.start_server(handle_client, TOKENIZER_HOST, TOKENIZER_PORT)
    async with server:
        await server.serve_forever()


if __name__ == "__main__":
    print('running')
    logging.basicConfig(level=logging.INFO)
    asyncio.run(run_server(), debug=True)

A typical text tokenization pipeline consists of three steps:

cleaning -> extracting roots of words -> converting word to integer (token)

Text normalization, which covers the first two steps, is represented by the Normalizer class; you can find its code in the repository accompanying this article. Then we convert each word to its token and send the result back to the client.
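As a rough illustration of the first two steps, here is a toy Python sketch. It is not the project's Normalizer: the suffix list and the function names are assumptions chosen for brevity, and a real pipeline would rely on a proper stemmer or lemmatizer from a library such as NLTK or spaCy.

```python
import re
from typing import List

# Hypothetical suffixes for a deliberately crude stemmer.
SUFFIXES = ("ing", "ed", "s")


def clean(text: str) -> List[str]:
    """Lowercase the text and keep only alphabetic words."""
    return re.findall(r"[a-z]+", text.lower())


def stem(word: str) -> str:
    """Strip the first matching suffix, keeping at least three characters."""
    for suffix in SUFFIXES:
        if word.endswith(suffix) and len(word) - len(suffix) >= 3:
            return word[: -len(suffix)]
    return word


def normalize(text: str) -> List[str]:
    """Cleaning followed by root extraction."""
    return [stem(word) for word in clean(text)]


print(normalize("Presidents talked about ending wars!"))
# ['president', 'talk', 'about', 'end', 'war']
```

Reducing words to their roots means that a search for "war" also finds tweets that mention "wars".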

Note the communication "protocol" between the client and the server. First, we receive the length of the message and, knowing how many bytes it will take, wait for precisely this amount. This way of communicating is a typical pattern in low-level data transmission. Note that it is language-agnostic as well.
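The framing described above can be sketched with Python's `struct` module. The function names are hypothetical, and the sketch only mirrors the shape of the exchange: a 2-byte big-endian length followed by the payload for the request, and a 2-byte token count followed by 4-byte integers for the response.

```python
import struct
from typing import List


def frame_request(text: str) -> bytes:
    """Prefix UTF-8 text with its byte length as a 2-byte big-endian integer."""
    payload = text.encode("utf-8")
    return struct.pack(">H", len(payload)) + payload


def frame_response(token_ids: List[int]) -> bytes:
    """Prefix the token ids with their count, then add each id as 4 bytes."""
    header = struct.pack(">H", len(token_ids))
    return header + b"".join(struct.pack(">i", t) for t in token_ids)


def parse_response(data: bytes) -> List[int]:
    """Read the 2-byte count, then exactly that many 4-byte integers."""
    (count,) = struct.unpack_from(">H", data, 0)
    return list(struct.unpack_from(f">{count}i", data, 2))


print(frame_request("war"))                     # b'\x00\x03war'
print(parse_response(frame_response([7, 42])))  # [7, 42]
```

Because the receiver always knows how many bytes to expect next, it never has to guess where one message ends and the next begins.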

Let's get back to our Indexer that waits for tokenization.

int[] tokenizedText;
try {
    int numTokens = tokenizerIn.readUnsignedShort();
    if (numTokens == 0) {
        logger.warning("Failed to tokenize text. Encoding's incompatible");
        return true;
    }
    tokenizedText = new int[numTokens];
    for (int i = 0; i < numTokens; ++i) {
        tokenizedText[i] = tokenizerIn.readInt();
    }
} catch (IOException e) {
    e.printStackTrace();
    return false;
}

We reuse the same length-prefixed pattern that carried our request to receive the response:

  1. The number of tokens, as a 2-byte unsigned integer.
  2. The tokens themselves, 4 bytes each.

We are good to go with processing the results,

updateIndex(tokenizedText, fileName);

which means to (atomically) update the inverted index:

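private void updateIndex(int[] tokenizedText, String fileName) {
    // compute() runs the whole lambda atomically per key, so concurrent
    // writers never modify the same ArrayList at the same time.
    Arrays.stream(tokenizedText).parallel().forEach(wordId ->
            map.compute(wordId, (k, docs) -> {
                ArrayList<String> list = (docs == null) ? new ArrayList<>() : docs;
                list.add(fileName);
                return list;
            }));
}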

Recall that the map attribute has the following definition:

private final ConcurrentHashMap<Integer, ArrayList<String>> map;

I refer you to this excellent article for details on lock-free Java programming, specifically when working with a hash table data structure.
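For readers who think in Python, the inverted index update can be sketched as follows. Note the swapped-in technique: instead of a lock-free concurrent map, this sketch guards a plain dictionary with a single coarse lock, which is an assumption made for simplicity. The document names and token ids are made up.

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

index = defaultdict(list)      # token id -> list of documents containing it
index_lock = threading.Lock()  # one coarse lock instead of a lock-free map


def update_index(token_ids, file_name):
    """Register file_name under every token id it contains."""
    with index_lock:
        for token_id in token_ids:
            index[token_id].append(file_name)


documents = {
    "tweet_1.txt": [2, 5],
    "tweet_2.txt": [5, 9],
    "tweet_3.txt": [2, 5, 9],
}

# Leaving the with-block waits for all submitted updates to finish.
with ThreadPoolExecutor(max_workers=3) as pool:
    for name, ids in documents.items():
        pool.submit(update_index, ids, name)

print(sorted(index[5]))  # ['tweet_1.txt', 'tweet_2.txt', 'tweet_3.txt']
```

Answering a search request is then a single dictionary lookup by token id.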

Serving the app in a client-server way

First of all, I assume you are familiar with client-server system design. I encourage you to read this article for a quick recap.

In our case, a client wants to search for some words in tweets, and a server with access to the inverted index can easily find the matching tweets in the database.

Main.java

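import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// IndexingController and SearchServer are project classes from the accompanying repository.
public class Main {

    public static final int PORT = 3333;
    public static LinkedList<SearchServer> serverList = new LinkedList<>();
    // At most two client sessions are served at the same time.
    public static Semaphore numAvailableConnections = new Semaphore(2);

    public static void main(String[] args) throws IOException {
        // Inverted index shared by the indexing and the search side.
        ConcurrentHashMap<Integer, ArrayList<String>> wordToDoc = new ConcurrentHashMap<>();

        IndexingController controller = new IndexingController(wordToDoc);
        controller.start();

        try (ServerSocket server = new ServerSocket(PORT)) {
            System.out.println("Server Started");
            while (true) {
                // Blocks until a connection slot is available.
                Main.numAvailableConnections.acquire();
                Socket socket = server.accept();
                try {
                    SearchServer searchServer = new SearchServer(socket, wordToDoc);
                    serverList.add(searchServer);
                    searchServer.start();
                } finally {
                    System.out.println("Available connection slots: " +
                            Main.numAvailableConnections.availablePermits());
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}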

First, we initialize our most critical service, IndexingController, which processes tweets from a streaming data source. Since only one instance of this class needs to run at a time, we implement it with the vanilla Thread API.

SearchServer is a class for handling actual client requests. We limit the number of such servers with a semaphore. This blocking mechanism pauses the accept loop when the number of open connections has reached the limit and no client has finished its session yet.
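Here is a minimal Python sketch of the same throttling idea, using `threading.Semaphore`. The session body is a stand-in (a short sleep), and the counters `active` and `peak` exist only to show that the limit holds; none of these names come from the project.

```python
import threading
import time

MAX_CONNECTIONS = 2
slots = threading.Semaphore(MAX_CONNECTIONS)

active = 0
peak = 0
counter_lock = threading.Lock()


def serve_client(client_id: int) -> None:
    """Simulate one client session and release the slot when it ends."""
    global active, peak
    try:
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # stand-in for the actual search session
    finally:
        with counter_lock:
            active -= 1
        slots.release()


threads = []
for client_id in range(5):
    slots.acquire()  # blocks while MAX_CONNECTIONS sessions are running
    t = threading.Thread(target=serve_client, args=(client_id,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print(f"peak concurrent sessions: {peak}")
```

The accepting loop acquires a slot before starting a session, and the session gives the slot back when it finishes, so the number of concurrent sessions never exceeds the limit.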

Client.java

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Scanner;

public class Client {

    private Socket socket;
    BufferedReader in;

    public Client(String addr, int port) {
        try {
            socket = new Socket(addr, port);
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        } catch (IOException e) {
            System.err.println("Socket failed");
        }
    }

    public void send() throws IOException {
        // Create the data streams once and reuse them for every request.
        DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
        DataInputStream dIn = new DataInputStream(socket.getInputStream());
        Scanner sc = new Scanner(System.in);
        // hasNextLine() returns false at end of input; nextLine() never returns null.
        while (sc.hasNextLine()) {
            String word = sc.nextLine();
            try {
                dOut.writeUTF(word);
                String docs = dIn.readUTF();
                System.out.println(docs);
            } catch (IOException e) {
                e.printStackTrace();
                downService();
                break; // the socket is closed, so stop reading input
            }
        }
    }

    private void downService() {
        try {
            if (!socket.isClosed()) {
                socket.close();
            }
        } catch (IOException ignored) {}
    }

}

It looks familiar, doesn't it? It is the same communication pattern we have seen in the Indexer class implementation. Only this time, we are communicating with a SearchServer that opened a dedicated socket. And have you noticed the Scanner class instance? It reads the requests we type into the application via the command line.

Results

We are interested in collecting the entire context of what people say when they mention, for example, the words "war" or "President" in their tweets. Let's try it out!

Following the setup guide in the repository, we launch a Kafka cluster, the primary server, and the client. In the client's console, type "war". The output could look like this: [./data/TM/tweet_12.txt]

Go to the data/TM directory and find the file called tweet_12.txt which contains the target word along with the entire tweet. Trying out other words such as “Ocean” and “President”, we may obtain:
Ocean
[]
President
[./data/frances reed/tweet_35.txt, ./data/Susan 🌊/tweet_37.txt, ./data/TruthSeeker/tweet_48.txt]

We observe that people currently talk more about politics. After some time, let's test the word "war" again:
war
[./data/TM/tweet_12.txt, ./data/Most wanted👌👊💥💯/tweet_51.txt, ./data/messihiluf93/tweet_53.txt, ./data/Paul Eb/tweet_54.txt, ./data/Blaine Col/tweet_58.txt]

The first tweet we got at the beginning of our test is still there (./data/TM/tweet_12.txt) while new tweets mentioning the word “war” were added to the list.

Summary and takeaways

It appears that we may trace who used this word in their tweets. We are watching all of this in real time, using a couple of excellent practices along the way. And several takeaways from this short journey that I can name right away are:

  • Our system is a set of microservices, each busy with its specific task and interacting with the others over the network.
  • Client-server communication processes requests asynchronously for several clients at a time.
  • An ingestion bus separates our app from the real-world data.
  • Parallel programming in Java using atomic operations is a clean and reliable way to use the full power of your computing device.
  • Python handles natural language processing. This language has an enormous ecosystem for NLP, much richer and simpler to use than, for example, its Java analogs.

And much more. Does it sound like a great data engineering experience? I would say it is one and hope that you agree!

What requires our additional attention is the Kafka cluster that acts as a buffer between the real world and our application. And this is coming. In the next part, we will examine the system's build and automation process and create a CI/CD pipeline. Containers, anyone? A must-have for a microservice system. Stay tuned!

For more details, please refer to the complete code.

Update: see the next part where we containerize our application.
