本文所使用的开源数据集(kaggle 猫狗大战):
https://www.kaggle.com/c/dogs-vs-cats
国内百度网盘下载地址:
https://pan.baidu.com/s/12ab32UNYI_6o4i9HyX-n8Q
利用本文代码训练并生成的模型(对应项目中的 model 文件夹):
https://pan.baidu.com/s/1tBkVQKoH2Fc_HsBxtdAxnQ
简单介绍:
(需要预先安装 pip install opencv-python, pip install flask, pip install tensorflow/pip install tensorflow-gpu) 本文使用 Python3,TensorFlow 实现适合新手的 VGG16 模型 (不了解 VGG16 的同学可以自行百度一下, 本文没有使用 slim 或者 keras 实现, 对 VGG16 逐层实现, 便于新手理解, 有经验的同学可以用高级库重写这部分) 可应用于单标签分类 (一张图片要么是猫, 要么是狗) 任务.
预告: 之后会写一篇多标签分类任务, 与单标签分类有些区别 juejin.im/post/5c073b...
整体训练逻辑:
0, 使用 pipeline 方式异步读取训练集图片, 节省内存消耗, 提高效率
1, 将图像传入到 CNN(VGG16)中提取特征
2, 将特征图拉伸输入到 FC layer 中得出分类预测向量
3, 通过 softmax 交叉熵函数对预测向量和标签向量进行训练, 得出最终模型
整体预测逻辑:
1, 将图像传入到 CNN(VGG16)中提取特征
2, 将特征图拉伸输入到 FC layer 中得出分类预测向量
3, 将预测向量做 softmax 操作, 取向量中的最大值, 并映射到对应类别中
制作成 Web 服务:
利用 flask 框架将整个项目启动成 Web 服务, 使得项目支持 http 方式调用
启动服务后调用以下地址测试:
后续优化逻辑:
可以采用迁移学习, 模型融合等方案进一步提高 acc
可以左右翻转图片, 将训练集翻倍
运行命令:
对数据集进行训练: python http://DogVsCat.py train
对新的图片进行测试: python http://DogVsCat.py test
启动成 http 服务: python http://DogVsCat.py start
项目整体目录结构:
model 结构:
训练过程:
整体代码如下:
- # coding:utf-8
- import tensorflow as tf
- import os, sys, random
- import numpy as np
- import cv2
- from flask import request
- from flask import Flask
- import JSON
- App = Flask(__name__)
- class DogVsCat:
- def __init__(self):
- # 可调参数
- self.save_epoch = 1 # 每相隔多少个 epoch 保存一次模型
- self.train_max_num = 25000 # 训练时读取的最大图片数目 0~25000 之间, 内存不足的可以调小
- self.epoch_max = 13 # 最大迭代 epoch 次数
- self.batch_size = 16 # 训练时每个批次参与训练的图像数目, 显存不足的可以调小
- self.class_num = 2 # 分类数目, 猫狗共两类
- self.val_num = 20 * self.batch_size # 不能大于 self.train_max_num 做验证集用
- self.lr = 1e-4 # 初始学习率
- # 无需修改参数
- self.x_val = []
- self.y_val = []
- self.x = None # 每批次的图像数据
- self.y = None # 每批次的 one-hot 标签
- self.learning_rate = None # 学习率
- self.sess = None # 持久化的 tf.session
- self.pred = None # cnn 网络结构的预测
- self.keep_drop = tf.placeholder(tf.float32) # dropout 比例
- def dogOrCat(self, img_path):
- """
- 猫狗分类
- :param img_path:
- :return:
- """
- im = cv2.imread(img_path)
- im = cv2.resize(im, (224, 224))
- im = [im]
- im = np.array(im, dtype=np.float32)
- im -= 147
- output = self.sess.run(self.output, feed_dict={self.x: im, self.keep_drop: 1.})
- ret = output.tolist()[0]
- ret = 'It is a cat' if ret[0] <= ret[1] else 'It is a dog'
- return ret
- def test(self, img_path):
- """
- 测试接口
- :param img_path:
- :return:
- """
- self.x = tf.placeholder(tf.float32, [None, 224, 224, 3]) # 输入数据
- self.pred = self.CNN()
- self.output = tf.nn.softmax(self.pred)
- saver = tf.train.Saver()
- # tfconfig = tf.ConfigProto(allow_soft_placement=True)
- # tfconfig.gpu_options.per_process_gpu_memory_fraction = 0.3 # 占用显存的比例
- # self.ses = tf.Session(config=tfconfig)
- self.sess = tf.Session()
- self.sess.run(tf.global_variables_initializer()) # 全局 tf 变量初始化
- # 加载 w,b 参数
- saver.restore(self.sess, './model/DogVsCat-13')
- im = cv2.imread(img_path)
- im = cv2.resize(im, (224, 224))
- im = [im]
- im = np.array(im, dtype=np.float32)
- im -= 147
- output = self.sess.run(self.output, feed_dict={self.x: im, self.keep_drop: 1.})
- ret = output.tolist()[0]
- ret = 'It is a cat' if ret[0] <= ret[1] else 'It is a dog'
- print(ret)
- def train(self):
- """
- 开始训练
- :return:
- """
- self.x = tf.placeholder(tf.float32, [None, 224, 224, 3]) # 输入数据
- self.y = tf.placeholder(tf.float32, [None, self.class_num]) # 标签数据
- self.learning_rate = tf.placeholder(tf.float32) # 学习率
- # 生成训练用数据集
- x_train_list, y_train_list, x_val_list, y_val_list = self.getTrainDataset()
- print('开始转换 tensor 队列')
- x_train_list_tensor = tf.convert_to_tensor(x_train_list, dtype=tf.string)
- y_train_list_tensor = tf.convert_to_tensor(y_train_list, dtype=tf.float32)
- x_val_list_tensor = tf.convert_to_tensor(x_val_list, dtype=tf.string)
- y_val_list_tensor = tf.convert_to_tensor(y_val_list, dtype=tf.float32)
- x_train_queue = tf.train.slice_input_producer(tensor_list=[x_train_list_tensor], shuffle=False)
- y_train_queue = tf.train.slice_input_producer(tensor_list=[y_train_list_tensor], shuffle=False)
- x_val_queue = tf.train.slice_input_producer(tensor_list=[x_val_list_tensor], shuffle=False)
- y_val_queue = tf.train.slice_input_producer(tensor_list=[y_val_list_tensor], shuffle=False)
- train_im, train_label = self.dataset_opt(x_train_queue, y_train_queue)
- train_batch = tf.train.batch(tensors=[train_im, train_label], batch_size=self.batch_size, num_threads=2)
- val_im, val_label = self.dataset_opt(x_val_queue, y_val_queue)
- val_batch = tf.train.batch(tensors=[val_im, val_label], batch_size=self.batch_size, num_threads=2)
- # VGG16 网络
- print('开始加载网络')
- self.pred = self.CNN()
- # 损失函数
- self.loss = tf.nn.softmax_cross_entropy_with_logits(logits=self.pred, labels=self.y)
- # 优化器
- self.opt = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.loss)
- # acc
- self.acc_tf = tf.equal(tf.argmax(self.pred, 1), tf.argmax(self.y, 1))
- self.acc = tf.reduce_mean(tf.cast(self.acc_tf, tf.float32))
- with tf.Session() as self.sess:
- # 全局 tf 变量初始化
- self.sess.run(tf.global_variables_initializer())
- coordinator = tf.train.Coordinator()
- threads = tf.train.start_queue_runners(sess=self.sess, coord=coordinator)
- # 模型保存
- saver = tf.train.Saver()
- batch_max = len(x_train_list) // self.batch_size
- total_step = 1
- for epoch_num in range(self.epoch_max):
- lr = self.lr * (1 - (epoch_num/self.epoch_max) ** 2) # 动态学习率
- for batch_num in range(batch_max):
- x_train_tmp, y_train_tmp = self.sess.run(train_batch)
- self.sess.run(self.opt, feed_dict={self.x: x_train_tmp, self.y: y_train_tmp, self.learning_rate: lr, self.keep_drop: 0.5})
- # 输出评价标准
- if total_step % 20 == 0 or total_step == 1:
- print()
- print('epoch:%d/%d batch:%d/%d step:%d lr:%.10f' % ((epoch_num + 1), self.epoch_max, (batch_num + 1), batch_max, total_step, lr))
- # 输出训练集评价
- train_loss, train_acc = self.sess.run([self.loss, self.acc], feed_dict={self.x: x_train_tmp, self.y: y_train_tmp, self.keep_drop: 1.})
- print('train_loss:%.10f train_acc:%.10f' % (np.mean(train_loss), train_acc))
- # 输出验证集评价
- val_loss_list, val_acc_list = [], []
- for i in range(int(self.val_num/self.batch_size)):
- x_val_tmp, y_val_tmp = self.sess.run(val_batch)
- val_loss, val_acc = self.sess.run([self.loss, self.acc], feed_dict={self.x: x_val_tmp, self.y: y_val_tmp, self.keep_drop: 1.})
- val_loss_list.append(np.mean(val_loss))
- val_acc_list.append(np.mean(val_acc))
- print('val_loss:%.10f val_acc:%.10f' % (np.mean(val_loss), np.mean(val_acc)))
- total_step += 1
- # 保存模型
- if (epoch_num + 1) % self.save_epoch == 0:
- print('正在保存模型:')
- saver.save(self.sess, './model/DogVsCat', global_step=(epoch_num + 1))
- coordinator.request_stop()
- coordinator.join(threads)
- def CNN(self):
- """
- VGG16 + FC
- :return:
- """
- # 权重
- weight = {
- # 输入 batch_size*224*224*3
- # 第一层
- 'wc1_1': tf.get_variable('wc1_1', [3, 3, 3, 64]), # 卷积 输出: batch_size*224*224*64
- 'wc1_2': tf.get_variable('wc1_2', [3, 3, 64, 64]), # 卷积 输出: batch_size*224*224*64
- # 池化 输出: 112*112*64
- # 第二层
- 'wc2_1': tf.get_variable('wc2_1', [3, 3, 64, 128]), # 卷积 输出: batch_size*112*112*128
- 'wc2_2': tf.get_variable('wc2_2', [3, 3, 128, 128]), # 卷积 输出: batch_size*112*112*128
- # 池化 输出: 56*56*128
- # 第三层
- 'wc3_1': tf.get_variable('wc3_1', [3, 3, 128, 256]), # 卷积 输出: batch_size*56*56*256
- 'wc3_2': tf.get_variable('wc3_2', [3, 3, 256, 256]), # 卷积 输出: batch_size*56*56*256
- 'wc3_3': tf.get_variable('wc3_3', [3, 3, 256, 256]), # 卷积 输出: batch_size*56*56*256
- # 池化 输出: 28*28*256
- # 第四层
- 'wc4_1': tf.get_variable('wc4_1', [3, 3, 256, 512]), # 卷积 输出: batch_size*28*28*512
- 'wc4_2': tf.get_variable('wc4_2', [3, 3, 512, 512]), # 卷积 输出: batch_size*28*28*512
- 'wc4_3': tf.get_variable('wc4_3', [3, 3, 512, 512]), # 卷积 输出: batch_size*28*28*512
- # 池化 输出: 14*14*512
- # 第五层
- 'wc5_1': tf.get_variable('wc5_1', [3, 3, 512, 512]), # 卷积 输出: batch_size*14*14*512
- 'wc5_2': tf.get_variable('wc5_2', [3, 3, 512, 512]), # 卷积 输出: batch_size*14*14*512
- 'wc5_3': tf.get_variable('wc5_3', [3, 3, 512, 512]), # 卷积 输出: batch_size*14*14*512
- # 池化 输出: 7*7*512
- # 全链接第一层
- 'wfc_1': tf.get_variable('wfc_1', [7*7*512, 4096]),
- # 全链接第二层
- 'wfc_2': tf.get_variable('wfc_2', [4096, 4096]),
- # 全链接第三层
- 'wfc_3': tf.get_variable('wfc_3', [4096, self.class_num]),
- }
- # 偏移量
- biase = {
- # 第一层
- 'bc1_1': tf.get_variable('bc1_1', [64]),
- 'bc1_2': tf.get_variable('bc1_2', [64]),
- # 第二层
- 'bc2_1': tf.get_variable('bc2_1', [128]),
- 'bc2_2': tf.get_variable('bc2_2', [128]),
- # 第三层
- 'bc3_1': tf.get_variable('bc3_1', [256]),
- 'bc3_2': tf.get_variable('bc3_2', [256]),
- 'bc3_3': tf.get_variable('bc3_3', [256]),
- # 第四层
- 'bc4_1': tf.get_variable('bc4_1', [512]),
- 'bc4_2': tf.get_variable('bc4_2', [512]),
- 'bc4_3': tf.get_variable('bc4_3', [512]),
- # 第五层
- 'bc5_1': tf.get_variable('bc5_1', [512]),
- 'bc5_2': tf.get_variable('bc5_2', [512]),
- 'bc5_3': tf.get_variable('bc5_3', [512]),
- # 全链接第一层
- 'bfc_1': tf.get_variable('bfc_1', [4096]),
- # 全链接第二层
- 'bfc_2': tf.get_variable('bfc_2', [4096]),
- # 全链接第三层
- 'bfc_3': tf.get_variable('bfc_3', [self.class_num]),
- }
- # 第一层
- net = tf.nn.conv2d(input=self.x, filter=weight['wc1_1'], strides=[1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc1_1'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, filter=weight['wc1_2'], strides=[1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc1_2'])) # 加 b 然后 激活
- net = tf.nn.max_pool(value.NET, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID') # 池化
- # 第二层
- net = tf.nn.conv2d.NET, weight['wc2_1'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc2_1'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc2_2'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc2_2'])) # 加 b 然后 激活
- net = tf.nn.max_pool.NET, [1, 2, 2, 1], [1, 2, 2, 1], padding='VALID') # 池化
- # 第三层
- net = tf.nn.conv2d.NET, weight['wc3_1'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc3_1'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc3_2'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc3_2'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc3_3'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc3_3'])) # 加 b 然后 激活
- net = tf.nn.max_pool.NET, [1, 2, 2, 1], [1, 2, 2, 1], padding='VALID') # 池化
- # 第四层
- net = tf.nn.conv2d.NET, weight['wc4_1'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc4_1'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc4_2'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc4_2'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc4_3'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc4_3'])) # 加 b 然后 激活
- net = tf.nn.max_pool.NET, [1, 2, 2, 1], [1, 2, 2, 1], padding='VALID') # 池化
- # 第五层
- net = tf.nn.conv2d.NET, weight['wc5_1'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc5_1'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc5_2'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc5_2'])) # 加 b 然后 激活
- net = tf.nn.conv2d.NET, weight['wc5_3'], [1, 1, 1, 1], padding='SAME') # 卷积
- net = tf.nn.leaky_relu(tf.nn.bias_add.NET, biase['bc5_3'])) # 加 b 然后 激活
- net = tf.nn.max_pool.NET, [1, 2, 2, 1], [1, 2, 2, 1], padding='VALID') # 池化
- print('last-net', net)
- # 拉伸 flatten, 把多个图片同时分别拉伸成一条向量
- net = tf.reshape.NET, shape=[-1, weight['wfc_1'].get_shape()[0]])
- print(weight['wfc_1'].get_shape()[0])
- print('拉伸 flatten', net)
- # 全链接层
- # fc 第一层
- net = tf.matmul.NET, weight['wfc_1']) + biase['bfc_1']
- net = tf.nn.dropout.NET, self.keep_drop)
- net = tf.nn.relu.NET)
- print('fc 第一层', net)
- # fc 第二层
- net = tf.matmul.NET, weight['wfc_2']) + biase['bfc_2']
- net = tf.nn.dropout.NET, self.keep_drop)
- net = tf.nn.relu.NET)
- print('fc 第二层', net)
- # fc 第三层
- net = tf.matmul.NET, weight['wfc_3']) + biase['bfc_3']
- print('fc 第三层', net)
- return.NET
- def getTrainDataset(self):
- """
- 整理数据集, 把图像 resize 为 224*224*3, 训练集做成 25000*224*224*3, 把 label 做成 one-hot 形式
- :return:
- """ train_data_list = os.listdir('./data/train_data/')
- print('共有 %d 张训练图片, 读取 %d 张:' % (len(train_data_list), self.train_max_num))
- random.shuffle(train_data_list) # 打乱顺序
- x_val_list = train_data_list[:self.val_num]
- y_val_list = [[0, 1] if file_name.find('cat')> -1 else [1, 0] for file_name in x_val_list]
- x_train_list = train_data_list[self.val_num:self.train_max_num]
- y_train_list = [[0, 1] if file_name.find('cat')> -1 else [1, 0] for file_name in x_train_list]
- return x_train_list, y_train_list, x_val_list, y_val_list
- def dataset_opt(self, x_train_queue, y_train_queue):
- """
- 处理图片和标签
- :param queue:
- :return:
- """
- queue = x_train_queue[0]
- contents = tf.read_file('./data/train_data/' + queue)
- im = tf.image.decode_jpeg(contents)
- im = tf.image.resize_images(images=im, size=[224, 224])
- im = tf.reshape(im, tf.stack([224, 224, 3]))
- im -= 147 # 去均值化
- # im /= 255 # 将像素处理在 0~1 之间, 加速收敛
- # im -= 0.5 # 将像素处理在 - 0.5~0.5 之间
- return im, y_train_queue[0]
- if __name__ == '__main__':
- opt_type = sys.argv[1:][0]
- instance = DogVsCat()
- if opt_type == 'train':
- instance.train()
- elif opt_type == 'test':
- instance.test('./data/test1/1.jpg')
- elif opt_type == 'start':
- # 将 session 持久化到内存中
- instance.test('./data/test1/1.jpg')
- # 启动 Web 服务
- # http://127.0.0.1:5050/dogOrCat?img_path=./data/test1/1.jpg
- @App.route('/dogOrCat', methods=['GET', 'POST'])
- def dogOrCat():
- img_path = '' if request.method =='POST':
- img_path = request.form.to_dict().get('img_path')
- elif request.method == 'GET':
- # img_path = request.args.get('img_path')
- img_path = request.args.to_dict().get('img_path')
- print(img_path)
- ret = instance.dogOrCat(img_path)
- print(ret)
- return JSON.dumps({'type': ret})
- App.run(host='0.0.0.0', port=5050, debug=False)
来源: https://juejin.im/post/5c0739b6f265da614312d86e