NLP 實戰教學-BERT 情緒分析(下)

HsiuChun
35 min readOct 7, 2023

--

接續上一節,這一篇進入到模型的部分~我們在上節講完資料的包法後,這節要來建模型啦!

接著進入到模型

  • 基於 pretrained model 的模型架構

雖然 BERT 已經很貼心的提供各個模型 API 給我們,但在這邊會教大家怎麼在既有的 BERT 模型下去添加客製化的線性分類層~

首先,仿照 BertForSequenceClassification 的寫法,創建一個繼承 BertPretrainedModel 的 class。

在 __init__ 這邊,可以看到有一個 config 變數,它是一個很重要的參數,用來抓取你想要使用的 pretrained Model 的參數,args 是作者自行添加,為了訓練調參數方便自己定義的。在 __init__ 裡面,我們需定義所有會用到的層,。並記得賦值到加上 self. 的變數,才可以在接下來做使用。

往下看到 forward 函式,這裡會定義你的模型是怎麼接收資料並傳遞的,對於 Bert 分類模型,首先會需要將資料傳入 bert 模型,並將其輸出傳入線性分類層 bert 需要什麼樣的輸入,上節有提到一點點,若想了解各個參數在幹嘛,可以去查看相關說明文件

from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel
import torch.nn as nn

# BERT Model
class BertClassifier(BertPreTrainedModel):
def __init__(self, config, args):
super(BertClassifier, self).__init__(config)
self.bert = BertModel(config)
self.num_labels = args["num_class"]
self.dropout = nn.Dropout(args["dropout"])
self.classifier = nn.Linear(config.hidden_size, self.num_labels)
self.init_weights()

# forward function, data in model will do this
def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None,
head_mask=None, inputs_embeds=None, labels=None, output_attentions=None,
output_hidden_states=None, return_dict=None):

return_dict = return_dict if return_dict is not None else self.config.use_return_dict

# bert output
outputs = self.bert(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict
)

# get its [CLS] logits
pooled_output = outputs[1]
pooled_output = self.dropout(pooled_output) # add dropout
logits = self.classifier(pooled_output) # add linear classifier

return logits

有時候,會覺得直接將 bert 輸出接分類層會太簡單,這時我們可以為其多加一些層數。一般來說,一層 NN 由 Linear、Dropout及 activation 組成,在 tensorflow 中通常會寫作 Dense 層。 若有兩層以上(含)的 Linear 層,就需要添加 activation 層,才不會白做工(具體原因可回憶一下梯度更新的計算方式,跟 Backpropagation 有關);只有一層的話可加也可不加(看自己,出來的數值還是會有差)。

import torch.nn as nn
import copy

# define different activation function
def get_activation(activation):
if activation == 'Prelu':
return nn.PReLU()
elif activation == 'relu':
return nn.ReLU()
elif activation == 'sigmoid':
return nn.Sigmoid()
elif activation == 'gelu':
return nn.GELU()
elif activation == 'LeakyReLU':
return nn.LeakyReLU()
else:
return nn.Tanh()
# Dense Layer
# It is composed of linear, dropout, and activation layers.
class Dense(nn.Module):
def __init__(self, input_dim, output_dim, dropout_rate, activation='tanh'):
super(Dense, self).__init__()
self.hidden_layer = nn.Linear(input_dim, output_dim)
self.dropout = nn.Dropout(dropout_rate)
self.activation = get_activation(activation) # default tanh
nn.init.xavier_uniform_(self.hidden_layer.weight) # you also can change the initialize method
def forward(self, inputs):
logits = self.hidden_layer(inputs)
logits = self.dropout(logits)
logits = self.activation(logits)
return logits
# multi-layers
def _get_clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for i in range(N)])
# Hidden Layers
# It means there are many dense layers with the same dimension
class HiddenLayers(nn.Module):
def __init__(self, dense_layer, num_layers):
super(HiddenLayers, self).__init__()
self.hidden_layers = _get_clones(dense_layer, num_layers)
def forward(self, output):
for layer in self.hidden_layers:
output = layer(output)
return output

假設我們在原有的線性分類層外,在中間多添加一層 NN ,將原始 BERT 輸出的 D = 768 降低至 D = 384,壓縮資訊後再傳入線性分類層。

from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel
import torch.nn as nn

# BERT Model
class BertClassifier(BertPreTrainedModel):
def __init__(self, config, args):
super(BertClassifier, self).__init__(config)
self.bert = BertModel(config)
self.num_labels = args["num_class"]
self.dense = Dense(config.hidden_size, args["hidden_dim"], args["dropout"], args["activation"])
self.classifier = Dense(args["hidden_dim"], self.num_labels, args["dropout"], args["activation"])
self.init_weights()
# forward function, data in the model will do this
def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None,
head_mask=None, inputs_embeds=None, labels=None, output_attentions=None,
output_hidden_states=None, return_dict=None):
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# bert output
outputs = self.bert(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict
)
'''
outputs.keys() -> odict_keys(['last_hidden_state', 'pooler_output'])
outs.last_hidden_state.shape -> torch.Size([batch_size, 512, 768])
outs.pooler_output.shape -> torch.Size([batch_size, 768])
'''
# get its [CLS] logits
pooled_output = outputs[1] # (batch_size, 768)
# add dense layer
pooled_output = self.dense(pooled_output) # (batch_size, 384)
# add linear classifier
logits = self.classifier(pooled_output) # (batch_size, 2)
return logits
  • 訓練與評估-模型輸入與輸出

那假設我們現在丟 500 筆資料進模型,我們可以想像一下它的維度是怎麼改變的~在做資料集時,我們將文本最大長度設為 512,在 Dataset 階段,資料維度為 (500, 4)以訓練集來說,batch_size 設為 16,那 DataLoader 會將資料以 (32, batch_size, 4)的方式呈現,這裡的 32 指的是迭代次數,也就是總共有 32 批資料,其每批資料維度為 (batch_size, 4),4 為在 Dataset 中所定義好回傳的四個值。

但,由於資料往往不會剛好可以被我們設定的 batch_size 整除,若我們未更改 drop_last 參數 表示是否要捨棄無法被整除的餘數資料 ,那最後一批次的資料維度將為 (4, 4)

取單筆資料來看,可以發現有 4 個元素 ,前三者為 (batch_size, 512) -> input_ids, attention_masks, token_type_ids,最後一個為 (batch_size) -> label

圖1.輸入資料維度示意圖

輸入的資料維度知道後,我們就得來看輸出長怎樣了!若是純粹將資料丟進 BERT 模型,會得到一個看起來頗複雜的輸出 XD,根據官方文件,我們發現應包含 6 個回傳值,分別為 last_hidden_state、pooler_output、hidden_states、attentions、cross_attentions、past_key_values ,但後四者若沒有預先設定就不會回傳任何東西 其值為None

圖2.BERT 輸出示意圖

下述兩個變數為我們主要會使用的,還記得上面有提到原始 BERT 的輸出,維度為 768 嗎?就是從這裡知道的!根據官方文件的解說,我們選擇 pooler_output 用來做分類任務~

  • last_hidden_state (torch.FloatTensor of shape (batch_size, sequence_length, hidden_size)) — Sequence of hidden-states at the output of the last layer of the model.
  • pooler_output (torch.FloatTensor of shape (batch_size, hidden_size)) — Last layer hidden-state of the first token of the sequence (classification token) after further processing through the layers used for the auxiliary pretraining task. E.g. for BERT-family of models, this returns the classification token after processing through a linear layer and a tanh activation function. The linear layer weights are trained from the next sentence prediction (classification) objective during pretraining.

當然你也可以 Follow Transformer 的架構,採用最後一層的結果,但需注意 last_hidden_state 的維度為(batch_size, 512, 768),這時候往往會使用 512 字中的第一個字或最後一個字代表該句話,但大多會採用第一個字(根據 BERT 定義 [CLS] 含有整句輸入的 context);最後一個字為原始 Transformer 的定義。

  • 訓練與評估-計算指標

通常分類任務在計算 loss 時會去看各類別的預測機率與實際標籤的差異,但有注意到我們目前並沒有在模型內加入任何將輸出結果轉為機率的程式,也還沒有算 loss,所以下一步就是講解這一 Part 了! 當然你也可以跟官方一樣這樣寫,本篇的教學是寫在模型外面,感覺比較方便跟著訓練隨時替換?

分類模型主要會採用 softmax,也就是 CrossEntropyLoss 去計算,在這邊需特別提醒,根據 PyTorch 中 CrossEntropyLoss 的定義,它不用特意先把模型輸出做 softmax 轉成機率,只需要將原本輸出的 logits 和 labels 一起丟進去它就會幫你算了。

而除了 loss 外,我們最關心的當然是模型的 performance 囉~這邊教學放的會是使用 sklearn 實作,根據 sklearn.metrics 中我們欲使用的評估指標,可以知道它需要使用的輸入為兩個 list,且需為 label,因此我們在這邊幫 logits 做一個 argmax,已取得其預測標籤 一般而言,直接用 logits 做 argmax 的結果與 softmax 後的 argmax 結果會一致,所以可以省略 softmax 步驟

from sklearn.metrics import accuracy_score, precision_score, f1_score, recall_score# get predict result

def get_pred(logits):
y_pred = torch.argmax(logits, dim = 1)
return y_pred

# calculate confusion metrics
def cal_metrics(pred, ans, method):
'''
Parameter
---------
pred: [list], predict class
ans: [list], true class
method: 'micro', 'weighted', 'macro'. # 如果有多分類的話計算上會有差別
---------
'''
if pred.get_device() != 'cpu':
pred = pred.detach().cpu().numpy()
if ans.get_device() != 'cpu':
ans = ans.detach().cpu().numpy()
# 將 zero_division 設為 0,表示當所有預測皆錯誤時,將結果視為 0
rec = recall_score(pred, ans, average=method, zero_division=0)
f1 = f1_score(pred, ans, average=method, zero_division=0)
prec = precision_score(pred, ans, average=method, zero_division=0)
acc = accuracy_score(pred, ans)
return acc, f1, rec, prec
  • 訓練與評估-超參數

一般來說,會建議把所有可調控的超參數寫成一個 dict() 比較方便調用與更改 寫成py檔會更方便!!只要改一個檔案就好,還可以把每次訓練結果的參數一起保存。

from datetime import datetime
parameters = {
"num_class": 2,
"time": str(datetime.now()).replace(" ", "_"), # I like to annotate when I trained
"seed": 1111,
# Hyperparameters
"model_name": 'BERT', # If U have a lot of different models, it is easy for U to know what it is
"config": 'bert-base-uncased', # which pre-trained model config U use
"learning_rate": 1e-4, # the speed that model learn
"epochs": 3, # If U would fine-tune it, the epochs didn't need to set too much
"max_len": 512, # the max length of input tokens in the BERT model
"batch_size": 16,
"dropout": 0.1, # how random amount will be give up
"activation": 'tanh',
"hidden_dim": 384,
}
  • 訓練與評估-注意事項

在 PyTorch 中,模型和資料需要在同一裝置上才可以進行訓練,所以需要將兩者都丟到 GPU 上就是cuda啦,但如果沒有 gpu 也是可以用 cpu,只是會很慢==,可以在該變數後加上 .device 就知道該物件在哪一個裝置囉。 然後宣告 loss function 和 optimizer,如果有想使用 scheduler 也可以,但要記得更新是以 step 去計算哦 要自己換算一下,如果是想在特定哪幾個 epoch 改變 lr 的話~當然這幾個都可以自己定義想要怎麼跑,之後有空在貼幾個範例給大家

import torch

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = BertClassifier.from_pretrained(parameters['config'], parameters).to(device)
loss_fct = nn.CrossEntropyLoss() # we use cross entrophy loss

## You can custom your optimizer (e.g. SGD .etc) ##
# we use Adam here
optimizer = torch.optim.Adam(model.parameters(), lr=parameters['learning_rate'], betas=(0.9, 0.999), eps=1e-9)

## You also can add your custom scheduler ##
# num_train_steps = len(train_loader) * parameters['epochs]
# scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=int(0.1 * num_train_steps), num_training_steps=num_train_steps, num_cycles=1)
  • 訓練與評估-驗證階段

對於驗證與測試階段,需添加 model.eval() 以及 with torch.no_grad() 以調整模型狀態。此處計算整體 performance 需將全部批次的結果加總平均後才會是該 epoch 的綜合結果 也就是說,每一個 epoch 需將所有資料跑過一遍才能得到該 epoch 的 metrics

import torch.nn as nn
# evaluate dataloader
def evaluate(model, data_loader, device):
val_loss, val_acc, val_f1, val_rec, val_prec = 0.0, 0.0, 0.0, 0.0, 0.0
step_count = 0
loss_fct = nn.CrossEntropyLoss()
model.eval()
with torch.no_grad():
for data in data_loader:
ids, masks, token_type_ids, labels = [t.to(device) for t in data]

logits = model(input_ids = ids,
token_type_ids = token_type_ids,
attention_mask = masks)
acc, f1, rec, prec = cal_metrics(get_pred(logits), labels, 'macro')
loss = loss_fct(logits, labels) # 直接丟就好,不用特意做轉換(但如果非二分類,需考慮 one-hot 標籤的轉換)

val_loss += loss.item()
val_acc += acc
val_f1 += f1
val_rec += rec
val_prec += prec
step_count+=1

val_loss = val_loss / step_count
val_acc = val_acc / step_count
val_f1 = val_f1 / step_count
val_rec = val_rec / step_count
val_prec = val_prec / step_count

return val_loss, val_acc, val_f1, val_rec, val_prec
  • 訓練與評估-訓練階段

對於訓練階段,需添加 model.train() 以及 optimizer.zero_grad()、loss.backward()、optimizer.step() 以更新模型權重 具體請回憶模型是如何進行學習的。其他寫法與 evaluate() 大同小異,通常會在此印出每一個 epoch 的結果。 作者的寫法是將 epoch 的 for loop 放在 train() 裡面,也有一些人會將 for loop 放到外面,另外寫一個 Trainer 之類的。

import time
def train(model, train_loader, val_loader, optimizer, args, device):

metrics = ['loss', 'acc', 'f1', 'rec', 'prec']
mode = ['train_', 'val_']
record = {s+m :[] for s in mode for m in metrics}

loss_fct = nn.CrossEntropyLoss()

for epoch in range(args["epochs"]):

st_time = time.time()
train_loss, train_acc, train_f1, train_rec, train_prec = 0.0, 0.0, 0.0, 0.0, 0.0
step_count = 0

model.train()
for data in train_loader:

ids, masks, token_type_ids, labels = [t.to(device) for t in data]

optimizer.zero_grad()

logits = model(input_ids = ids,
token_type_ids = token_type_ids,
attention_mask = masks)

acc, f1, rec, prec = cal_metrics(get_pred(logits), labels, 'macro')
loss = loss_fct(logits, labels)

loss.backward()
optimizer.step()

train_loss += loss.item()
train_acc += acc
train_f1 += f1
train_rec += rec
train_prec += prec
step_count += 1

val_loss, val_acc, val_f1, val_rec, val_prec = evaluate(model, val_loader, device)

train_loss = train_loss / step_count
train_acc = train_acc / step_count
train_f1 = train_f1 / step_count
train_rec = train_rec / step_count
train_prec = train_prec / step_count

print('[epoch %d] cost time: %.4f s'%(epoch + 1, time.time() - st_time))
print(' loss acc f1 rec prec')
print('train | %.4f, %.4f, %.4f, %.4f, %.4f'%(train_loss, train_acc, train_f1, train_rec, train_prec))
print('val | %.4f, %.4f, %.4f, %.4f, %.4f\n'%(val_loss, val_acc, val_f1, val_rec, val_prec))

# record training metrics of each training epoch
record['train_loss'].append(train_loss)
record['train_acc'].append(train_acc)
record['train_f1'].append(train_f1)
record['train_rec'].append(train_rec)
record['train_prec'].append(train_prec)

record['val_loss'].append(val_loss)
record['val_acc'].append(val_acc)
record['val_f1'].append(val_f1)
record['val_rec'].append(val_rec)
record['val_prec'].append(val_prec)

# save model
save_checkpoint(args["model_name"] + '_' + args["time"].split('_')[0] + '.pt', model)

return record

全部訓練完後,要記得將模型結果儲存下來,才不會每次都要重新訓練。至於儲存的方式是將模型當前的狀態儲存起來,以利後續重複使用。而讀取的方式往往需要一個具有相同初始化 __init__ 的模型以載入先前狀態。 這邊提供一個普遍的操作是,在使用相同的__init__載入模型後,forward 採取與原模型不同的做法(例如原本只跑一層 dense,改成跑好幾層),該作法與我們使用既有的 pretrained model,再去添加其他層數有異曲同工之妙

# save model to path
def save_checkpoint(save_path, model):
if save_path == None:
return
torch.save(model.state_dict(), save_path)
print(f'Model saved to ==> {save_path}')

# load model from path
def load_checkpoint(load_path, model, device):
if load_path==None:
return
state_dict = torch.load(load_path, map_location=device)
print(f'\nModel loaded from <== {load_path}')

model.load_state_dict(state_dict)
return model

為了將訓練結果以圖表呈現,通常會將每個 epoch 的結果儲存下來。matplotlib 的細節使用就不多贅述,大家可以上網找找看自己喜歡的樣式 作者這邊有點懶,將每一個指標個別輸出,但你可以把全部的圖合到同一張圖(設定子圖)

import matplotlib.pyplot as plt

# draw the learning curve
def draw_pic(record, name, img_save=False, show=False):
x_ticks = range(1, parameters['epochs']+1)

plt.figure(figsize=(6, 3))

plt.plot(x_ticks, record['train_'+name], '-o', color='lightskyblue',
markeredgecolor="teal", markersize=3, markeredgewidth=1, label = 'Train')
plt.plot(x_ticks, record['val_'+name], '-o', color='pink',
markeredgecolor="salmon", markersize=3, markeredgewidth=1, label = 'Val')
plt.grid(color='lightgray', linestyle='--', linewidth=1)

plt.title('Model', fontsize=14)
plt.ylabel(name, fontsize=12)
plt.xlabel('Epoch', fontsize=12)
plt.xticks(x_ticks, fontsize=12)
plt.yticks(fontsize=12)
plt.legend(loc='lower right' if not name.lower().endswith('loss') else 'upper right')

# define saved figure or not
if img_save:
plt.savefig(name+'.png', transparent=False, dpi=300)
if show:
plt.show()

plt.close()

全部都定義完就可以開始訓練了!記得回傳那些數值才可以畫圖阿,不然就是要把畫圖也在裡面一起做囉,否則程式執行完就不見了,只看的到印出來的提示訊息喔~

history = train(model, train_loader, val_loader, optimizer, parameters, device)

# draw all metrics figure
draw_pic(history, 'loss', img_save=True, show=False)
draw_pic(history, 'acc', img_save=True, show=False)
draw_pic(history, 'f1', img_save=True, show=False)
draw_pic(history, 'rec', img_save=True, show=False)
draw_pic(history, 'prec', img_save=True, show=False)

files = []
files.append('loss.png')
files.append('acc.png')
files.append('f1.png')
files.append('rec.png')
files.append('prec.png')
send_email(parameters, files)
圖1.訓練過程提示訊息
圖2.訓練結果曲線圖

🌟 個人習慣會喜歡將結果透過 email 直接寄給自己,就不用隨時在想什麼時候才會訓練好囉~

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders

def send_email(hyperparameters, png_files):

# create MIMEMultipart
msg = MIMEMultipart()

# define sender and receiver
msg['From'] = 'YOUR EMAIL ACCOUNT'
msg['To'] = 'YOUR EMAIL ACCOUNT'

msg['Subject'] = 'Training Result'

# create text content
text = MIMEText(f'{hyperparameters}\n\n')
msg.attach(text)

# read png
for file in png_files:
with open(file, 'rb') as fp:
image = MIMEBase('application', "octet-stream")
image.set_payload(fp.read())
encoders.encode_base64(image)
image.add_header('Content-Disposition', 'attachment', filename=file)
msg.attach(image)

# 建立 SMTP 服務
server = smtplib.SMTP('smtp.gmail.com', 587)
server.ehlo()
server.starttls()
server.login('YOUR EMAIL ACCOUNT', 'YOUR EMAIL PASSWORD')

server.send_message(msg) # send email
server.quit() # close the email channel
  • 使用與預測

訓練完後,又該怎麼使用訓練好的模型來做預測呢?簡單來說,過程跟驗證與測試階段沒有任何不同,只會隨使用者端的要求而有不同的調整。

例如:單筆預測並轉換預測機率為標籤。

def Softmax(x):
return torch.exp(x) / torch.exp(x).sum()

# predict a single sentence
def predict_one(query, model):

tokenizer = AutoTokenizer.from_pretrained(parameters['config'])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model.eval()
with torch.no_grad():
inputs = tokenizer.encode_plus(
query,
max_length = parameters['max_len'],
truncation = True,
padding = 'max_length',
return_tensors = 'pt'
)

input_ids = inputs['input_ids'].to(device)
attention_mask = inputs['attention_mask'].to(device)
token_type_ids = inputs["token_type_ids"].to(device)

# forward pass
logits = model(input_ids, attention_mask, token_type_ids)
probs = Softmax(logits) # get each class-probs
label_index = torch.argmax(probs[0], dim=0)
pred = label_index.item()

return probs, pred

前面有提到,模型內沒有做 softmax,所以在使用時需要記得自己添加,然後一樣使用 argmax 就可以得到預測出來的 label 為何了!

# You can load the model from the existing result
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
init_model = BertClassifier.from_pretrained(parameters['config'], parameters) # build an initial model
model = load_checkpoint('./bert.pt', init_model, device).to(device) # and load the weight of model from specify file

%%time 是在 colab 中一個非常方便計算執行時間的指令,畢竟多數的系統都對於速度有一定要求,如果預測花費時間太長,對於企業來說,它的使用程度會大幅降低。

%%time
probs, pred = predict_one("This movie doesn't attract me", model)
print(probs, pred)

'''
tensor([[0.9779, 0.0221]], device='cuda:0') 0
CPU times: user 78.1 ms, sys: 4 ms, total: 82.1 ms
Wall time: 340 ms
'''

到這裡,你就已經完成一份基於 BERT 模型的情緒分析實作了!接下來,可以嘗試替換各式各樣的 pretrained model 來練習喔~

--

--