Text-to-speech Data Collection

Natnael Melese
7 min read · Oct 8, 2022


Data Engineering: text-to-speech data collection with Kafka, Airflow, and Spark

The goal is to produce a deployable tool that handles posting and receiving text and audio files to and from a data lake, applies transformations in a distributed manner, and loads the result into a warehouse in a format suitable for training a speech-to-text model.

Overview and Objective

The process of acquiring and measuring audio data is known as voice or speech data collection. Virtual assistants and other AI systems have become more user-friendly thanks to the utilization of this data, which may be gleaned from various sources and is used by machines to comprehend and process audio input.

Speech recognition is a cognitive AI service that attempts to imitate human behavior. It gives machines an ability similar to the way humans hear speech, retain what is said, and respond appropriately.

For crucial use cases like transcription and voice analysis, speech recognition uses AI to translate spoken language into text.

There are many text corpora for the Amharic and Swahili languages. Our goal is to build a data engineering pipeline that allows recording millions of Amharic and Swahili speakers reading digital texts in-app and on web platforms.

This platform will act as a data source for clients’ speech-to-text ML engines by providing accurate audio recordings of a text corpus in a format suitable for training a speech-to-text model.

Technologies and tools used to build the data capture pipeline

Kafka

  • Kafka is a new kind of data infrastructure. Instead of treating data
    like things, Kafka treats data like events. It does not just store
    these events for as long as necessary; it also handles stream
    processing dynamically, which means it can scale to handle any number
    of applications.
  • As the number of audio recorders (the producers) grows or shrinks, as the case may be, Kafka will dynamically meet the demand.
  • During the consumption of a topic, you can also configure a group with
    multiple consumers. Each of the consumers in a specific group will
    access messages from a particular subset of partitions within the
    topics they subscribe to. This ensures that every message is delivered
    to exactly one consumer in the group, and that all messages carrying
    the same key reach the same consumer (a minimal consumer-group sketch
    follows this list).
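Below is a minimal consumer-group sketch, assuming the kafka-python client used elsewhere in this post; the broker address and the group id g2-audio-consumers are assumed values for illustration only.

# consumer_sketch.py - illustrative only; broker address and group id are assumed
from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "g2-audio-output",                      # topic the recording API produces to
    bootstrap_servers=["localhost:9092"],   # replace with the real MSK brokers
    group_id="g2-audio-consumers",          # consumers sharing this id split the partitions
    value_deserializer=lambda v: loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
)

# Each member of the group is assigned a subset of the topic's partitions,
# so every message is handled by exactly one consumer in the group.
for message in consumer:
    print(message.partition, message.value["headline"])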

Principal Benefits of Kafka

  • Scalability: Kafka is a distributed system that can be scaled quickly
    and easily without incurring any downtime. Apache Kafka can handle
    many terabytes of data with very little overhead.
  • Durability: Kafka persists messages on disk and provides intra-cluster
    replication, which makes for a highly durable messaging system.
  • Reliability: Kafka replicates data and is able to support multiple
    subscribers. Additionally, it automatically balances consumers in the
    event of failure. That means that it’s more reliable than similar
    messaging services available.
  • High performance: Kafka delivers high throughput for both publishing
    and subscribing, utilizing disk structures that offer constant levels
    of performance even when dealing with many terabytes of stored
    messages.

Airflow

  • Airflow is a platform used for workflow management in data engineering
    pipelines. It can be used as a data orchestration tool to
    programmatically author, schedule, and monitor workflows.
  • It streamlines increasingly complex enterprise procedures and is based
    on the idea of “configuration as code.”
  • Directed acyclic graphs (DAGs) are used by Airflow to control workflow
    orchestration. They enable data engineers to assemble and manage
    workflows made up of many data sources.

Principal Benefits of Airflow

  • Task dependency management: Airflow is extremely good at managing
    different sorts of dependencies, be it task completion, DAG run
    status, or file or partition presence through the appropriate sensor.
    Airflow also handles task-dependency concepts such as branching.
  • DAGs are a way to set up workflows: they define a sequence of
    operations that can be individually retried on failure and restarted
    from where the operation failed. DAGs provide a nice abstraction over
    a series of operations (a minimal DAG sketch follows this list).
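As a concrete illustration, here is a minimal Airflow DAG sketch; the DAG id, task names, and schedule are assumptions for illustration, not the project's actual pipeline.

# dag_sketch.py - a minimal sketch; DAG id, task names, and schedule are assumed
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_corpus():
    print("pull the text corpus from the data lake")

def transform_corpus():
    print("run Spark transformations on the corpus")

with DAG(
    dag_id="g2_speech_corpus_pipeline",
    start_date=datetime(2022, 10, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = PythonOperator(task_id="extract_corpus",
                             python_callable=extract_corpus,
                             retries=2)          # tasks can be retried individually
    transform = PythonOperator(task_id="transform_corpus",
                               python_callable=transform_corpus)

    extract >> transform                         # transform runs only after extract succeeds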

Spark

  • Spark provides a unified environment that accepts data in many
    different forms and allows all the tools to work together on the same
    data, passing a dataset from one step to the next.
  • It provides this service in a single-node or clustered environment. We
    will be using Spark to perform data transformations on the text corpus.

Principal Benefits of Spark

  • Spark can be deployed in a variety of ways; it provides native
    bindings for Java, Scala, Python, and R, and supports SQL, streaming
    data, machine learning, and graph processing. Apache Spark is an
    essential tool for a large-scale data pipeline project such as ours,
    since it allows us to perform processing tasks on a large dataset
    (a small PySpark sketch follows).
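To give a flavor of what those transformations look like, here is a small PySpark sketch over the news corpus described in the next section; the column names come from the dataset, while the CSV file name is an assumption for illustration.

# spark_sketch.py - a minimal PySpark sketch; the CSV file name is an assumption
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("g2-corpus-transform").getOrCreate()

# Load the Amharic news corpus (columns: headline, category, date, views, article, link).
df = spark.read.csv("amharic_news.csv", header=True, multiLine=True, escape='"')

# Example distributed transformations: drop rows with empty articles and
# add a word-length column, mirroring the pandas EDA shown below.
clean = (df.filter(F.col("article").isNotNull())
           .withColumn("word_len", F.size(F.split(F.col("article"), r"\s+"))))

clean.groupBy("category").agg(F.avg("word_len").alias("avg_word_len")).show()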

Exploratory Data Analysis and Insights

About the data

There are a number of large text corpora we could use, but for the purpose of testing the backend development we used the recently released Amharic news text classification dataset with baseline performance.

The data has the following columns.

  • headline
  • category
  • date
  • views
  • article
  • link
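The EDA snippets below assume the dataset has already been loaded into a pandas DataFrame named df; the CSV file name here is an assumption for illustration.

import pandas as pd

# Load the Amharic news dataset (file name assumed for illustration).
df = pd.read_csv("amharic_news.csv")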
df.category.unique()
array(['ስፖርት', 'መዝናኛ', 'ሀገር አቀፍ ዜና', 'ቢዝነስ', 'ዓለም አቀፍ ዜና', 'ፖለቲካ', nan],
      dtype=object)

Word Length

df['word_len'] = df['article'].str.split().str.len()
df.head()
df.info()

df.word_len.mean()
248.9586853912942

Implementations of the Project

  • We will divide our Kafka topics by news category, mapping each
    category in the dataset to its own topic:

topic_names = {
    "ሀገር አቀፍ ዜና": "g2-national_news",
    "መዝናኛ": "g2-entertainment",
    "ቢዝነስ": "g2-business",
    "ዓለም አቀፍ ዜና": "g2-international_news",
    "ፖለቲካ": "g2-politics",
    "ስፖርት": "g2-sport"
}
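For illustration, the topics themselves could be created from this mapping with the kafka-python admin client; the broker address, partition count, and replication factor below are assumed values, not the project's actual settings.

# create_topics_sketch.py - illustrative; partition and replication counts are assumed
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=["localhost:9092"],   # replace with the MSK brokers
                         client_id="g2-admin")

# topic_names is the category-to-topic mapping defined above.
topics = [NewTopic(name=name, num_partitions=3, replication_factor=2)
          for name in topic_names.values()]
admin.create_topics(new_topics=topics)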
  • We will have an API endpoint that is connected to a producer that takes
    the audio recording and passes it to the Kafka cluster.
# api.py
from datetime import datetime
from json import dumps
from flask import request, jsonify
from kafka import KafkaProducer
from werkzeug.utils import secure_filename
# app (the Flask app) and app.config["UPLOAD_FOLDER"] are defined elsewhere in api.py.
producer = KafkaProducer(
    bootstrap_servers=['b-1.batch6w7.6qsgnf.c19.kafka.us-east-1.amazonaws.com:9092',
                       'b-2.batch6w7.6qsgnf.c19.kafka.us-east-1.amazonaws.com:9092'],
    client_id='g2-text-producer',
    value_serializer=lambda x: dumps(x).encode('utf-8'))

@app.route('/get_audio', methods=['POST'])
def get_audio():
    headline = request.form.get("headline")
    article = request.form.get("article")
    json_id = request.form.get("json_id")
    audio_file = request.files["file"]
    file_path = ""
    if audio_file.filename != '':
        filename = secure_filename(audio_file.filename)
        file_path = app.config["UPLOAD_FOLDER"] + filename + f"{datetime.now()}"
        audio_file.save(file_path)
        # Publish the recording metadata and saved file path to the Kafka topic.
        producer.send("g2-audio-output", {"headline": headline, "article": article,
                      "json_id": json_id, "file_path": file_path}).get(timeout=30)
    return jsonify({"status": "success", "file_path": file_path})

// Record.jsx
async function handleAudioUpload(file) {
  let formData = new FormData();
  formData.append('file', file);
  formData.append('json_id', 454);
  formData.append('headline', text['headline']);
  formData.append('article', text['article']);
  try {
    setIsLoading(true);
    // Post the recording and its metadata to the Flask producer endpoint.
    let res = await axios.post(`${BASE_URL}/get_audio`, formData);
    console.log(res);
    let data = res.data;
    if (data !== undefined) {
      if (data.status === "success") {
        alert("Recording submitted");
      } else {
        alert(data.message);
      }
    } else {
      alert("Something went wrong");
    }
  } catch (e) {
    console.log(e);
    alert(e.message);
  } finally {
    setIsLoading(false);
  }
}

<Recorder
  record={true}
  title={"New recording"}
  audioURL={audioDetails.url}
  showUIAudio
  handleAudioStop={(data) => handleAudioStop(data)}
  handleOnChange={(value) => handleOnChange(value, "firstname")}
  handleAudioUpload={(data) => handleAudioUpload(data)}
  handleReset={handleRest}
  mimeTypeToUseWhenRecording={`audio/webm`}
/>
  • We will have an API endpoint connected to a Kafka consumer that is
    subscribed to the data stream coming from our Amharic news data.
# api.py
import base64    # used to make the audio bytes JSON-serializable
import sqlite3

@app.route('/return_processed_audio', methods=['POST'])
def return_processed_audio():
    try:
        if request.method == "POST":
            conn = sqlite3.connect('processed_audio.db')
            json_id = request.get_json()["json_id"]
            cursor = conn.execute(
                "SELECT json_id, headline, article, file_path FROM Audio")
            for row in cursor:
                json_id = row[0]
                headline = row[1]
                article = row[2]
                file_path = row[3]
                # Base64-encode the audio so it can be returned inside JSON.
                audio_file = base64.b64encode(open(file_path, "rb").read()).decode("utf-8")
                return jsonify({"status": "success", "json_id": json_id,
                                "headline": headline, "article": article,
                                "audio_file": audio_file})
        else:
            return {
                "status": "error",
                "message": f"{request.method} is not allowed"
            }
    except Exception as e:
        return {
            "status": "error",
            "message": str(e)
        }
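As a usage illustration, a client could call this endpoint and decode the returned audio roughly as follows; the base URL and the output file name are assumptions.

# client_sketch.py - hypothetical client call; base URL and file name are assumed
import base64
import requests

resp = requests.post("http://localhost:5000/return_processed_audio",
                     json={"json_id": 454}).json()
if resp["status"] == "success":
    # Decode the base64 payload back into raw audio bytes.
    with open("processed_audio.webm", "wb") as f:
        f.write(base64.b64decode(resp["audio_file"]))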
  • We will use random partitioning to balance data across our partitions
    when writing to topics.
  • We will configure our producer to wait for acknowledgments, to make
    sure each message has actually made it to the partition on the broker
    (a configuration sketch follows this list).
  • We will tune our consumer socket buffers for high-bandwidth ingestion.
  • We will use PySpark to ensure there aren’t any blank audio files being
    sent to the cluster.
  • We will have a consumer who will be connected to our S3 bucket which
    is subscribed to the audio data stream coming from our Kafka cluster.
  • We will have a producer connected to our S3 bucket that sends a
    datastream of Amharic news data to its subscribers.
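The acknowledgment and buffer settings mentioned above could look roughly like the following with kafka-python; the broker address and buffer sizes are assumed values, not the project's tuned configuration.

# kafka_tuning_sketch.py - illustrative settings; broker address and sizes are assumed
from kafka import KafkaProducer, KafkaConsumer

# acks='all' makes the producer wait until the message is replicated to the
# partition's in-sync replicas before the send is considered successful.
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],    # replace with the MSK brokers
    acks='all',
    retries=3,
)
# Sending without a key lets the default partitioner spread messages
# across partitions, balancing the data.
producer.send("g2-audio-output", b'{"example": "payload"}')

# Larger fetch and socket receive buffers help high-bandwidth audio ingestion.
consumer = KafkaConsumer(
    "g2-audio-output",
    bootstrap_servers=["localhost:9092"],
    receive_buffer_bytes=1024 * 1024,              # 1 MB socket receive buffer (assumed)
    fetch_max_bytes=50 * 1024 * 1024,              # up to 50 MB per fetch request (assumed)
    max_partition_fetch_bytes=10 * 1024 * 1024,    # per-partition cap (assumed)
)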

Lessons learned

  • Creating and setting up a Kafka cluster.
  • How to use DAGs to orchestrate storage of the collected events in a database.
  • How to create an S3 bucket data lake.
  • The media recording (MediaRecorder) API for both vanilla JS and ReactJS implementations.
  • How to transform and load data to and from the data lake.

Future plans

  • Implementing more effective models to preprocess the uploaded audio.
  • Providing a way for users to download data (audio + text corpus).
  • Improving the data’s structure so that it may be immediately applied to an ML model.
  • Completing the streaming pipeline.


