Text Summarization [Part 2 — Abstractive]

In this article, I will show you how you can create an abstractive text summarizer using deep learning.

Lidor ES
9 min readMar 29, 2023
Google Image Search — Abstractive Text Summarization

This is part 2, in the first part I showed how to create an extractive text summarization, you can find it here.

Table of Contents

  1. Introduction
  2. Text Summarization Approaches
  3. Goal
  4. Data
  5. Data Preprocessing
  6. Tokenizing
  7. Glove File for Word Embeddings
  8. Encoder
  9. Decoder
  10. Attention mechanism with coverage vector
  11. Data Generator Function
  12. Loss Functions
  13. Model Training
  14. Model Evaluation

Full code can be found in my GitHub Repo, here.

Introduction

We often have long textual data as in articles/books, and more.
We can’t always read everything and we do not always have someone to summarize it for us, for this, we have text summarization ways, in this article I will show you one way to create an abstractive approach to text summarization, if you are looking for an extractive way or a predefined abstractive way you can refer to the Part 1 article.

Text Summarization Approaches

If you read Part 1, you can skip to the Goal step, because it is the same as it was.

In text summarization, there are mainly two approaches: the Extractive approach and the Abstractive approach:

  • Extractive text summarization involves identifying the salient information by selecting essential sentences or phrases from the original text to create a concise summary. This approach preserves the wording and structure of the original text, resulting in a summary that accurately represents the content. However, it may also include irrelevant or redundant information, as the summary is created solely from extracted portions of the original text.
  • Abstractive text summarization involves generating a summary by building an internal semantic representation of the original text and using natural language processing to rewrite it in new words.
    This approach is more complex and requires machine learning models or deep learning (such as neural networks) to produce a summary that captures the essence of the original text. While abstractive summarization can produce more concise and readable summaries that capture the meaning and nuance of the original text, it may only sometimes be accurate or faithful to the original content. Additionally, the abstractive approach is more challenging than the extractive approach, as it involves creating new phrases and terms to summarize the content.

Goal

My goal here is to create a text summarization model using deep learning in the abstractive approach, which can be deployed later.

Data

The data I will be using here will be the CNN dataset and a predefined glove vector for the word embeddings of 6B parameters (you can use the 30, 42, etc as well).

Data Preprocessing

Note: not all code for this section is here, for full code please refer to my GitHub repo whithin this 2 files here and here, you can also find the text data analysis there (TDA)

For that there are a few functions needed, let’s start with the function that reads an article from the dataset and returns the actual text of it based on its utf-8 encoding:

def load_article(file_name):
file = open(file_name, encoding = 'utf-8')
text = file.read()
file.close()
return text

Now let’s create a function that will split an article text into a summary and a highlight:

def split_article_story_highlight(article):
index = article.find('@highlight')
story, highlights = article[: index], article[index :].split('@highlight')
highlights = [h.strip() for h in highlights if len(h) > 0]
return story, highlights

Now, sometimes in articles, we have contraction words such as won't (will not), shan’t (shall not), I’ve (I have), They’re (They are), Didn’t (Did not), and more. to fix it let’s create the following function (I’ve used a few, feel free to add to this function as much as you’d like or don’t):

def decontracted(raw_phrase):
phrase = re.sub(r"won't", "will not", raw_phrase)
phrase = re.sub(r"can\'t", "can not", phrase)
phrase = re.sub(r"shan\'t", "shall not", phrase)
phrase = re.sub(r"n\'t", " not", phrase)
phrase = re.sub(r"\'re", " are", phrase)
phrase = re.sub(r"\'s", " is", phrase)
phrase = re.sub(r"\'d", " would", phrase)
phrase = re.sub(r"\'ll", " will", phrase)
phrase = re.sub(r"\'t", " not", phrase)
phrase = re.sub(r"\'ve", " have", phrase)
phrase = re.sub(r"\'m", " am", phrase)
return phrase

Now, after checking the results I noticed that some texts still have symbols that are unrelated or unnecessary numbers, and more. to handle it I have used a for loop for that:

for i in CNN.article.values:
new_text = re.sub(r'\n', ' ', i)
new_text = re.sub(r'(CNN)', ' ', new_text)
new_text = re.sub(r'LRB', ' ', new_text)
new_text = re.sub(r'RRB', ' ', new_text)
new_text = re.sub(r'<', ' ', new_text)
new_text = re.sub(r'>', ' ', new_text)
new_text = re.sub(r'[" "]+', " ", new_text)
new_text = re.sub(r'-- ', ' ', new_text)
new_text = re.sub(r"([?!¿])", r" \1 ", new_text)
new_text = re.sub(r'-', ' ', new_text)
new_text = re.sub(r'\s+', ' ', new_text)
new_text = re.sub('[^A-Za-z0-9.,]+', ' ', new_text)
new_text = decontracted(new_text)
new_text = new_text.replace('/', ' ')
new_text = new_text.lower()
article_text.append(new_text)

The for loop exactly will be applied to the summary section too CNN.summary.values.

Part of the POS tagging is as follows (for the full, please refer to my GitHub repo):

data_cleaned['Summary'] = data_cleaned['Summary'].apply(lambda x : '_START_ '+ x + ' _END_')
max_text_len = 330
max_summary_len = 40
# Select only the summaries and article text between max length that above
cleaned_text = np.array(data_cleaned['Article'])
cleaned_summary = np.array(data_cleaned['Summary'])
short_text = []
short_summary = []
for i in range(len(cleaned_text)):
if(len(cleaned_summary[i].split()) <= max_summary_len and len(cleaned_text[i].split()) <= max_text_len):
doc1 = nlp(cleaned_text[i])
doc2 = nlp(cleaned_summary[i])
st = (" ".join([t.text if not t.ent_type_ else t.ent_type_ for t in doc1]))
ss = (" ".join([t.text if not t.ent_type_ else t.ent_type_ for t in doc2]))
st = st.lower()
ss = ss.lower()
short_text.append(st)
short_summary.append(ss)
post_pre = pd.DataFrame({'text': short_text, 'summary': short_summary})
post_pre = post_pre.to_csv(clean_csv_ent_path)

I choose _START_ and _END_ as start and end sentence tokens, but you can choose what you’d like, just change the rest accordingly.
Read more about Part of Speech tagging here.

Tokenizing

After splitting the data into the train set and test set, let’s consider rare words as an ‘ukn’ token:

x_tokenizer = Tokenizer() 
x_tokenizer.fit_on_texts(list(x_train))
rare_word = []
for key, value in x_tokenizer.word_counts.items():
if value < 2:
rare_word.append(key)
tokenrare = []
for i in range(len(rare_word)):
tokenrare.append('ukn')
dictionary_1 = dict(zip(rare_word, tokenrare))
x_trunk = []
for i in x_train:
for word in i.split():
if word.lower() in dictionary_1:
i = i.replace(word, dictionary_1[word.lower()])
x_trunk.append(i)
x_tokenizer = Tokenizer(oov_token = 'ukn')
x_tokenizer.fit_on_texts(list(x_trunk))

And now let’s convert the text sequences into integer sequences (one-hot), padding it, getting its size

# 1) Convert
x_tr_seq = x_tokenizer.texts_to_sequences(x_trunk)
x_val_seq = x_tokenizer.texts_to_sequences(x_validation)
# 2) Padding
x_tr = pad_sequences(x_tr_seq, maxlen = max_text_len, padding = 'post')
x_val = pad_sequences(x_val_seq, maxlen = max_text_len, padding = 'post')
# 3) Size
x_voc = len(x_tokenizer.word_index) + 1

The same goes for the y set.

Glove File for Word Embeddings

Now let’s use the glove file for the word embeddings (I’ve used the 300 dimension glove file):

embeddings_dictionary = dict()
glove_file = open(glov_file, encoding = "utf-8")
for line in glove_file:
records = line.split()
word = records[0]
vector_dimensions = np.asarray(records[1:], dtype = 'float32')
embeddings_dictionary [word] = vector_dimensions
glove_file.close()

Now let’s update the dictionary for the x set with the pre-trained glove embeddings values (same goes for the y set):

embedding_matrix_x = np.zeros((x_voc + 1, 300))
for word, index in x_tokenizer.word_index.items():
embedding_vector = embeddings_dictionary.get(word)
if embedding_vector is not None:
embedding_matrix_x[index] = embedding_vector
embedding_matrix_x.shape

Encoder

Now let’s define a simple encoder for the model:

class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, embedding_matrix_x, hidden_units):
super().__init__()
self.hidden_units = hidden_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, weights = [embedding_matrix_x])
self.bi_gru = tf.keras.layers.Bidirectional(tf.keras.layers.GRU(hidden_units, return_sequences = True, return_state = True, recurrent_initializer = 'glorot_uniform', dropout = 0.08, recurrent_dropout = 0.05))

def call(self, encoder_input, encoder_states):
encoder_emb = self.embedding(encoder_input)
encoder_output, state_fwd, state_back = self.bi_gru(encoder_emb, initial_state = encoder_states)
encoder_states = [state_fwd, state_back]
return encoder_output, encoder_states

Decoder

And also there is a decoder for the model:

class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, embedding_matrix_y, hidden_units):
super().__init__()
self.hidden_units = hidden_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim, weights = [embedding_matrix_y])
self.gru = tf.keras.layers.GRU(hidden_units, return_sequences = True, return_state = True, recurrent_initializer = 'glorot_uniform')
self.W1 = tf.keras.layers.Dense(hidden_units)
self.W2 = tf.keras.layers.Dense(vocab_size)

def call(self, decoder_input, decoder_state, encoder_output, context_vector):
decoder_emb = self.embedding(decoder_input)
decoder_output , decoder_state = self.gru(decoder_emb, initial_state = decoder_state)
concat_vector = tf.concat([context_vector, decoder_state], axis =- 1)
concat_vector = tf.reshape(concat_vector, (-1, concat_vector.shape[1]))
p_vocab = tf.nn.log_softmax(self.W2(self.W1(concat_vector)))
return p_vocab, decoder_state

Attention mechanism with coverage vector

First of all, a brief explanation about the reason for the attention layer with coverage; The main problem with the seq-to-seq attention mechanism is the repetition of words to overcome this we use a coverage vector in the attention mechanism.

To read more about it, refer here.

class additiveAttention(tf.keras.layers.AdditiveAttention):
def __init__(self, hidden_units, is_coverage = False):
super().__init__()
self.Wh = tf.keras.layers.Dense(hidden_units)
self.Ws = tf.keras.layers.Dense(hidden_units)
self.wc = tf.keras.layers.Dense(1)
self.V = tf.keras.layers.Dense(1)
self.coverage = is_coverage
if self.coverage is False:
self.wc.trainable = False

def call(self,keys):
value = keys[0]
query = keys[1]
ct = keys[2]
value = tf.expand_dims(value, 1)
ct = tf.expand_dims(ct, 1)
score = self.V(tf.nn.tanh(self.Wh(query) + self.Ws(value) + self.wc(ct)))
attention_weights = tf.nn.softmax(score, axis = 1)
ct = tf.squeeze(ct, 1)
if self.coverage is True:
ct += tf.squeeze(attention_weights)
context_vector = attention_weights * query
context_vector = tf.reduce_sum(context_vector, axis = 1)
return context_vector, attention_weights, ct

Data Generator Function

The below function is used to generate new tokenized data sets:

def data_generator(X, y, BATCH_SIZE, BUFFER_ZISE, shuffle = True):
dataset = tf.data.Dataset.from_tensor_slices((X, y))
if shuffle:
dataset = dataset.cache().shuffle(len(X) * BUFFER_ZISE).batch(BATCH_SIZE, drop_remainder = True)
else:
dataset = dataset.cache().batch(BATCH_SIZE, drop_remainder = True)
dataset = dataset.prefetch(buffer_size = tf.data.experimental.AUTOTUNE)
return dataset

For the train data I used shuffle and for the test data I didnt.

Loss Functions

Here I’ve used 2 loss functions together to calculate the actual loss, below are the functions:

def nll_loss(p_vocab, target):
mask = tf.math.logical_not(tf.math.equal(target, 0))
loss = -p_vocab
mask = tf.cast(mask, dtype = loss.dtype)
loss *= mask
return loss

def attention_coverage_loss(attention_weights, coverage_vector, target):
mask = tf.math.logical_not(tf.math.equal(target, 0))
coverage_vector = tf.expand_dims(coverage_vector, axis = 2)
ct_min = tf.reduce_min(tf.concat([attention_weights, coverage_vector], axis = 2), axis = 2)
cov_loss = tf.reduce_sum(ct_min, axis = 1)
mask = tf.cast(mask, dtype = cov_loss.dtype)
cov_loss *= mask
return cov_loss

Model Training

For the model training, I created a train function and a validation function, you can see the training function (function got self-documentation):

@tf.function
def train_step(encoder_input, decoder_target):
"""
Function which performs one training step-batch at a time
"""
# Init base values
loss_value = tf.zeros(BATCH_SIZE)
lambda_cov = 1
with tf.GradientTape() as gradient_tape:
# Run input through encoder
encoder_init_states = [tf.zeros((BATCH_SIZE, encoder.hidden_units)) for i in range(2)]
encoder_output, encoder_states = encoder(encoder_input, encoder_init_states)
# Initialize decoder with encoder forward state
decoder_state = encoder_states[0]
coverage_vector = tf.zeros((64, encoder_input.shape[1]))
# Loop over each word
for dt in range(decoder_target.shape[1]-1):
# Run decoder input through decoder and generate vocabulary distribution
decoder_input_vals = decoder_target[:, dt]
decoder_target_vals = decoder_target[:, dt + 1]
# Get attention scores
context_vector, attention_weights, coverage_vector = attention([decoder_state, encoder_output, coverage_vector])
# Get vocabulary distribution for each batch at time dt
p_vocabulary, decoder_state = decoder(tf.expand_dims(decoder_input_vals, 1), decoder_state, encoder_output, context_vector)
# For each step-batch get the probability of the target word at time dt + 1
p_vocabulary_list = []
for i in range(len(decoder_target_vals)):
p_vocabulary_list.append(p_vocabulary[i, decoder_target_vals[i]])
p_vocabulary_target = tf.stack(p_vocabulary_list)
# Calculate the loss at each time step-batch dt and add to current loss (both loss functions)
loss_value += nll_loss(p_vocabulary_target, decoder_target_vals) + lambda_cov * attention_coverage_loss(attention_weights, coverage_vector, decoder_target_vals)
# Get the non-padded length of each sequence in the step-batch
sequence_len_mask = tf.cast(tf.math.logical_not(tf.math.equal(decoder_target, 0)), tf.float32)
batch_sequence_len = tf.reduce_sum(sequence_len_mask, axis = 1)
# Get step-batch loss by dividing the loss of each step-batch by the target sequence length and mean
batch_loss = tf.reduce_mean(loss_value / batch_sequence_len)
# Update global trainable variables
variables = encoder.trainable_variables + decoder.trainable_variables
gradients = gradient_tape.gradient(batch_loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss

For the validation function, it’s almost the same with some minor tweaks (watch in GitHub because I don't want this article to be too long).

Model Evaluation

To test the model I created a few functions decode_sequence, sequence2text, sequence2summary, sequence_search, which are self-explanatory by their names.

The evaluation metric I used to test the model is the Rouge metric, I’ve used the ROUGE-1, ROUGE-2, and ROUGE-L, you can read about it more here (HuggingFace), here (TowardsDataScience), and here (wiki).

Basically what the rouge metric state is that the closer the result is to 1 the better and the closer the result is to 0 the worse.

The results I got from this metric are:

  • ROUGE-1: 0.892
  • ROUGE-2: 0.727
  • ROUGE-L: 0.856

Keep in mind, I used a limited memory and ram machine, if you want to get better results (the results I got are pretty good) there are a few things to do for example you can use a stronger machine, bigger dataset, better-processed dataset, use a bigger glove file (I used the 50 and 100 dimensioned vectors) and more.

--

--

Lidor ES

Data Scientist & Engineer and Software Engineering student