news 2025/12/25 7:55:42

带注意力机制的seq2seq实例与测试(Bahdanau Attention)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
带注意力机制的seq2seq实例与测试(Bahdanau Attention)

意力机制(Bahdanau Attention)

举一个例子:在日常生活中,比如我们看一幅黑白画(画中有一个红色的苹果,其他的都是黑白的物体,例如香蕉),这个时候我们无意识的看一眼画,很有可能第一个关注的就是这个红色的苹果,但是我有意识的控制眼睛集中去看香蕉,这个时候我关注的就是香蕉。

在上面的例子中,我们的注意力,最开始是无意识的看苹果,后面有意识的注意香蕉,这里面的区别就是我们在这个动作里面加了:意识。当加了意识后,我们就可以有选择的根据条件来关注这幅画的我想关注的地方。

然后我们可以对上面的现象进行建模:

R

=

A

t

t

e

n

t

i

o

n

(

Q

,

K

)

V

,这里我们将Attention当作意识,V当作黑白画的特征,Q是画中是什么?K是V的标签(你可以把K当作是V有关联的部分,不同的K,对应的不同的V),如果没有Attention,R就是苹果,有了Attention,R就可以是香蕉。

我们回头想一想上一篇文的seq2seq中,我们的encoder的output是最后一层rnn的所有时间步的隐藏状态(num_steps,batch_size,num_hiddens),这里包含了我们的序列数据在不同时间步的特征变化,当我们在做decoder的时候,我们是拿着这个encoder的最后一层rnn的最后一个时间步的隐藏状态(1,batch_size,num_hiddens)来作为context的,是一个固定的值,这样有几个问题:

对于长序列来说,context可能丢失信息。

我们从固定context中解码信息,导致了我们对序列在特定解码步骤中,对context关注重点是一样的。

针对上面seq2seq的问题,Bahdanau设计了一种模型,可以解决我们遇到的问题,其定义如下:

c

t

=

T

t

=

1

α

(

s

t

1

,

h

t

)

h

t

,看公式我们可以知道,这里定义了Q(decoder的上一次隐藏态

s

t

1

)/K(encoder的output的部分

h

t

)/V(encoder的output的部分

h

t

)三个概念,含义就是通过Q+K来计算一个权重矩阵W(通过softmax归一化),然后然后将W和V进行计算,得到了我们通过W关注到的新的

V

n

e

w

,这里的W就是我们的注意力矩阵,代表我们关注V中的哪些部分。整个计算过程,就相当于我们生成了新成context具备了注意力机制。

带注意力机制的seq2seq 英文翻译中文 的实例

下面的代码和上一篇文章的代码只有decoder部分有比较大的差别,其他的基本类似。如dataset部分的内容,请参考上一篇文章。

seq2seq完整代码如下

import os

import random

import torch

import math

from torch import nn

from torch.nn import functional as F

import numpy as np

import time

import visdom

import collections

import dataset

class Accumulator:

"""在n个变量上累加"""

def __init__(self, n):

"""Defined in :numref:`sec_softmax_scratch`"""

self.data = [0.0] * n

def add(self, *args):

self.data = [a + float(b) for a, b in zip(self.data, args)]

def reset(self):

self.data = [0.0] * len(self.data)

def __getitem__(self, idx):

return self.data[idx]

class Timer:

"""记录多次运行时间"""

def __init__(self):

"""Defined in :numref:`subsec_linear_model`"""

self.times = []

self.start()

def start(self):

"""启动计时器"""

self.tik = time.time()

def stop(self):

"""停止计时器并将时间记录在列表中"""

self.times.append(time.time() - self.tik)

return self.times[-1]

def avg(self):

"""返回平均时间"""

return sum(self.times) / len(self.times)

def sum(self):

"""返回时间总和"""

return sum(self.times)

def cumsum(self):

"""返回累计时间"""

return np.array(self.times).cumsum().tolist()

class Encoder(nn.Module):

"""编码器-解码器架构的基本编码器接口"""

def __init__(self, **kwargs):

# 调用父类nn.Module的构造函数,确保正确初始化

super(Encoder, self).__init__(**kwargs)

def forward(self, X, *args):

# 抛出未实现错误,意味着该方法需要在子类中具体实现

raise NotImplementedError

class Decoder(nn.Module):

"""编码器-解码器架构的基本解码器接口

Defined in :numref:`sec_encoder-decoder`"""

def __init__(self, **kwargs):

# 调用父类nn.Module的构造函数,确保正确初始化

super(Decoder, self).__init__(**kwargs)

def init_state(self, enc_outputs, *args):

# 抛出未实现错误,意味着该方法需要在子类中具体实现

raise NotImplementedError

def forward(self, X, state):

# 抛出未实现错误,意味着该方法需要在子类中具体实现

raise NotImplementedError

class EncoderDecoder(nn.Module):

"""编码器-解码器架构的基类

Defined in :numref:`sec_encoder-decoder`"""

def __init__(self, encoder, decoder, **kwargs):

# 调用父类nn.Module的构造函数,确保正确初始化

super(EncoderDecoder, self).__init__(**kwargs)

# 将传入的编码器实例赋值给类的属性

self.encoder = encoder

# 将传入的解码器实例赋值给类的属性

self.decoder = decoder

def forward(self, enc_X, dec_X, enc_X_valid_len, *args):

# 调用编码器的前向传播方法,处理输入的编码器输入数据enc_X

enc_outputs = self.encoder(enc_X, *args)

# 调用解码器的init_state方法,根据编码器的输出初始化解码器的状态

dec_state = self.decoder.init_state(enc_outputs, enc_X_valid_len)

# 调用解码器的前向传播方法,处理输入的解码器输入数据dec_X和初始化后的状态

return self.decoder(dec_X, dec_state)

def masked_softmax(X, valid_lens): #@save

"""

执行带掩码的 Softmax 操作。

参数:

X (torch.Tensor): 待 Softmax 的张量,通常是注意力机制中的“分数”(scores)。

其形状通常为 (批量大小, 查询数量/序列长度, 键值对数量/序列长度)。

valid_lens (torch.Tensor): 序列的有效长度张量。

形状可以是 (批量大小,) 或 (批量大小, 键值对数量)。

用于指示每个序列的哪个部分是有效的(非填充)。

返回:

torch.Tensor: 经过 Softmax 归一化且填充部分被忽略的概率分布张量。

"""

# 辅助函数:创建一个序列掩码,并用特定值覆盖被掩码(填充)的元素

def _sequence_mask(X, valid_len, value=0):

"""

根据有效长度(valid_len)创建掩码,并应用于张量 X。

参数:

X (torch.Tensor): 形状为 (批量大小 * 查询数量, 最大长度) 的张量。

valid_len (torch.Tensor): 形状为 (批量大小 * 查询数量,) 的有效长度向量。

value (float): 用于替换被掩码元素的填充值。

返回:

torch.Tensor: 被填充值覆盖后的张量 X。

"""

# 获取序列的最大长度(张量的第二个维度)

maxlen = X.size(1)

# 核心掩码逻辑:

# 1. torch.arange((maxlen), ...) 创建一个从 0 到 maxlen-1 的序列(代表时间步索引)

# 2. [None, :] 使其形状变为 (1, maxlen),用于广播

# 3. valid_len[:, None] 使有效长度形状变为 (批量大小 * 查询数量, 1),用于广播

# 4. < 比较操作:当索引 < 有效长度时,结果为 True(有效元素),否则为 False(填充元素)

mask = torch.arange((maxlen), dtype=torch.float32,

device=X.device)[None, :] < valid_len[:, None]

# 逻辑非 ~mask 得到填充部分的掩码(True 表示填充部分)

# 使用填充值(value,通常是 -1e6)覆盖填充元素

X[~mask] = value

return X

# 1. 处理无需掩码的情况

if valid_lens is None:

# 如果未提供有效长度,则执行标准 Softmax

return nn.functional.softmax(X, dim=-1)

# 2. 处理需要掩码的情况

else:

# 备份原始形状,用于后续重塑

shape = X.shape

# 统一 valid_lens 的形状,使其与 X 的前两个维度相匹配

if valid_lens.dim() == 1:

# 适用于批量中每个序列只有一个有效长度的情况(例如,K-V 序列是等长的)

# 将 valid_lens 重复 shape[1] 次,匹配 X 的查询/序列长度维度

valid_lens = torch.repeat_interleave(valid_lens, shape[1])

else:

# 适用于每个查询-键值对的有效长度都不同的情况

# 将 2D 张量展平为 1D 向量

valid_lens = valid_lens.reshape(-1)

# 预处理 Softmax 输入:将 X 调整为 2D 矩阵 (批量*查询数量, 键值对数量)

# 并在最后一个轴(Softmax 轴)上,用一个非常大的负值替换被掩码的元素

# Softmax 时 exp(-1e6) 趋近于 0,从而忽略填充部分。

X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)

# 对经过掩码处理的 X 执行 Softmax

# 结果张量 X 被重塑回原始的 3D 形状 (批量大小, 查询数量, 键值对数量)

# 并在最后一个维度(dim=-1)上进行归一化,得到注意力权重

return nn.functional.softmax(X.reshape(shape), dim=-1)

class AdditiveAttention(nn.Module): #@save

"""

加性注意力(Additive Attention)模块。

通过将 Query 和 Key 投影到相同的维度后相加,再通过 tanh 激活和线性层计算注意力分数。

公式核心:score(Q, K) = w_v^T * tanh(W_q*Q + W_k*K)

"""

def __init__(self, num_hiddens, dropout, **kwargs):

"""

初始化加性注意力模块。

参数:

num_hiddens (int): 隐藏层维度,Q 和 K 投影后的维度。

dropout (float): Dropout 率。

"""

super(AdditiveAttention, self).__init__(**kwargs)

# W_k:将 Key (K) 向量投影到 num_hiddens 维度的线性层

# 使用 nn.LazyLinear 延迟初始化,直到第一次 forward 传入 K 的维度

self.W_k = nn.LazyLinear(num_hiddens, bias=False)

# W_q:将 Query (Q) 向量投影到 num_hiddens 维度的线性层

# 使用 nn.LazyLinear 延迟初始化

self.W_q = nn.LazyLinear(num_hiddens, bias=False)

# w_v:将激活后的特征向量 (W_q*Q + W_k*K) 投影成一个标量分数(维度为 1)

# 使用 nn.LazyLinear 延迟初始化

self.w_v = nn.LazyLinear(1, bias=False)

# Dropout 层,用于防止过拟合

self.dropout = nn.Dropout(dropout)

def forward(self, queries, keys, values, valid_lens):

"""

执行前向传播计算。

参数:

queries (torch.Tensor): 查询向量 Q。形状通常为 (批量大小, 查询数量, 查询维度)。

keys (torch.Tensor): 键向量 K。形状通常为 (批量大小, 键值对数量, 键维度)。

values (torch.Tensor): 值向量 V。形状通常为 (批量大小, 键值对数量, 值维度)。

valid_lens (torch.Tensor): 键值对序列的有效长度,用于掩盖填充部分。

返回:

torch.Tensor: 注意力加权后的值向量。形状为 (批量大小, 查询数量, 值维度)。

"""

# 1. 线性变换:分别对 Q 和 K 进行投影

queries, keys = self.W_q(queries), self.W_k(keys)

# 2. 维度扩展与相加(Attention Scoring 的核心步骤)

# queries.unsqueeze(2): 形状从 (批量大小, 查询数量, num_hiddens)

# 变为 (批量大小, 查询数量, 1, num_hiddens)。

# keys.unsqueeze(1): 形状从 (批量大小, 键值对数量, num_hiddens)

# 变为 (批量大小, 1, 键值对数量, num_hiddens)。

# 两个张量通过广播机制相加,得到 features,形状为:

# (批量大小, 查询数量, 键值对数量, num_hiddens)

features = queries.unsqueeze(2) + keys.unsqueeze(1)

# 3. 激活函数:应用 tanh 激活(加性注意力机制的要求)

features = torch.tanh(features)

# 4. 投影到标量分数

# self.w_v(features): 将 features 的最后一个维度(num_hiddens)投影成 1。

# scores.squeeze(-1): 移除最后一个单维度 (1),得到最终的注意力分数张量。

# 形状为:(批量大小, 查询数量, 键值对数量)

scores = self.w_v(features).squeeze(-1)

# 5. 归一化(Softmax):使用带掩码的 Softmax 得到注意力权重

# 填充部分的得分会被设置为一个极小的负值,Softmax 后权重趋近于 0。

self.attention_weights = masked_softmax(scores, valid_lens)

# 6. 加权求和

# torch.bmm: 批量矩阵乘法 (Batch Matrix Multiplication)。

# 将 [注意力权重] (批量, Q数量, K数量) 与 [值向量] (批量, K数量, V维度) 相乘

# 得到最终的注意力输出,形状为:(批量大小, 查询数量, 值维度)

# 在 BMM 之前,对注意力权重应用 Dropout。

return torch.bmm(self.dropout(self.attention_weights), values)

#@save

class Seq2SeqEncoder(Encoder):

"""用于序列到序列学习的循环神经网络编码器"""

def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,

dropout=0, **kwargs):

super(Seq2SeqEncoder, self).__init__(**kwargs)

# 嵌入层

self.embedding = nn.Embedding(vocab_size, embed_size)

self.rnn = nn.GRU(embed_size, num_hiddens, num_layers,

dropout=dropout)

# self.lstm = nn.LSTM(embed_size, num_hiddens, num_layers)

def forward(self, X, *args):

# 输入X.shape = (batch_size,num_steps)

# 输出'X'的形状:(batch_size,num_steps,embed_size)

X = self.embedding(X)

# 在循环神经网络模型中,第一个轴对应于时间步

X = X.permute(1, 0, 2)

# 如果未提及状态,则默认为0

output, state = self.rnn(X)

# output : 这个返回值是所有时间步的隐藏状态序列

# output的形状:(num_steps,batch_size,num_hiddens)

# hn (hidden) : 这是每一层rnn的最后一个时间步的隐藏状态

# state的形状:(num_layers,batch_size,num_hiddens)

return output, state

class AttentionDecoder(Decoder): #@save

"""The base attention-based decoder interface."""

def __init__(self):

super().__init__()

@property

def attention_weights(self):

raise NotImplementedError

class Seq2SeqAttentionDecoder(AttentionDecoder):

def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,

dropout=0):

super().__init__()

self.attention = AdditiveAttention(num_hiddens, dropout)

self.embedding = nn.Embedding(vocab_size, embed_size)

self.rnn = nn.GRU(

embed_size + num_hiddens, num_hiddens, num_layers,

dropout=dropout)

self.dense = nn.LazyLinear(vocab_size)

# self.apply(d2l.init_seq2seq)

def init_state(self, enc_outputs, enc_valid_lens):

# Shape of outputs: (num_steps, batch_size, num_hiddens).

# Shape of hidden_state: (num_layers, batch_size, num_hiddens)

outputs, hidden_state = enc_outputs

return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)

def forward(self, X, state):

# Shape of enc_outputs: (batch_size, num_steps, num_hiddens).

# Shape of hidden_state: (num_layers, batch_size, num_hiddens)

enc_outputs, hidden_state, enc_valid_lens = state

# Shape of the output X: (num_steps, batch_size, embed_size)

X = self.embedding(X).permute(1, 0, 2)

outputs, self._attention_weights = [], []

for x in X:

# Shape of query: (batch_size, 1, num_hiddens)

query = torch.unsqueeze(hidden_state[-1], dim=1)

# Shape of context: (batch_size, 1, num_hiddens)

context = self.attention(

query, enc_outputs, enc_outputs, enc_valid_lens)

# Concatenate on the feature dimension

x = torch.cat((context, torch.unsqueeze(x, dim=1)), dim=-1)

# Reshape x as (1, batch_size, embed_size + num_hiddens)

out, hidden_state = self.rnn(x.permute(1, 0, 2), hidden_state)

outputs.append(out)

self._attention_weights.append(self.attention.attention_weights)

# After fully connected layer transformation, shape of outputs:

# (num_steps, batch_size, vocab_size)

outputs = self.dense(torch.cat(outputs, dim=0))

return outputs.permute(1, 0, 2), [enc_outputs, hidden_state,

enc_valid_lens]

@property

def attention_weights(self):

return self._attention_weights

def try_gpu(i=0):

"""如果存在,则返回gpu(i),否则返回cpu()

Defined in :numref:`sec_use_gpu`"""

if torch.cuda.device_count() >= i + 1:

return torch.device(f'cuda:{i}')

return torch.device('cpu')

def sequence_mask(X, valid_len, value=0):

"""在序列中屏蔽不相关的项"""

maxlen = X.size(1)

mask = torch.arange((maxlen), dtype=torch.float32,

device=X.device)[None, :] < valid_len[:, None]

X[~mask] = value

return X

class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):

"""带遮蔽的softmax交叉熵损失函数"""

# pred的形状:(batch_size,num_steps,vocab_size)

# label的形状:(batch_size,num_steps)

# valid_len的形状:(batch_size,)

def forward(self, pred, label, valid_len):

weights = torch.ones_like(label)

weights = sequence_mask(weights, valid_len)

self.reduction='none'

unweighted_loss = super(MaskedSoftmaxCELoss, self).forward(

pred.permute(0, 2, 1), label)

weighted_loss = (unweighted_loss * weights).mean(dim=1)

return weighted_loss

def grad_clipping(net, theta): #@save

"""裁剪梯度"""

if isinstance(net, nn.Module):

params = [p for p in net.parameters() if p.requires_grad]

else:

params = net.params

norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))

if norm > theta:

for param in params:

param.grad[:] *= theta / norm

def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):

"""训练序列到序列模型"""

def xavier_init_weights(m):

if type(m) == nn.Linear:

nn.init.xavier_uniform_(m.weight)

if type(m) == nn.GRU:

for param in m._flat_weights_names:

if "weight" in param:

nn.init.xavier_uniform_(m._parameters[param])

net.apply(xavier_init_weights)

net.to(device)

optimizer = torch.optim.Adam(net.parameters(), lr=lr)

loss = MaskedSoftmaxCELoss()

net.train()

vis = visdom.Visdom(env=u'test1', server="http://127.0.0.1", port=8097)

animator = vis

for epoch in range(num_epochs):

timer = Timer()

metric = Accumulator(2) # 训练损失总和,词元数量

for batch in data_iter:

#清零(reset)优化器中的梯度缓存

optimizer.zero_grad()

# x.shape = [batch_size, num_steps]

X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]

# bos.shape = batch_size 个 bos-id

bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],

device=device).reshape(-1, 1)

# dec_input.shape = (batch_size, num_steps)

# 解码器的输入通常由序列的起始标志 bos 和目标序列(去掉末尾的部分 Y[:, :-1])组成。

dec_input = torch.cat([bos, Y[:, :-1]], 1) # 强制教学

# Y_hat的形状:(batch_size,num_steps,vocab_size)

Y_hat, _ = net(X, dec_input, X_valid_len)

l = loss(Y_hat, Y, Y_valid_len)

l.sum().backward() # 损失函数的标量进行“反向传播”

grad_clipping(net, 1)

num_tokens = Y_valid_len.sum()

optimizer.step()

with torch.no_grad():

metric.add(l.sum(), num_tokens)

if (epoch + 1) % 10 == 0:

# print(predict('你是?'))

# print(epoch)

# animator.add(epoch + 1, )

if epoch == 9:

# 清空图表:使用空数组来替换现有内容

vis.line(X=np.array([0]), Y=np.array([0]), win='train_ch8', update='replace')

# _loss_val = l

# _loss_val = _loss_val.cpu().sum().detach().numpy()

vis.line(

X=np.array([epoch + 1]),

Y=[ metric[0] / metric[1]],

win='train_ch8',

update='append',

opts={

'title': 'train_ch8',

'xlabel': 'epoch',

'ylabel': 'loss',

'linecolor': np.array([[0, 0, 255]]), # 蓝色线条

}

)

print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '

f'tokens/sec on {str(device)}')

torch.save(net.cpu().state_dict(), 'model_h.pt') # [[6]]

torch.save(net.cpu(), 'model.pt') # [[6]]

def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,

device, save_attention_weights=False):

"""序列到序列模型的预测"""

# 在预测时将net设置为评估模式

net.eval()

src_tokens = src_vocab[src_sentence.lower().split(' ')] + [

src_vocab['<eos>']]

enc_valid_len = torch.tensor([len(src_tokens)], device=device)

src_tokens = dataset.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])

# 添加批量轴

enc_X = torch.unsqueeze(

torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)

enc_outputs = net.encoder(enc_X, enc_valid_len)

dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)

# 添加批量轴

dec_X = torch.unsqueeze(torch.tensor(

[tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)

output_seq, attention_weight_seq = [], []

for _ in range(num_steps):

Y, dec_state = net.decoder(dec_X, dec_state)

# 我们使用具有预测最高可能性的词元,作为解码器在下一时间步的输入

dec_X = Y.argmax(dim=2)

pred = dec_X.squeeze(dim=0).type(torch.int32).item()

# 保存注意力权重(稍后讨论)

if save_attention_weights:

attention_weight_seq.append(net.decoder.attention_weights[0].reshape(num_steps).cpu())

# 一旦序列结束词元被预测,输出序列的生成就完成了

if pred == tgt_vocab['<eos>']:

break

output_seq.append(pred)

return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq

def bleu(pred_seq, label_seq, k): #@save

"""计算BLEU"""

pred_tokens, label_tokens = pred_seq.split(' '), [i for i in label_seq]

len_pred, len_label = len(pred_tokens), len(label_tokens)

score = math.exp(min(0, 1 - len_label / len_pred))

for n in range(1, k + 1):

num_matches, label_subs = 0, collections.defaultdict(int)

for i in range(len_label - n + 1):

label_subs[' '.join(label_tokens[i: i + n])] += 1

for i in range(len_pred - n + 1):

if label_subs[' '.join(pred_tokens[i: i + n])] > 0:

num_matches += 1

label_subs[' '.join(pred_tokens[i: i + n])] -= 1

score *= math.pow(num_matches / (len_pred - n + 1), math.pow(0.5, n))

return score

from matplotlib import pyplot as plt

import matplotlib

# from matplotlib_inline import backend_inline

def show_heatmaps(matrices, xlabel, ylabel, titles=None, figsize=(2.5, 2.5),

cmap='Reds'):

"""

显示矩阵的热图(Heatmaps)。

这个函数旨在以子图网格的形式绘制多个矩阵,通常用于可视化注意力权重等。

参数:

matrices (numpy.ndarray 或 torch.Tensor 数组):

一个四维数组,形状应为 (num_rows, num_cols, height, width)。

其中,num_rows 和 num_cols 决定了子图网格的布局,

height 和 width 是每个热图(即每个矩阵)的维度。

xlabel (str):

所有最底行子图的 x 轴标签。

ylabel (str):

所有最左列子图的 y 轴标签。

titles (list of str, optional):

一个包含 num_cols 个标题的列表,用于设置每一列子图的标题。默认 None。

figsize (tuple, optional):

整个图形(figure)的大小。默认 (2.5, 2.5)。

cmap (str, optional):

用于绘制热图的颜色映射(colormap)。默认 'Reds'。

"""

# 导入所需的 matplotlib 模块,确保图形在 Jupyter/IPython 环境中正确显示为 SVG 格式

# (假设在包含这个函数的环境中已经导入了 matplotlib 的 backend_inline)

# backend_inline.set_matplotlib_formats('svg')

matplotlib.use('TkAgg')

# 从输入的 matrices 形状中解构出子图网格的行数和列数

# 假设 matrices 的形状是 (num_rows, num_cols, height, width)

num_rows, num_cols, _, _ = matrices.shape

# 创建一个包含多个子图(axes)的图形(fig)

# fig: 整个图形对象

# axes: 一个 num_rows x num_cols 的子图对象数组

fig, axes = plt.subplots(

num_rows, num_cols,

figsize=figsize,

sharex=True, # 所有子图共享 x 轴刻度

sharey=True, # 所有子图共享 y 轴刻度

squeeze=False # 即使只有一行或一列,也强制返回二维数组的 axes,方便后续循环

)

# 遍历子图的行和对应的矩阵行

# i 是行索引, row_axes 是当前行的子图数组, row_matrices 是当前行的矩阵数组

for i, (row_axes, row_matrices) in enumerate(zip(axes, matrices)):

# 遍历当前行中的子图和对应的矩阵

# j 是列索引, ax 是当前的子图对象, matrix 是当前的待绘矩阵

for j, (ax, matrix) in enumerate(zip(row_axes, row_matrices)):

# 使用 ax.imshow() 绘制热图

# matrix.detach().numpy():将 PyTorch Tensor 转换为 numpy 数组,并从计算图中分离(如果它是 Tensor)

# cmap:指定颜色映射

pcm = ax.imshow(matrix.detach().numpy(), cmap=cmap)

# --- 设置轴标签和标题 ---

# 只有最底行 (i == num_rows - 1) 的子图才显示 x 轴标签

if i == num_rows - 1:

ax.set_xlabel(xlabel)

# 只有最左列 (j == 0) 的子图才显示 y 轴标签

if j == 0:

ax.set_ylabel(ylabel)

# 如果提供了标题列表,则设置当前列的子图标题(所有行共享列标题)

if titles:

ax.set_title(titles[j])

# --- 添加颜色条(Colorbar) ---

# 为整个图形添加一个颜色条,用于表示数值和颜色的对应关系

# pcm: 之前绘制的第一个热图返回的 Colormap

# ax=axes: 颜色条将参照整个子图网格进行定位和缩放

# shrink=0.6: 缩小颜色条的高度/长度,使其只占图形高度的 60%

fig.colorbar(pcm, ax=axes, shrink=0.6)

plt.show()

if __name__ == '__main__':

embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1

batch_size, num_steps = 64, 10

lr, num_epochs, device = 0.005, 2000, try_gpu()

# train_iter 每个迭代输出:(batch_size, num_steps)

train_iter, src_vocab, tgt_vocab, source, target = dataset.load_data(batch_size, num_steps)

encoder = Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens, num_layers,

dropout)

decoder = Seq2SeqAttentionDecoder(len(tgt_vocab), embed_size, num_hiddens, num_layers,

dropout)

net = EncoderDecoder(encoder, decoder)

is_train = False

is_show = False

if is_train:

train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)

elif is_show:

state_dict = torch.load('model_h.pt')

net.load_state_dict(state_dict)

net.to(device)

src_text = "Call us."

translation, attention_weight_seq = predict_seq2seq(

net, src_text, src_vocab, tgt_vocab, num_steps, device, True)

# attention_weights = torch.eye(10).reshape((1, 1, 10, 10))

# (num_rows, num_cols, height, width)

print(f'translation={translation}')

print(attention_weight_seq)

stacked_tensor = torch.stack(attention_weight_seq, dim=0)

stacked_tensor = stacked_tensor.unsqueeze(0).unsqueeze(0)

show_heatmaps(

stacked_tensor,

xlabel='Attention weight', ylabel='Decode Step')

else:

state_dict = torch.load('model_h.pt')

net.load_state_dict(state_dict)

net.to(device)

C = 0

C1 = 0

for i in range(2000):

# print(source[i])

# print(target[i])

translation, attention_weight_seq = predict_seq2seq(

net, source[i], src_vocab, tgt_vocab, num_steps, device)

score = bleu(translation, target[i], k=2)

if score > 0.0:

C = C + 1

if score > 0.8:

C1 = C1 + 1

print(f'{source[i]} => {translation}, bleu {score:.3f}')

print(f'Counter(bleu > 0) = {C}')

print(f'Valid-Counter(bleu > 0.8) = {C1}')

下面是encoder过程的简单分析:

将x通过nn.Embedding得到了(batch_size,num_steps,embed_size)的输入嵌入向量。

将嵌入向量传给nn.GRU,得到了两个输出,并返回:

output,最后一层rnn的所有时间步的隐藏状态(num_steps,batch_size,num_hiddens)。

h_n,所有rnn层的,最后一个时间步的隐藏状态(num_layers,batch_size,num_hiddens)。

下面是decoder过程的简单分析:

将decoder_x通过nn.Embedding得到了(batch_size,num_steps,embed_size)的输入嵌入向量。

将嵌入向量沿着num_steps进行单步运行,每一步经过Attention过程,得到最终的output,以及最后一个时间步的所有rnn层的h_n,每一步执行如下步骤:

将rnn最后一层的隐藏态作为Q(第一次Q是来自于encoder,后续都是decoder的每一次运行过程产生的隐藏态)

将encoder的output作为K,V,得到当前动态的上下文 context

将decoder_x_step 和 context进行组合,得到decoder_x_step_new

将decoder_x_step_new送入nn.GRU,得到当前时间步的output, h_t

将每一步的output收集起来作为输出,将h_t作为下一个时间步的Q循环起来

将所有的output经过nn.LazyLinear 映射为(num_steps, batch_size, vocab_size),并和h_t返回

和原版本的seq2seq进行对比可知:

在原版中,我们的decoder依赖于一个固定的enc_outputs进行循环解码

在新版中,我们的decoder每次界面,都会有一个Q(第一次是来至于encoder,后续都是decoder的每一次运行过程产生的隐藏态)来计算enc_outputs的权重分数,然后根据权重分数得到一个动态的enc_outputs,这样可以让解码器每一步都关注enc_outputs中的不同的重点。

attention weight 的解释:

把Encoder Output(num_steps,1,num_hiddens)作为K,V

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/13 18:45:54

深度学习python项目--垃圾图像分类识别 关键模型:VGG19DenseNet121Res...

深度学习python项目--垃圾图像分类识别 关键模型:VGG19DenseNet121ResNeXt101 包含内容:数据集ppt文档代码搞图像分类项目的时候&#xff0c;选模型总让人头大。这次垃圾识别项目我试了三个经典CNN架构&#xff1a;VGG19、DenseNet121和ResNeXt101。这三个老将放在垃圾数据集上…

作者头像 李华
网站建设 2025/12/16 23:07:36

3000亿参数AI大模型部署终极指南:4步实现低成本企业级应用

3000亿参数AI大模型部署终极指南&#xff1a;4步实现低成本企业级应用 【免费下载链接】ERNIE-4.5-300B-A47B-W4A8C8-TP4-Paddle 项目地址: https://ai.gitcode.com/hf_mirrors/baidu/ERNIE-4.5-300B-A47B-W4A8C8-TP4-Paddle 在AI大模型技术快速发展的今天&#xff0c;…

作者头像 李华
网站建设 2025/12/13 18:42:48

Manim 3D螺旋动画:从DNA到宇宙的数学可视化之旅

Manim 3D螺旋动画&#xff1a;从DNA到宇宙的数学可视化之旅 【免费下载链接】manim A community-maintained Python framework for creating mathematical animations. 项目地址: https://gitcode.com/GitHub_Trending/man/manim 想象一下&#xff0c;你能用代码编织出…

作者头像 李华
网站建设 2025/12/13 18:42:43

基于Simulink的UR5机械臂的变阻抗控制及平面力跟踪仿真

基于位置的阻抗控制&#xff0c;自适应变阻抗控制&#xff0c;平面力跟踪仿真&#xff0c;有结果图&#xff0c;simscape simulink matlab&#xff0c;机械臂采用ur5直接上干货。咱今天聊机械臂的力控制&#xff0c;拿UR5当例子&#xff0c;在Simulink里搞基于位置的阻抗控制。…

作者头像 李华
网站建设 2025/12/13 18:39:14

快速上手Codebox:开源云端IDE的终极配置指南

快速上手Codebox&#xff1a;开源云端IDE的终极配置指南 【免费下载链接】codebox Open source cloud & desktop IDE 项目地址: https://gitcode.com/gh_mirrors/co/codebox Codebox是一款功能强大的开源云端和桌面集成开发环境&#xff0c;让你在本地或云端都能享受…

作者头像 李华