The code below comes from the PyTorchNLPBook CBOW notebook, lightly annotated for study purposes: https://github.com/joosthub/PyTorchNLPBook/blob/master/chapters/chapter_5/5_2_CBOW/5_2_Continuous_Bag_of_Words_CBOW.ipynb
import json
import os
from argparse import Namespace

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


class Vocabulary(object):
    """Maintains the token <-> index mappings extracted from the text."""

    def __init__(self, token_to_idx=None, mask_token="<MASK>", add_unk=True, unk_token="<UNK>"):
        """
        :param token_to_idx: pre-existing token-to-index dict
        :param mask_token: the mask (padding placeholder) token string
        :param add_unk: whether to add a token for unknown words (bool)
        :param unk_token: the unknown-word token string
        """
        if token_to_idx is None:
            token_to_idx = {}
        self._token_to_idx = token_to_idx
        self._idx_to_token = {idx: token for token, idx in self._token_to_idx.items()}
        self._add_unk = add_unk
        self._unk_token = unk_token
        self._mask_token = mask_token

        # <MASK> is added first, so it receives index 0 (used as padding_idx later)
        self.mask_index = self.add_token(self._mask_token)
        self.unk_index = -1
        if add_unk:
            self.unk_index = self.add_token(unk_token)

    def to_serializable(self):
        """Return a dictionary that can be serialized."""
        return {'token_to_idx': self._token_to_idx,
                'add_unk': self._add_unk,
                'unk_token': self._unk_token,
                'mask_token': self._mask_token}

    @classmethod
    def from_serializable(cls, contents):
        """Instantiate a Vocabulary from a serialized dictionary."""
        return cls(**contents)

    def add_token(self, token):
        """
        Return the index of a token, adding it to both mappings if it is new.

        :param token: the token string
        :return: the token's index
        """
        if token in self._token_to_idx:
            index = self._token_to_idx[token]
        else:
            index = len(self._token_to_idx)
            self._token_to_idx[token] = index
            self._idx_to_token[index] = token
        return index

    def add_many(self, tokens):
        """Add a list of tokens to both mappings."""
        return [self.add_token(token) for token in tokens]

    def lookup_token(self, token):
        """Return a token's index, falling back to the UNK index if one exists."""
        if self.unk_index >= 0:
            return self._token_to_idx.get(token, self.unk_index)
        else:
            return self._token_to_idx[token]

    def lookup_index(self, index):
        """Return the token for an index, raising KeyError if the index is unknown."""
        if index not in self._idx_to_token:
            raise KeyError("the index (%d) is not in the Vocabulary" % index)
        return self._idx_to_token[index]

    def __str__(self):
        return "<Vocabulary(size=%d)>" % len(self)

    def __len__(self):
        return len(self._token_to_idx)


class CBOWVectorizer(object):
    """Wraps the Vocabulary and converts text into index vectors."""

    def __init__(self, cbow_vocab):
        """
        :param cbow_vocab: the Vocabulary used for CBOW
        """
        self.cbow_vocab = cbow_vocab

    def vectorize(self, context, vector_length=-1):
        """
        Convert a context string into a vector of token indices.

        :param context: the context string (space-separated tokens)
        :param vector_length: desired output length; -1 means "length of the context"
        :return: a numpy int64 array, right-padded with the mask index
        """
        indices = [self.cbow_vocab.lookup_token(token) for token in context.split(" ")]
        if vector_length < 0:
            vector_length = len(indices)
        out_vector = np.zeros(vector_length, dtype=np.int64)
        out_vector[:len(indices)] = indices
        out_vector[len(indices):] = self.cbow_vocab.mask_index
        return out_vector

    @classmethod
    def from_dataframe(cls, cbow_df):
        """
        Build a CBOWVectorizer from a DataFrame.

        :param cbow_df: the dataset DataFrame
        :return: a CBOWVectorizer instance
        """
        cbow_vocab = Vocabulary()
        for index, row in cbow_df.iterrows():
            for token in row.context.split(" "):
                cbow_vocab.add_token(token)
            cbow_vocab.add_token(row.target)
        return cls(cbow_vocab)

    @classmethod
    def from_serializable(cls, contents):
        cbow_vocab = Vocabulary.from_serializable(contents['cbow_vocab'])
        return cls(cbow_vocab=cbow_vocab)

    def to_serializable(self):
        return {'cbow_vocab': self.cbow_vocab.to_serializable()}
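# Illustrative sketch (not part of the original notebook): how the two classes
# above cooperate. <MASK> is registered first and gets index 0, and vectorize()
# right-pads short contexts with that mask index so every example in a batch
# has the same length. The helper below exists only for illustration and is
# never called by the training script.
def _demo_vectorizer():
    vocab = Vocabulary()                      # registers <MASK> -> 0, <UNK> -> 1
    vocab.add_many("i was born".split(" "))   # i -> 2, was -> 3, born -> 4
    vectorizer = CBOWVectorizer(vocab)
    # Prints: [2 3 4 0 0] -- two trailing mask-index pads
    print(vectorizer.vectorize("i was born", vector_length=5))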
class CBOWDataset(Dataset):
    def __init__(self, cbow_df, vectorizer):
        """
        :param cbow_df: the dataset DataFrame
        :param vectorizer: the CBOWVectorizer used to turn rows into vectors
        """
        self.cbow_df = cbow_df
        self._vectorizer = vectorizer

        measure_len = lambda context: len(context.split(" "))
        self._max_seq_length = max(map(measure_len, cbow_df.context))

        self.train_df = self.cbow_df[self.cbow_df.split == 'train']
        self.train_size = len(self.train_df)
        self.val_df = self.cbow_df[self.cbow_df.split == 'val']
        self.validation_size = len(self.val_df)
        self.test_df = self.cbow_df[self.cbow_df.split == 'test']
        self.test_size = len(self.test_df)

        self._lookup_dict = {'train': (self.train_df, self.train_size),
                             'val': (self.val_df, self.validation_size),
                             'test': (self.test_df, self.test_size)}
        self.set_split('train')

    def set_split(self, split='train'):
        """Select the active subset by its label in the split column ('train', 'val' or 'test')."""
        self._target_split = split
        self._target_df, self._target_size = self._lookup_dict[split]

    @classmethod
    def load_dataset_and_make_vectorizer(cls, cbow_csv):
        """Load the dataset from a CSV file and build a fresh vectorizer from the training split."""
        cbow_df = pd.read_csv(cbow_csv)
        train_cbow_df = cbow_df[cbow_df.split == 'train']
        return cls(cbow_df, CBOWVectorizer.from_dataframe(train_cbow_df))

    @classmethod
    def load_dataset_and_load_vectorizer(cls, cbow_csv, vectorizer_filepath):
        """Load the dataset from a CSV file together with a previously saved vectorizer."""
        cbow_df = pd.read_csv(cbow_csv)
        vectorizer = cls.load_vectorizer_only(vectorizer_filepath)
        return cls(cbow_df, vectorizer)

    @staticmethod
    def load_vectorizer_only(vectorizer_filepath):
        with open(vectorizer_filepath) as fp:
            return CBOWVectorizer.from_serializable(json.load(fp))

    def save_vectorizer(self, vectorizer_filepath):
        with open(vectorizer_filepath, 'w') as fp:
            json.dump(self._vectorizer.to_serializable(), fp)

    def get_vectorizer(self):
        return self._vectorizer

    def __len__(self):
        return self._target_size

    def __getitem__(self, index):
        """
        PyTorch's main entry point into the dataset.

        :param index: row index into the active split's DataFrame
        :return: a dict with the context index vector and the target word index
        """
        row = self._target_df.iloc[index]
        context_vector = self._vectorizer.vectorize(row.context, self._max_seq_length)
        target_index = self._vectorizer.cbow_vocab.lookup_token(row.target)
        return {"x_data": context_vector, "y_target": target_index}

    def get_num_batches(self, batch_size):
        return len(self) // batch_size


def generate_batches(dataset, batch_size, shuffle=True, drop_last=True, device="cpu"):
    """Wrap a DataLoader so that every tensor in each batch dict is moved to the target device."""
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size,
                            shuffle=shuffle, drop_last=drop_last)
    for data_dict in dataloader:
        out_data_dict = {}
        for name, tensor in data_dict.items():
            out_data_dict[name] = tensor.to(device)
        yield out_data_dict


class CBOWClassifier(nn.Module):
    def __init__(self, vocabulary_size, embedding_size, padding_idx=0):
        """
        :param vocabulary_size: number of tokens in the vocabulary
        :param embedding_size: dimensionality of the token embeddings
        :param padding_idx: index of the padding token, whose embedding stays [0] * embedding_size
        """
        super(CBOWClassifier, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocabulary_size,
                                      embedding_dim=embedding_size,
                                      padding_idx=padding_idx)
        self.fc1 = nn.Linear(in_features=embedding_size, out_features=vocabulary_size)

    def forward(self, x_in, apply_softmax=False):
        # x_in: [batch, context_length]; embedding(x_in): [batch, context_length,
        # embedding_size]; summing over dim=1 collapses the context window into a
        # single [batch, embedding_size] vector.
        # training=self.training ensures dropout is disabled in eval mode.
        x_embedded_sum = F.dropout(self.embedding(x_in).sum(dim=1),
                                   p=0.3, training=self.training)
        y_out = self.fc1(x_embedded_sum)
        if apply_softmax:
            y_out = F.softmax(y_out, dim=1)
        return y_out
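# Illustrative sketch (not part of the original notebook): the tensor shapes
# flowing through CBOWClassifier. A batch of context-window index vectors goes
# in, a row of vocabulary logits per example comes out. Illustration only;
# never called by the training script.
def _demo_classifier_shapes():
    demo_clf = CBOWClassifier(vocabulary_size=10, embedding_size=4)
    x_in = torch.randint(0, 10, (2, 6))       # [batch=2, context_length=6]
    embedded = demo_clf.embedding(x_in)       # [2, 6, 4]
    summed = embedded.sum(dim=1)              # [2, 4] -- one vector per example
    logits = demo_clf(x_in)                   # [2, 10] -- one logit per vocab token
    print(embedded.shape, summed.shape, logits.shape)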
def set_seed_everywhere(seed, cuda):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if cuda:
        torch.cuda.manual_seed_all(seed)


def handle_dirs(dirpath):
    if not os.path.exists(dirpath):
        os.makedirs(dirpath)


def make_train_state(args):
    return {'stop_early': False,
            'early_stopping_step': 0,
            'early_stopping_best_val': 1e8,
            'learning_rate': args.learning_rate,
            'epoch_index': 0,
            'train_loss': [],
            'train_acc': [],
            'val_loss': [],
            'val_acc': [],
            'test_loss': -1,
            'test_acc': -1,
            'model_filename': args.model_state_file}


def update_train_state(args, model, train_state):
    """
    Handle checkpointing and early stopping.

    :param args: the main argument Namespace
    :param model: the model being trained
    :param train_state: the training bookkeeping dict
    :return: the updated training state
    """
    # Always save the model state after the first epoch
    if train_state['epoch_index'] == 0:
        torch.save(model.state_dict(), train_state['model_filename'])
        train_state['stop_early'] = False
    # From the second epoch on, save only when validation loss improves
    elif train_state['epoch_index'] >= 1:
        loss_tm1, loss_t = train_state['val_loss'][-2:]
        if loss_t >= train_state['early_stopping_best_val']:
            # Loss got worse (or stayed flat): move one step closer to early stopping
            train_state['early_stopping_step'] += 1
        else:
            # Loss improved: save the best model and remember the new best value
            torch.save(model.state_dict(), train_state['model_filename'])
            train_state['early_stopping_best_val'] = loss_t
            # Reset the early-stopping counter
            train_state['early_stopping_step'] = 0
        train_state['stop_early'] = \
            train_state['early_stopping_step'] >= args.early_stopping_criteria
    return train_state


def compute_accuracy(y_pred, y_target):
    """Return the percentage of rows where the argmax of the logits matches the target."""
    _, y_pred_indices = y_pred.max(dim=1)
    n_correct = torch.eq(y_pred_indices, y_target).sum().item()
    return n_correct / len(y_pred_indices) * 100
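# Illustrative sketch (not part of the original notebook): compute_accuracy
# works directly on raw logits, because argmax over dim=1 picks the same class
# with or without a softmax. Illustration only; never called by the script.
def _demo_compute_accuracy():
    y_pred = torch.tensor([[0.1, 2.0],        # argmax -> class 1 (correct)
                           [1.5, 0.3],        # argmax -> class 0 (correct)
                           [0.2, 0.9]])       # argmax -> class 1 (wrong)
    y_target = torch.tensor([1, 0, 0])
    print(compute_accuracy(y_pred, y_target))  # 66.66... (2 of 3 correct)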
if __name__ == '__main__':
    args = Namespace(
        # Data and path information
        cbow_csv="data/frankenstein_with_splits.csv",
        vectorizer_file="vectorizer.json",
        model_state_file="model.pth",
        save_dir="model_storage/ch5/cbow",
        # Model hyperparameters
        embedding_size=50,
        # Training hyperparameters
        seed=1337,
        num_epochs=100,
        learning_rate=0.0001,
        batch_size=32,
        early_stopping_criteria=5,
        # Runtime options
        cuda=True,
        catch_keyboard_interrupt=True,
        reload_from_files=True,
        expand_filepaths_to_save_dir=True
    )

    if args.expand_filepaths_to_save_dir:
        args.vectorizer_file = os.path.join(args.save_dir, args.vectorizer_file)
        args.model_state_file = os.path.join(args.save_dir, args.model_state_file)
        print("Expanded filepaths: ")
        print("\t{}".format(args.vectorizer_file))
        print("\t{}".format(args.model_state_file))

    if not torch.cuda.is_available():
        args.cuda = False
    args.device = torch.device("cuda" if args.cuda else "cpu")
    print("Using CUDA: {}".format(args.cuda))

    set_seed_everywhere(args.seed, args.cuda)
    handle_dirs(args.save_dir)

    if args.reload_from_files:
        print("Loading dataset and loading vectorizer")
        dataset = CBOWDataset.load_dataset_and_load_vectorizer(args.cbow_csv,
                                                               args.vectorizer_file)
    else:
        print("Loading dataset and creating vectorizer")
        dataset = CBOWDataset.load_dataset_and_make_vectorizer(args.cbow_csv)
        dataset.save_vectorizer(args.vectorizer_file)
    vectorizer = dataset.get_vectorizer()

    classifier = CBOWClassifier(vocabulary_size=len(vectorizer.cbow_vocab),
                                embedding_size=args.embedding_size)
    classifier = classifier.to(args.device)

    loss_func = nn.CrossEntropyLoss()
    optimizer = optim.Adam(classifier.parameters(), lr=args.learning_rate)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, mode='min',
                                                     factor=0.5, patience=1)
    train_state = make_train_state(args)

    epoch_bar = tqdm(desc="training routine", total=args.num_epochs, position=0)
    dataset.set_split("train")
    train_bar = tqdm(desc="split=train", total=dataset.get_num_batches(args.batch_size),
                     position=1, leave=True)
    dataset.set_split("val")
    val_bar = tqdm(desc="split=val", total=dataset.get_num_batches(args.batch_size),
                   position=2, leave=True)

    try:
        for epoch_index in range(args.num_epochs):
            train_state["epoch_index"] = epoch_index

            # Training loop
            dataset.set_split("train")
            batch_generator = generate_batches(dataset, batch_size=args.batch_size,
                                               device=args.device)
            running_loss = 0.0
            running_acc = 0.0
            classifier.train()
            for batch_index, batch_dict in enumerate(batch_generator):
                optimizer.zero_grad()
                y_pred = classifier(x_in=batch_dict['x_data'])
                loss = loss_func(y_pred, batch_dict['y_target'])
                loss_t = loss.item()
                running_loss += (loss_t - running_loss) / (batch_index + 1)
                loss.backward()
                optimizer.step()
                acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
                running_acc += (acc_t - running_acc) / (batch_index + 1)
                train_bar.set_postfix(loss=running_loss, acc=running_acc,
                                      epoch=epoch_index)
                train_bar.update()
            train_state['train_loss'].append(running_loss)
            train_state['train_acc'].append(running_acc)

            # Validation loop
            dataset.set_split("val")
            batch_generator = generate_batches(dataset, batch_size=args.batch_size,
                                               device=args.device)
            running_loss = 0.
            running_acc = 0.
            classifier.eval()
            for batch_index, batch_dict in enumerate(batch_generator):
                y_pred = classifier(x_in=batch_dict['x_data'])
                loss = loss_func(y_pred, batch_dict['y_target'])
                loss_t = loss.item()
                running_loss += (loss_t - running_loss) / (batch_index + 1)
                # Track validation accuracy the same way as training accuracy
                acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
                running_acc += (acc_t - running_acc) / (batch_index + 1)
                val_bar.set_postfix(loss=running_loss, acc=running_acc,
                                    epoch=epoch_index)
                val_bar.update()
            train_state['val_loss'].append(running_loss)
            train_state['val_acc'].append(running_acc)

            train_state = update_train_state(args=args, model=classifier,
                                             train_state=train_state)
            scheduler.step(train_state['val_loss'][-1])
            if train_state['stop_early']:
                break
            train_bar.n = 0
            val_bar.n = 0
            epoch_bar.update()
    except KeyboardInterrupt:
        print("Exiting loop")

    # Evaluate the best checkpoint on the test split
    classifier.load_state_dict(torch.load(train_state['model_filename']))
    classifier = classifier.to(args.device)
    loss_func = nn.CrossEntropyLoss()

    dataset.set_split("test")
    batch_generator = generate_batches(dataset, batch_size=args.batch_size,
                                       device=args.device)
    running_loss = 0.
    running_acc = 0.
    classifier.eval()
    for batch_index, batch_dict in enumerate(batch_generator):
        y_pred = classifier(x_in=batch_dict["x_data"])
        loss = loss_func(y_pred, batch_dict["y_target"])
        loss_t = loss.item()
        running_loss += (loss_t - running_loss) / (batch_index + 1)
        acc_t = compute_accuracy(y_pred, batch_dict['y_target'])
        running_acc += (acc_t - running_acc) / (batch_index + 1)
    train_state["test_loss"] = running_loss
    train_state["test_acc"] = running_acc
    print("Test loss: {};".format(train_state['test_loss']))
    print("Test Accuracy: {}".format(train_state['test_acc']))

    def pretty_print(results):
        for item in results:
            print("...[%.2f] - %s" % (item[1], item[0]))

    def get_closest(target_word, word_to_idx, embeddings, n=5):
        """Return the n words whose embeddings are closest (Euclidean) to the target word's."""
        word_embedding = embeddings[word_to_idx[target_word.lower()]]
        distances = []
        for word, index in word_to_idx.items():
            if word == "<MASK>" or word == target_word:
                continue
            distances.append((word, torch.dist(word_embedding, embeddings[index])))
        # The target word is already excluded above, so the n smallest distances
        # are the n nearest neighbours
        results = sorted(distances, key=lambda x: x[1])[:n]
        return results

    word = input("Enter a word: ")
    embeddings = classifier.embedding.weight.data
    word_to_idx = vectorizer.cbow_vocab._token_to_idx
    pretty_print(get_closest(word, word_to_idx, embeddings, n=5))

    target_words = ['frankenstein', 'monster', 'science', 'sickness', 'lonely', 'happy']
    embeddings = classifier.embedding.weight.data
    word_to_idx = vectorizer.cbow_vocab._token_to_idx
    for target_word in target_words:
        print(f"======={target_word}=======")
        if target_word not in word_to_idx:
            print("Not in vocabulary")
            continue
        pretty_print(get_closest(target_word, word_to_idx, embeddings, n=5))
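The nearest-neighbour search in get_closest ranks words by Euclidean distance (torch.dist). A common alternative for comparing embeddings is cosine similarity, which ignores vector magnitude. Below is a minimal sketch of that variant, not part of the original notebook: it assumes the same embeddings tensor and word_to_idx dict produced by the script above, and the function name get_closest_cosine is my own.

    def get_closest_cosine(target_word, word_to_idx, embeddings, n=5):
        # Cosine similarity between the target embedding and every vocabulary row;
        # higher means closer, so sort in descending order and keep the top n.
        target = embeddings[word_to_idx[target_word.lower()]].unsqueeze(0)
        sims = F.cosine_similarity(target, embeddings, dim=1)
        scored = [(word, sims[index].item())
                  for word, index in word_to_idx.items()
                  if word not in ("<MASK>", target_word)]
        return sorted(scored, key=lambda x: x[1], reverse=True)[:n]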