Classifying Surnames with a Multilayer Perceptron
2024-06-29
This post is a cleaned-up version of an NLP course assignment from the second semester of my junior year, reposted from CSDN (that garbage dump of a site):
https://blog.csdn.net/zincles/article/details/140069566
I may write a more detailed tutorial at some point.
Introduction
When it comes to deep learning, the perceptron is an unavoidable topic. As one of the oldest machine learning models, it is a good entry point for complete beginners (like me).
The perceptron is the simplest form of neural network: just an input layer and an output layer, with no hidden layers. It is easy to understand, implement, and visualize -- a few lines of code are enough to plot its decision boundary and classification results (not covered in this post).
A single perceptron has limited power (it can only solve linearly separable problems), but it is the building block of more complex networks such as multilayer perceptrons and deep models, so understanding it lays the groundwork for the more advanced architectures that follow.
In this post we will build a fairly simple multilayer perceptron and train it on surnames: given a surname as input, the model outputs a probability for each candidate nationality.
Preparing the Dataset
https://wwb.lanzouq.com/ipAX123289af
Place the dataset in the same directory as your Jupyter notebook.
If you would rather not set things up yourself, the project archive linked at the end of this post contains the dataset and all of the code; you can use it directly.
Writing the Code
You will need a suitable Python environment set up in advance. I used python==3.9.19 and torch==2.3.1; install the remaining packages as needed.
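If you want a one-liner to start from, something like the following should cover every import used below (an untested suggestion; adjust versions to match your platform):

pip install torch==2.3.1 numpy pandas tqdm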
Building the Multilayer Perceptron
First, import all of the dependencies this project needs:
# Import all dependencies
from argparse import Namespace
import json
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm as tqdm_notebook  # the top-level tqdm_notebook import is deprecated
Next we write our multilayer perceptron class.
In Torch, a perceptron layer is also known as a linear layer, and its computation is already predefined; we only need to call it.
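As a tiny illustration, a linear layer computes y = xW^T + b (the sizes 4, 2, and 3 here are arbitrary placeholders):

layer = nn.Linear(4, 2)   # maps 4 input features to 2 outputs
x = torch.randn(3, 4)     # a batch of 3 examples
print(layer(x).shape)     # torch.Size([3, 2])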
In __init__() we set up two layers, fc1 and fc2, and in forward() we chain them together into the output.
class SurnameClassifier(nn.Module):
    """A 2-layer multilayer perceptron for classifying surnames"""

    def __init__(self, input_dim, hidden_dim, output_dim):
        """
        Args:
            input_dim (int): size of the input vectors
            hidden_dim (int): output size of the first linear layer
            output_dim (int): output size of the second linear layer
        """
        super(SurnameClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x_in, apply_softmax=False):
        """The forward pass of the classifier

        Args:
            x_in (torch.Tensor): the input data tensor.
                x_in should have shape (batch, input_dim)
            apply_softmax (bool): flag for the softmax activation.
                Should be False if used with the cross-entropy loss,
                which applies log-softmax internally
        Returns:
            The resulting tensor, with shape (batch, output_dim)
        """
        intermediate_vector = F.relu(self.fc1(x_in))
        prediction_vector = self.fc2(intermediate_vector)

        if apply_softmax:
            prediction_vector = F.softmax(prediction_vector, dim=1)

        return prediction_vector
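As a quick shape check (77 and 18 are placeholder sizes; the real ones come from the vocabularies we build below):

model = SurnameClassifier(input_dim=77, hidden_dim=300, output_dim=18)
dummy = torch.zeros(4, 77)  # a batch of 4 fake surname vectors
print(model(dummy).shape)   # torch.Size([4, 18]) -- raw logits
print(model(dummy, apply_softmax=True).sum(dim=1))  # each row sums to ~1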
Preparing the Text Classes
Next, we need the classes that process the text itself.
First, the vocabulary:
class Vocabulary(object):
    """A vocabulary class for processing text data"""

    def __init__(self, token_to_idx=None, add_unk=True, unk_token="<UNK>"):
        """
        Args:
            token_to_idx (dict): a pre-existing mapping of tokens to indices
            add_unk (bool): whether to add the UNK token
            unk_token (str): the UNK token to add to the vocabulary
        """
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx
        self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}

        self._add_unk = add_unk
        self._unk_token = unk_token

        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        """Return a serializable dictionary"""
        return {"token_to_idx": self._token_to_idx, "add_unk": self._add_unk, "unk_token": self._unk_token}

    @classmethod
    def from_serializable(cls, contents):
        """Instantiate a Vocabulary from a serialized dictionary"""
        return cls(**contents)

    def add_token(self, token):
        """Update the mapping dictionaries with a token

        Args: token (str): the item to add to the vocabulary
        Returns: index (int): the integer corresponding to the token
        """
        try:
            index = self._token_to_idx[token]
        except KeyError:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        """Add a list of tokens to the vocabulary

        Args: tokens (list): a list of string tokens
        Returns: indices (list): the indices corresponding to the tokens
        """
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """Retrieve the index of a token, or the UNK index if the token is absent

        Args: token (str): the token to look up
        Returns: index (int): the index corresponding to the token
        Note: unk_index must be >= 0 (i.e. added to the vocabulary) for the UNK behavior
        """
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """Return the token associated with an index

        Args: index (int): the index to look up
        Returns: token (str): the token corresponding to the index
        Raises: KeyError: if the index is not in the vocabulary
        """
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)
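A quick demonstration of how the UNK fallback behaves:

v = Vocabulary()            # index 0 is taken by <UNK>
v.add_many(list("abc"))     # 'a' -> 1, 'b' -> 2, 'c' -> 3
print(v.lookup_token("a"))  # 1
print(v.lookup_token("z"))  # 0, the UNK index, since 'z' was never added
print(v.lookup_index(2))    # 'b'
print(v)                    # <Vocabulary(size=4)>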
Then we prepare the vectorization tool.
Text must be converted into vectors before our model can use it, so here is our vectorizer class:
class SurnameVectorizer(object):
    """The vectorizer"""

    def __init__(self, surname_vocab, nationality_vocab):
        """
        Args:
            surname_vocab (Vocabulary): maps characters to integers
            nationality_vocab (Vocabulary): maps nationalities to integers
        """
        self.surname_vocab = surname_vocab
        self.nationality_vocab = nationality_vocab

    def vectorize(self, surname):
        """
        Args: surname (str): the surname
        Returns: one_hot (np.ndarray): a collapsed one-hot encoding
        """
        vocab = self.surname_vocab
        one_hot = np.zeros(len(vocab), dtype=np.float32)
        for token in surname:
            one_hot[vocab.lookup_token(token)] = 1
        return one_hot

    @classmethod
    def from_dataframe(cls, surname_df):
        """Instantiate the vectorizer from the dataset dataframe

        Args: surname_df (pandas.DataFrame): the surname dataset
        Returns: an instance of SurnameVectorizer
        """
        surname_vocab = Vocabulary(unk_token="@")
        nationality_vocab = Vocabulary(add_unk=False)

        for index, row in surname_df.iterrows():
            for letter in row.surname:
                surname_vocab.add_token(letter)
            nationality_vocab.add_token(row.nationality)

        return cls(surname_vocab, nationality_vocab)

    @classmethod
    def from_serializable(cls, contents):
        surname_vocab = Vocabulary.from_serializable(contents["surname_vocab"])
        nationality_vocab = Vocabulary.from_serializable(contents["nationality_vocab"])
        return cls(surname_vocab=surname_vocab, nationality_vocab=nationality_vocab)

    def to_serializable(self):
        return {
            "surname_vocab": self.surname_vocab.to_serializable(),
            "nationality_vocab": self.nationality_vocab.to_serializable(),
        }
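To make the "collapsed" one-hot encoding concrete, here is a toy run (the two-row dataframe is made up for illustration; the real one comes from the CSV):

# Hypothetical mini-dataframe that mimics the real CSV's columns
demo_df = pd.DataFrame({"surname": ["Lee", "Liu"], "nationality": ["English", "Chinese"]})
demo_vec = SurnameVectorizer.from_dataframe(demo_df)
print(demo_vec.vectorize("Lee"))
# One float32 vector sized to the character vocabulary: the slots for
# 'L' and 'e' are 1 and everything else is 0 -- character order and
# repetition are discarded.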
Preparing the Dataset Class
With the model and the vectorization classes in place, we need a way to load the dataset we prepared earlier so that torch can consume it.
class SurnameDataset(Dataset):
    def __init__(self, surname_df, vectorizer):
        """
        Args:
            surname_df (pandas.DataFrame): the dataset
            vectorizer (SurnameVectorizer): a vectorizer instantiated from the dataset
        """
        self.surname_df = surname_df
        self._vectorizer = vectorizer

        self.train_df = self.surname_df[self.surname_df.split == "train"]
        self.train_size = len(self.train_df)

        self.val_df = self.surname_df[self.surname_df.split == "val"]
        self.validation_size = len(self.val_df)

        self.test_df = self.surname_df[self.surname_df.split == "test"]
        self.test_size = len(self.test_df)

        self._lookup_dict = {
            "train": (self.train_df, self.train_size),
            "val": (self.val_df, self.validation_size),
            "test": (self.test_df, self.test_size),
        }

        self.set_split("train")

        # Class weights
        class_counts = surname_df.nationality.value_counts().to_dict()

        def sort_key(item):
            return self._vectorizer.nationality_vocab.lookup_token(item[0])

        sorted_counts = sorted(class_counts.items(), key=sort_key)
        frequencies = [count for _, count in sorted_counts]
        self.class_weights = 1.0 / torch.tensor(frequencies, dtype=torch.float32)

    @classmethod
    def load_dataset_and_make_vectorizer(cls, surname_csv):
        """Load the dataset and create a brand-new vectorizer

        Args: surname_csv (str): location of the dataset
        Returns: an instance of SurnameDataset
        """
        surname_df = pd.read_csv(surname_csv)
        train_surname_df = surname_df[surname_df.split == "train"]
        return cls(surname_df, SurnameVectorizer.from_dataframe(train_surname_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, surname_csv, vectorizer_filepath):
        """Load the dataset and the corresponding vectorizer.
        Used when the vectorizer has been cached for reuse.

        Args:
            surname_csv (str): location of the dataset
            vectorizer_filepath (str): location of the saved vectorizer
        Returns:
            an instance of SurnameDataset
        """
        surname_df = pd.read_csv(surname_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(surname_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        """Static method for loading the vectorizer from file

        Args: vectorizer_filepath (str): location of the serialized vectorizer
        Returns: an instance of SurnameVectorizer
        """
        with open(vectorizer_filepath) as fp:
            return SurnameVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        """Save the vectorizer to disk as json

        Args: vectorizer_filepath (str): where to save the vectorizer
        """
        with open(vectorizer_filepath, "w") as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        """Return the vectorizer"""
        return self._vectorizer

    def set_split(self, split="train"):
        """Select a split of the dataset using a column in the dataframe"""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """The primary entry point for PyTorch datasets

        Args: index (int): index of the data point
        Returns:
            a dictionary holding the data point: {
                features (x_surname)
                label (y_nationality)
            }
        """
        row = self._target_df.iloc[index]
        surname_vector = self._vectorizer.vectorize(row.surname)
        nationality_index = self._vectorizer.nationality_vocab.lookup_token(row.nationality)
        return {"x_surname": surname_vector, "y_nationality": nationality_index}

    def get_num_batches(self, batch_size):
        """Given a batch size, return the number of batches in the dataset

        Args: batch_size (int)
        Returns: the number of batches in the dataset
        """
        return len(self) // batch_size
def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"):
    """
    A generator function that wraps the PyTorch DataLoader and makes sure
    every tensor ends up on the right device.
    """
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, drop_last=drop_last)

    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = tensor.to(device)
        yield out_data_dict
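As a quick sketch of how these pieces fit together (using the CSV path configured later in this post):

# Build the dataset plus a fresh vectorizer, then stream one training batch
ds = SurnameDataset.load_dataset_and_make_vectorizer("data/surnames/surnames_with_splits.csv")
ds.set_split("train")
for batch in generate_batches(ds, batch_size=64):
    print(batch["x_surname"].shape)      # torch.Size([64, vocab_size])
    print(batch["y_nationality"].shape)  # torch.Size([64])
    break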
Preparing to Train
Define some hyperparameters and settings.
If you do not have a GPU, change the cuda=True attribute to cuda=False (the code below also falls back to the CPU automatically when CUDA is unavailable).
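The setup code below calls two small helpers, set_seed_everywhere and handle_dirs, that are only included in the project file linked at the end. Minimal sketches consistent with how they are called:

def set_seed_everywhere(seed, cuda):
    """Seed numpy and torch (and CUDA, if used) for reproducibility."""
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)

def handle_dirs(dirpath):
    """Create the save directory if it does not exist yet."""
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)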
args = Namespace(
    # Data, paths, and saving
    surname_csv="data/surnames/surnames_with_splits.csv",
    vectorizer_file="vectorizer.json",
    model_state_file="model.pth",
    save_dir="model_storage/ch4/surname_mlp",
    # Model hyperparameters
    hidden_dim=300,
    # Training hyperparameters
    seed=1337,
    num_epochs=100,
    early_stopping_criteria=5,
    learning_rate=0.001,
    batch_size=256,
    # Runtime options
    cuda=True,
    reload_from_files=False,
    expand_filepaths_to_save_dir=True,
)
if args.expand_filepaths_to_save_dir:
    args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file)
    args.model_state_file = os.path.join(args.save_dir, args.model_state_file)
    print("Expanded filepaths: ")
    print("\t{}".format(args.vectorizer_file))
    print("\t{}".format(args.model_state_file))

# Check whether CUDA is available
if not torch.cuda.is_available():
    args.cuda = False
args.device = torch.device("cuda" if args.cuda else "cpu")
print("Using CUDA: {}".format(args.cuda))

set_seed_everywhere(args.seed, args.cuda)  # set the seed so the experiment is reproducible
handle_dirs(args.save_dir)
# If possible, load an already-trained model instead of training from scratch
if args.reload_from_files:
    # Resume from a checkpoint
    print("Reloading!")
    dataset = SurnameDataset.load_dataset_and_load_vectorizer(args.surname_csv, args.vectorizer_file)
else:
    # Create the dataset and the vectorizer
    print("Creating fresh!")
    dataset = SurnameDataset.load_dataset_and_make_vectorizer(args.surname_csv)
    dataset.save_vectorizer(args.vectorizer_file)

vectorizer = dataset.get_vectorizer()
classifier = SurnameClassifier(
    input_dim=len(vectorizer.surname_vocab), hidden_dim=args.hidden_dim, output_dim=len(vectorizer.nationality_vocab)
)
Training
Everything is in place; now we train our model.
We move the classifier and the class weights to the target device, define the loss function, optimizer, and scheduler, select the train and validation splits, and in each epoch run the training pass and compute the loss.
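The loop also relies on make_train_state, update_train_state, and compute_accuracy, which are likewise only in the project file. The sketches below are plausible reconstructions based on how the loop uses them (a dictionary of running metrics, early stopping on validation loss, accuracy as a percentage), not necessarily the original code:

def make_train_state(args):
    """Dictionary that tracks training progress across epochs."""
    return {"stop_early": False, "early_stopping_step": 0,
            "early_stopping_best_val": 1e8,
            "epoch_index": 0,
            "train_loss": [], "train_acc": [],
            "val_loss": [], "val_acc": [],
            "test_loss": -1, "test_acc": -1,
            "model_filename": args.model_state_file}

def update_train_state(args, model, train_state):
    """Save the best model seen so far and flag early stopping."""
    loss_t = train_state["val_loss"][-1]
    if loss_t < train_state["early_stopping_best_val"]:
        torch.save(model.state_dict(), train_state["model_filename"])
        train_state["early_stopping_best_val"] = loss_t
        train_state["early_stopping_step"] = 0
    else:
        train_state["early_stopping_step"] += 1
    train_state["stop_early"] = (
        train_state["early_stopping_step"] >= args.early_stopping_criteria)
    return train_state

def compute_accuracy(y_pred, y_target):
    """Accuracy (as a percentage) of the argmax predictions."""
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100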
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode="min", factor=0.5, patience=1)
train_state = make_train_state(args)
epoch_bar = tqdm_notebook(desc="training routine", total=args.num_epochs, position=0)
dataset.set_split("train")
train_bar = tqdm_notebook(desc="split=train", total=dataset.get_num_batches(args.batch_size), position=1, leave=True)
dataset.set_split("val")
val_bar = tqdm_notebook(desc="split=val", total=dataset.get_num_batches(args.batch_size), position=1, leave=True)
try:
    for epoch_index in range(args.num_epochs):
        train_state["epoch_index"] = epoch_index

        # Iterate over the training set
        # Set up the batch generator, zero the loss and accuracy, switch to train mode
        dataset.set_split("train")
        batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.train()

        # Training
        for batch_index, batch_dict in enumerate(batch_generator):
            # Step 1. Zero the gradients
            optimizer.zero_grad()
            # Step 2. Compute the output
            y_pred = classifier(batch_dict["x_surname"])
            # Step 3. Compute the loss
            loss = loss_func(y_pred, batch_dict["y_nationality"])
            loss_t = loss.item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            # Step 4. Use the loss to compute gradients
            loss.backward()
            # Step 5. Take an optimizer step
            optimizer.step()
            # -----------------------------------------
            # Compute the accuracy
            acc_t = compute_accuracy(y_pred, batch_dict["y_nationality"])
            running_acc += (acc_t - running_acc) / (batch_index + 1)

            # Update the progress bar
            train_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
            train_bar.update()

        train_state["train_loss"].append(running_loss)
        train_state["train_acc"].append(running_acc)

        # Iterate over the validation set
        # Set up the batch generator, zero the loss and accuracy, switch to eval mode
        dataset.set_split("val")
        batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
        running_loss = 0.0
        running_acc = 0.0
        classifier.eval()

        for batch_index, batch_dict in enumerate(batch_generator):
            # Compute the output
            y_pred = classifier(batch_dict["x_surname"])
            # Compute the loss
            loss = loss_func(y_pred, batch_dict["y_nationality"])
            loss_t = loss.to("cpu").item()
            running_loss += (loss_t - running_loss) / (batch_index + 1)
            # Compute the accuracy
            acc_t = compute_accuracy(y_pred, batch_dict["y_nationality"])
            running_acc += (acc_t - running_acc) / (batch_index + 1)
            val_bar.set_postfix(loss=running_loss, acc=running_acc, epoch=epoch_index)
            val_bar.update()

        train_state["val_loss"].append(running_loss)
        train_state["val_acc"].append(running_acc)

        train_state = update_train_state(args=args, model=classifier, train_state=train_state)
        scheduler.step(train_state["val_loss"][-1])

        if train_state["stop_early"]:
            break

        train_bar.n = 0
        val_bar.n = 0
        epoch_bar.update()
except KeyboardInterrupt:
    print("Exiting loop")
Saving the Model and Measuring Test Accuracy
classifier.load_state_dict(torch.load(train_state["model_filename"]))
classifier = classifier.to(args.device)
dataset.class_weights = dataset.class_weights.to(args.device)
loss_func = nn.CrossEntropyLoss(dataset.class_weights)
dataset.set_split("test")
batch_generator = generate_batches(dataset, batch_size=args.batch_size, device=args.device)
running_loss = 0.0
running_acc = 0.0
classifier.eval()
for batch_index, batch_dict in enumerate(batch_generator):
    # Compute the output
    y_pred = classifier(batch_dict["x_surname"])
    # Compute the loss
    loss = loss_func(y_pred, batch_dict["y_nationality"])
    loss_t = loss.item()
    running_loss += (loss_t - running_loss) / (batch_index + 1)
    # Compute the accuracy
    acc_t = compute_accuracy(y_pred, batch_dict["y_nationality"])
    running_acc += (acc_t - running_acc) / (batch_index + 1)

train_state["test_loss"] = running_loss
train_state["test_acc"] = running_acc

print("Test loss: {};".format(train_state["test_loss"]))
print("Test Accuracy: {}".format(train_state["test_acc"]))
Testing the Model
We can try classifying a concrete surname: feed it to the model, decode the output with our text classes, and inspect the probabilities.
def predict_topk_nationality(name, classifier, vectorizer, k=5):
    vectorized_name = vectorizer.vectorize(name)
    vectorized_name = torch.tensor(vectorized_name).view(1, -1)
    prediction_vector = classifier(vectorized_name, apply_softmax=True)
    probability_values, indices = torch.topk(prediction_vector, k=k)

    # returned size is 1,k
    probability_values = probability_values.detach().numpy()[0]
    indices = indices.detach().numpy()[0]

    results = []
    for prob_value, index in zip(probability_values, indices):
        nationality = vectorizer.nationality_vocab.lookup_index(index)
        results.append({"nationality": nationality, "probability": prob_value})

    return results
new_surname = "Linus"
classifier = classifier.to("cpu")

k = 10
if k > len(vectorizer.nationality_vocab):
    print("Sorry! That's more than the # of nationalities we have.. defaulting you to max size :)")
    k = len(vectorizer.nationality_vocab)

predictions = predict_topk_nationality(new_surname, classifier, vectorizer, k=k)

print("Top {} predictions:".format(k))
print("===================")
for prediction in predictions:
    print("{} -> {} (p={:0.5f})".format(new_surname, prediction["nationality"], prediction["probability"]))
Project Files
If you would rather skip working through all of the above, here are the original project files:
https://wwb.lanzouq.com/iua9D232cayj
The multilayer perceptron is a fine starting point for beginners, but its capacity is limited; real applications call for more sophisticated models.
On this surname classification task, my test accuracy came out at only around 40%, so a follow-up with a more complex model may be worth trying.