In an earlier post, Using pre-trained word embeddings in a Keras model, we showed how to implement a convolutional neural network for text classification with Keras. A CNN-based text classifier is an ideal first project for learning a deep learning framework, so in this post we build the same kind of model with TensorFlow and record the pitfalls I ran into while learning the framework.

Model Overview

The overall architecture of the model is shown in the figure below:

Reading from left to right: the first layer is the word embedding layer, which maps the input words into a low-dimensional space; the second layer is the convolution layer, which slides convolution kernels of several widths (for example 3, 4 and 5) over the word matrix to capture n-gram information; the third layer is the pooling layer, where we apply max pooling to each convolved feature map and concatenate the results, keeping the most salient features of the sentence; finally, the concatenated vector is fed into a fully connected layer for the final classification.

Our model follows this architecture and uses pre-trained GloVe embeddings (glove.6B, 100d); you could also use other popular pre-trained embeddings such as ELMo or BERT-based vectors.

The CNN text classification model is implemented in TensorFlow and consists of three parts: data preprocessing, the model itself, and training.

Data Preprocessing

$\color{red}{\text{There are thousands of roads to model training, and data preprocessing is always the first.}}$ Let's start with how the data is processed.

The first step is to load the data. After loading, we build a vocabulary from it, which everything that follows relies on:
```python
import codecs
import re

import numpy as np


def char_mapping(sentences, lower):
    # Build the list of (optionally lowercased) tokens for each sentence
    chars = [[x.lower() if lower else x for x in s.split()] for s in sentences]
    dico = create_dico(chars)
    char_to_id, id_to_char = create_mapping(dico)
    print("Found %i unique words (%i in total)" % (len(dico), sum(len(x) for x in chars)))
    return dico, char_to_id, id_to_char
```
Next, we build a mapping between this vocabulary and the pre-trained embedding file to obtain the pre-trained embedding matrix:
```python
def load_word2vec(emb_path, id_to_word, word_dim):
    """
    Load word embeddings from a pre-trained file.
    The embedding size must match word_dim.
    """
    new_weights = np.zeros((len(id_to_word), word_dim), dtype=np.float32)
    print('Loading pretrained embeddings from {}...'.format(emb_path))
    pre_trained = {}
    emb_invalid = 0
    for i, line in enumerate(codecs.open(emb_path, 'r', 'utf-8')):
        line = line.rstrip().split()
        if len(line) == word_dim + 1:
            pre_trained[line[0]] = np.array(
                [float(x) for x in line[1:]]
            ).astype(np.float32)
        else:
            emb_invalid += 1
    if emb_invalid > 0:
        print('WARNING: %i invalid lines' % emb_invalid)
    c_found = 0
    c_lower = 0
    c_zeros = 0
    n_words = len(id_to_word)
    # Lookup table initialization
    for i in range(n_words):
        word = id_to_word[i]
        if i == 0:
            # Give the first row (padding / unknown token) a small random init
            new_weights[i] = np.random.uniform(-0.25, 0.25, word_dim)
        if word in pre_trained:
            new_weights[i] = pre_trained[word]
            c_found += 1
        elif word.lower() in pre_trained:
            new_weights[i] = pre_trained[word.lower()]
            c_lower += 1
        elif re.sub(r'\d', '0', word.lower()) in pre_trained:
            # Replace digits with zero before the lookup
            new_weights[i] = pre_trained[re.sub(r'\d', '0', word.lower())]
            c_zeros += 1
    print('Loaded %i pretrained embeddings.' % len(pre_trained))
    print('%i / %i (%.4f%%) words have been initialized with '
          'pretrained embeddings.' % (
              c_found + c_lower + c_zeros, n_words,
              100. * (c_found + c_lower + c_zeros) / n_words))
    print('%i found directly, %i after lowercasing, '
          '%i after lowercasing + zero.' % (c_found, c_lower, c_zeros))
    return new_weights
```
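Assuming the 100-dimensional GloVe file is available locally, loading the embedding matrix might look like this (the file name is only an example):

```python
# Hypothetical call: id_to_word comes from create_mapping above
pre_weights = load_word2vec('glove.6B.100d.txt', id_to_word, 100)
print(pre_weights.shape)  # (vocab_size, 100)
```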
Then the text is mapped to ids, padded to a fixed length, and iterated over in batches that are fed to the model:
```python
def pad_data(data, length):
    """Pad (or truncate) every id sequence to the same length with zeros."""
    chars = []
    max_length = length
    for line in data:
        if len(line) < max_length:
            padding = [0] * (max_length - len(line))
            chars.append(line + padding)
        else:
            chars.append(line[0:length])
    return chars


def batch_iter(data, batch_size, num_epochs, shuffle=True):
    """
    Generate batches of data.
    """
    data = np.array(data)
    data_size = len(data)
    num_batches_per_epoch = int((len(data) - 1) / batch_size) + 1
    for epoch in range(num_epochs):
        # Shuffle the data at the start of every epoch
        if shuffle:
            shuffle_indices = np.random.permutation(np.arange(data_size))
            shuffled_data = data[shuffle_indices]
        else:
            shuffled_data = data
        for batch_num in range(num_batches_per_epoch):
            start_index = batch_num * batch_size
            end_index = min((batch_num + 1) * batch_size, data_size)
            yield shuffled_data[start_index:end_index]
```
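Putting the pieces together, a hypothetical preprocessing pipeline (the variable names here are illustrative, not from the post) could look like:

```python
# Hypothetical glue code: map tokens to ids, pad, then batch
x_ids = [[char_to_id.get(w, 0) for w in s.split()] for s in raw_sentences]
x_padded = pad_data(x_ids, length=56)
for batch in batch_iter(list(zip(x_padded, labels)), batch_size=64, num_epochs=1):
    x_batch, y_batch = zip(*batch)
    # feed x_batch / y_batch to the model here
```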
Building the Model
```python
import tensorflow as tf


class TextCNN(object):
    """
    A CNN for text classification:
    embedding layer, convolution layer, max-pooling layer, classification layer.
    """
    def __init__(
            self, embeddings, sequence_length, num_classes, vocab_size,
            embedding_size, filter_sizes, num_filters, l2_reg_lambda=0.0,
            hidden_dim=128, learning_rate=1e-3):
        # Placeholders for input, output and dropout
        self.input_x = tf.placeholder(tf.int32, [None, sequence_length], name="input_x")
        self.input_y = tf.placeholder(tf.float32, [None, num_classes], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")

        # Keeping track of l2 regularization loss (optional)
        l2_loss = tf.constant(0.0)

        # Embedding layer with random initialization (kept for reference):
        # with tf.device('/cpu:0'), tf.name_scope("embedding"):
        #     self.W = tf.Variable(
        #         tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0),
        #         name="W")
        #     self.embedded_chars = tf.nn.embedding_lookup(self.W, self.input_x)
        #     self.embedded_chars_expanded = tf.expand_dims(self.embedded_chars, -1)

        # Embedding layer initialized with the pre-trained vectors
        with tf.device('/cpu:0'), tf.name_scope("embedding"):
            self.W = tf.Variable(embeddings, name="W", dtype=tf.float32)
            # Table lookup: [batch_size, sequence_length, embedding_size]
            self.embedded_chars = tf.nn.embedding_lookup(self.W, self.input_x)

        # One 1-D convolution + global max-pooling branch per kernel size
        pooled_outputs = []
        for kernel_size in filter_sizes:
            with tf.variable_scope("CNN-max-pooling-%s" % kernel_size):
                conv = tf.layers.conv1d(self.embedded_chars, num_filters, kernel_size, name='conv')
                gmp = tf.reduce_max(conv, axis=1, name='gmp')
                pooled_outputs.append(gmp)
        self.h_pool = tf.concat(pooled_outputs, 1)

        with tf.name_scope("score"):
            # Fully connected layer followed by dropout and ReLU
            fc = tf.layers.dense(self.h_pool, hidden_dim, name='fc1')
            fc = tf.contrib.layers.dropout(fc, keep_prob=self.dropout_keep_prob)
            fc = tf.nn.relu(fc)
            # Classifier
            self.logits = tf.layers.dense(fc, num_classes, name='fc2')
            self.y_pred_cls = tf.argmax(tf.nn.softmax(self.logits), 1)  # predicted class

        with tf.name_scope("optimize"):
            # Cross-entropy loss
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
                logits=self.logits, labels=self.input_y)
            self.loss = tf.reduce_mean(cross_entropy)
            # Optimizer (the training script below also builds its own train_op)
            self.optim = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(self.loss)

        with tf.name_scope("accuracy"):
            # Accuracy
            correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls)
            self.accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
```
In this model, the text passes through the embedding layer and each word gets an embedding vector, giving a tensor of shape [batch_size, seq_length, embedding_dim]. If we wanted to convolve over two dimensions as with images, we would have to add one more (channel) dimension. Here we instead use only 1-D convolution followed by max pooling over time, which extracts the strongest feature for each filter.
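To make the shape handling concrete, here is a small illustration (the vocabulary size, sequence length and filter count are made-up numbers) of the two options: adding a channel dimension for image-style 2-D convolution, versus applying 1-D convolution directly as the model above does:

```python
import tensorflow as tf

x = tf.placeholder(tf.int32, [None, 56])              # [batch, seq_len]
W = tf.get_variable("emb", [5000, 100])               # vocab 5000, embedding dim 100
emb = tf.nn.embedding_lookup(W, x)                    # [batch, 56, 100]

# Option 1: add a channel dimension and use 2-D convolution (image-style)
emb_4d = tf.expand_dims(emb, -1)                      # [batch, 56, 100, 1]
conv2d_out = tf.layers.conv2d(emb_4d, 128, (3, 100))  # [batch, 54, 1, 128]

# Option 2: 1-D convolution directly on the 3-D tensor (used in this post)
conv1d_out = tf.layers.conv1d(emb, 128, 3)            # [batch, 54, 128]
gmp = tf.reduce_max(conv1d_out, axis=1)               # [batch, 128]
```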
Model Training
```python
import datetime
import os
import time

import tensorflow as tf

import data_helpers
# TextCNN is the class defined above


# Parameters
# ==================================================
# (the tf.flags / FLAGS definitions are omitted in the original post)


def preprocess():
    # Data Preparation
    # ==================================================
    # Load data
    print("Loading data...")
    x_text, y = data_helpers.load_data_and_labels(FLAGS.positive_data_file, FLAGS.negative_data_file)
    # ... (vocabulary building, padding and the train/dev split are omitted in the original post)


def train(x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights):
    # Training
    # ==================================================
    with tf.Graph().as_default():
        session_conf = tf.ConfigProto(
            allow_soft_placement=FLAGS.allow_soft_placement,
            log_device_placement=FLAGS.log_device_placement)
        sess = tf.Session(config=session_conf)
        with sess.as_default():
            cnn = TextCNN(
                embeddings=pre_weights,
                sequence_length=x_train.shape[1],
                num_classes=y_train.shape[1],
                vocab_size=len(vocab_processor.vocabulary_),
                embedding_size=FLAGS.embedding_dim,
                filter_sizes=list(map(int, FLAGS.filter_sizes.split(","))),
                num_filters=FLAGS.num_filters,
                l2_reg_lambda=FLAGS.l2_reg_lambda)

            # Global step counter
            global_step = tf.Variable(0, name="global_step", trainable=False)
            optimizer = tf.train.AdamOptimizer(1e-3)
            grads_and_vars = optimizer.compute_gradients(cnn.loss)
            train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)

            # Keep track of gradient values and sparsity (optional)
            grad_summaries = []
            for g, v in grads_and_vars:
                if g is not None:
                    grad_hist_summary = tf.summary.histogram("{}/grad/hist".format(v.name), g)
                    sparsity_summary = tf.summary.scalar("{}/grad/sparsity".format(v.name), tf.nn.zero_fraction(g))
                    grad_summaries.append(grad_hist_summary)
                    grad_summaries.append(sparsity_summary)
            grad_summaries_merged = tf.summary.merge(grad_summaries)

            # Output directory for models and summaries
            timestamp = str(int(time.time()))
            out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp))
            print("Writing to {}\n".format(out_dir))

            # Summaries for loss and accuracy
            loss_summary = tf.summary.scalar("loss", cnn.loss)
            acc_summary = tf.summary.scalar("accuracy", cnn.accuracy)

            # Train summaries
            train_summary_op = tf.summary.merge([loss_summary, acc_summary, grad_summaries_merged])
            train_summary_dir = os.path.join(out_dir, "summaries", "train")
            train_summary_writer = tf.summary.FileWriter(train_summary_dir, sess.graph)

            # Dev summaries
            dev_summary_op = tf.summary.merge([loss_summary, acc_summary])
            dev_summary_dir = os.path.join(out_dir, "summaries", "dev")
            dev_summary_writer = tf.summary.FileWriter(dev_summary_dir, sess.graph)

            # Checkpoint directory. TensorFlow assumes this directory already exists, so we need to create it
            checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
            checkpoint_prefix = os.path.join(checkpoint_dir, "model")
            if not os.path.exists(checkpoint_dir):
                os.makedirs(checkpoint_dir)
            saver = tf.train.Saver(tf.global_variables(), max_to_keep=FLAGS.num_checkpoints)

            # Save the vocabulary
            # vocab_processor.save(os.path.join(out_dir, "vocab"))

            # Initialize all variables
            sess.run(tf.global_variables_initializer())

            def train_step(x_batch, y_batch):
                """
                A single training step
                """
                feed_dict = {
                    cnn.input_x: x_batch,
                    cnn.input_y: y_batch,
                    cnn.dropout_keep_prob: FLAGS.dropout_keep_prob
                }
                # Run the requested ops and fetch their values
                _, step, summaries, loss, accuracy = sess.run(
                    [train_op, global_step, train_summary_op, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
                train_summary_writer.add_summary(summaries, step)

            def dev_step(x_batch, y_batch, writer=None):
                """
                Evaluates the model on a dev set
                """
                feed_dict = {
                    cnn.input_x: x_batch,
                    cnn.input_y: y_batch,
                    cnn.dropout_keep_prob: 1.0
                }
                step, summaries, loss, accuracy = sess.run(
                    [global_step, dev_summary_op, cnn.loss, cnn.accuracy],
                    feed_dict)
                time_str = datetime.datetime.now().isoformat()
                print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
                if writer:
                    writer.add_summary(summaries, step)

            # Generate batches
            batches = data_helpers.batch_iter(
                list(zip(x_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs)
            # Training loop. For each batch...
            for batch in batches:
                x_batch, y_batch = zip(*batch)
                train_step(x_batch, y_batch)
                current_step = tf.train.global_step(sess, global_step)
                if current_step % FLAGS.evaluate_every == 0:
                    print("\nEvaluation:")
                    dev_step(x_dev, y_dev, writer=dev_summary_writer)
                    print("")
                if current_step % FLAGS.checkpoint_every == 0:
                    path = saver.save(sess, checkpoint_prefix, global_step=current_step)
                    print("Saved model checkpoint to {}\n".format(path))


def main(argv=None):
    # x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights = preprocess()
    train(x_train, y_train, vocab_processor, x_dev, y_dev, pre_weights)


if __name__ == '__main__':
    tf.app.run()
```
The training loop in TensorFlow is not as simple as in Keras; it is fairly verbose and not very beginner-friendly. We have to prepare the data ourselves and feed it batch by batch into the graph we defined, then run that graph. Note that the global step in TensorFlow is a counter: it increases by one for every batch trained, so one epoch corresponds to several steps, and the number of steps per epoch has to be computed from the data size and batch size. Also, sess.run() takes not only the input data but also the ops whose values we want returned, such as accuracy and loss.
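As a quick sanity check on how the global step relates to epochs (the numbers are illustrative, not from the post):

```python
# e.g. 10662 training examples with batch_size = 64
num_batches_per_epoch = int((10662 - 1) / 64) + 1   # 167 steps per epoch
# after 10 epochs the global step counter reaches 167 * 10 = 1670
```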
That concludes this introduction to a TensorFlow-based CNN text classification model. The model is not difficult and is easy for beginners to follow; once you have learned the basic TensorFlow operations through it, basic model training should no longer be a problem, and after picking up a few of the latest models you can train away happily.
These days BERT is sweeping the leaderboards, and all kinds of models use BERT to extract features first; if you cannot use BERT, you can hardly claim to do deep learning. So here is a brief look at how to load a BERT model with TensorFlow.
```python
import tensorflow as tf
# The modeling module comes from the google-research/bert repository
import modeling

# bert_config, input_ids, input_mask, segment_ids and init_checkpoint
# are assumed to have been prepared beforehand

# Initialize BERT
model = modeling.BertModel(
    config=bert_config,
    is_training=False,
    input_ids=input_ids,
    input_mask=input_mask,
    token_type_ids=segment_ids,
    use_one_hot_embeddings=False)

# Load the pre-trained BERT weights
tvars = tf.trainable_variables()
(assignment, initialized_variable_names) = modeling.get_assignment_map_from_checkpoint(tvars, init_checkpoint)
tf.train.init_from_checkpoint(init_checkpoint, assignment)

# Get the BERT output: [batch_size, seq_length, embedding_size]
encoder_last_layer = model.get_sequence_output()
```
Once you have the BERT output, you can happily feed it into the downstream layers and do whatever fancy things you like with it.
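For instance, a minimal downstream head (my own sketch, not part of the post) could max-pool the sequence output over time and attach a softmax classifier, mirroring the CNN model above:

```python
# Hypothetical classification head on top of the BERT sequence output
# encoder_last_layer: [batch_size, seq_length, hidden_size]
pooled = tf.reduce_max(encoder_last_layer, axis=1)          # [batch_size, hidden_size]
logits = tf.layers.dense(pooled, num_classes, name="cls")   # num_classes is assumed
probs = tf.nn.softmax(logits)
```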
References

BERT 的简单使用 (Simple usage of BERT)