transformer_postag.py
# Defined in Section 4.7.3
import math
import torch
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence
from collections import defaultdict
from vocab import Vocab
from utils import load_treebank, length_to_mask
# tqdm is a Python module that displays the progress of an iteration as a progress bar
from tqdm.auto import tqdm
class TransformerDataset(Dataset):
    def __init__(self, data):
        self.data = data
    def __len__(self):
        return len(self.data)
    def __getitem__(self, i):
        return self.data[i]
def collate_fn(examples):
    lengths = torch.tensor([len(ex[0]) for ex in examples])
    inputs = [torch.tensor(ex[0]) for ex in examples]
    targets = [torch.tensor(ex[1]) for ex in examples]
    # Pad the samples within a batch so that they all have the same length
    inputs = pad_sequence(inputs, batch_first=True, padding_value=vocab["<pad>"])
    targets = pad_sequence(targets, batch_first=True, padding_value=vocab["<pad>"])
    return inputs, lengths, targets, inputs != vocab["<pad>"]
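# collate_fn returns (inputs, lengths, targets, mask): the final element is a
# boolean tensor with the same shape as inputs that is True at real tokens and
# False at padding, and it is used below to restrict both the loss and the
# accuracy computation to non-padded positions.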
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=512):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term2 = torch.pow(torch.tensor(10000.0), torch.arange(0, d_model, 2).float() / d_model)
        div_term1 = torch.pow(torch.tensor(10000.0), torch.arange(1, d_model, 2).float() / d_model)
        # Strided slicing: take every second column starting from index 0 (even
        # dimensions, assigned sine values) or from index 1 (odd dimensions,
        # assigned cosine values), so even and odd positions are filled differently.
        pe[:, 0::2] = torch.sin(position * div_term2)
        pe[:, 1::2] = torch.cos(position * div_term1)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x
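# A minimal shape check for PositionalEncoding (an illustrative sketch; the toy
# sizes below are assumptions and unrelated to the model configured later).
# The module expects input of shape (seq_len, batch_size, d_model) and adds the
# first seq_len rows of the precomputed table, broadcast over the batch.
# Note that the canonical sinusoidal encoding divides the position by
# 10000^(2i/d_model); the variant above multiplies by that factor instead,
# which still yields position-dependent features, only at different frequencies.
_pe_demo = PositionalEncoding(d_model=8)
assert _pe_demo(torch.zeros(4, 2, 8)).shape == (4, 2, 8)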
class Transformer(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class,
                 dim_feedforward=512, num_head=2, num_layers=2, dropout=0.1, max_len=512, activation: str = "relu"):
        super(Transformer, self).__init__()
        # Word embedding layer
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.position_embedding = PositionalEncoding(embedding_dim, dropout, max_len)
        # Encoding layer: a Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(hidden_dim, num_head, dim_feedforward, dropout, activation)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        # Output layer
        self.output = nn.Linear(hidden_dim, num_class)
    def forward(self, inputs, lengths):
        inputs = torch.transpose(inputs, 0, 1)
        hidden_states = self.embeddings(inputs)
        hidden_states = self.position_embedding(hidden_states)
        attention_mask = length_to_mask(lengths) == False
        hidden_states = self.transformer(hidden_states, src_key_padding_mask=attention_mask).transpose(0, 1)
        logits = self.output(hidden_states)
        log_probs = F.log_softmax(logits, dim=-1)
        return log_probs
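# Notes on the forward pass: nn.TransformerEncoder is used in its default
# (seq_len, batch, d_model) layout, hence the transpose of the batch-first
# inputs before the embedding lookup and the transpose back afterwards.
# src_key_padding_mask expects True at positions that should be ignored, so the
# output of length_to_mask (from utils, assumed to be True for positions within
# each sequence's length) is inverted with `== False`.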
embedding_dim = 128
hidden_dim = 128
batch_size = 32
num_epoch = 5
# Load the data
train_data, test_data, vocab, pos_vocab = load_treebank()
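# load_treebank (from utils) is assumed to return each example as a pair of
# (token id list, POS tag id list) over vocab and pos_vocab respectively,
# which is what collate_fn expects when it indexes ex[0] and ex[1].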
train_dataset = TransformerDataset(train_data)
test_dataset = TransformerDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)
num_class = len(pos_vocab)
# Load the model
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Transformer(len(vocab), embedding_dim, hidden_dim, num_class)
model.to(device)  # Move the model onto the GPU (if one is correctly installed)
# Training
nll_loss = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Use the Adam optimizer
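# The model outputs log-probabilities, so NLLLoss amounts to a cross-entropy
# loss over the tag set. In the loop below, indexing with the boolean mask
# flattens log_probs to (num_real_tokens, num_class) and targets to
# (num_real_tokens,), so padded positions never contribute to the loss.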
model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, lengths, targets, mask = [x.to(device) for x in batch]
        log_probs = model(inputs, lengths)
        loss = nll_loss(log_probs[mask], targets[mask])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss: {total_loss:.2f}")
# Evaluation on the test set
model.eval()  # Disable dropout for evaluation
acc = 0
total = 0
for batch in tqdm(test_data_loader, desc="Testing"):
    inputs, lengths, targets, mask = [x.to(device) for x in batch]
    with torch.no_grad():
        output = model(inputs, lengths)
        acc += (output.argmax(dim=-1) == targets)[mask].sum().item()
        total += mask.sum().item()
# Report the accuracy on the test set
print(f"Acc: {acc / total:.2f}")