可以重复利用的线程
直接上代码
- from threading import Thread, current_thread
- from queue import Queue
- # 重写线程类
- class MyThread(Thread):
- def __init__(self):
- super().__init__()
- self.daemon = True # 守护线程
- self.queue = Queue(10)
- self.start() # 实例化的时候开启线程
- def run(self): # 子线程只有这一个线程, 从队列里面拿任务
- while True:
- task, args, kwargs = self.queue.get() # 拿任务 也是元组
- task(*args, **kwargs) # 可能有, 可能没有, 所有传入不定长参数
- self.queue.task_done() # 结束任务
- def apply_async(self, func, args=(), kwargs={}): # 自写任务, 不是重写任务, 充当生产者, 给线程提供任务 (把任务扔到队列)
- self.queue.put((func, args, kwargs))
- def join_R(self): # 主线程等待子线程结束
- self.queue.join() # task_done 为 0 的时候就阻塞
- def func():
- print(1, current_thread())
- def func2(*args, **kwargs):
- print(2, current_thread())
- print('func:', args, kwargs)
- t = MyThread()
- t.apply_async(func)
- t.apply_async(func2, args=(1,2), kwargs={'a':1, 'b':2})
- print("任务提交完成")
- t.join_R()
- print("任务完成")
结果:
任务提交完成
- 1 <MyThread(Thread-1, started daemon -1223214272)>
- 2 <MyThread(Thread-1, started daemon -1223214272)>
- func: (1, 2) {'a': 1, 'b': 2}
任务完成 任务完成后, 主线程就开始退出, 因此守护线程被杀死
线程池的简单实现
池的概念
主线程: 相当于生产者, 只管向线程池提交任务.
并不关心线程池是如何执行任务的.
因此, 并不关心是哪一个线程执行的这个任务.
线程池: 相当于消费者, 负责接收任务,
并将任务分配到一个空闲的线程中去执行.
代码实现如下:
- from threading import Thread, current_thread
- from queue import Queue
- class T_pool:
- def __init__(self, n): # 准备多少个池
- super().__init__()
- self.queue = Queue()
- for i in range(n): # 在池里开多少个线程
- Thread(target=self.fun, daemon=Thread).start() # 守护进程 并启动
- def fun(self): # 生产者
- while True:
- task = self.queue.get()
- task()
- self.queue.task_done()
- def apply_async(self, task): # 消费者
- self.queue.put(task)
- def join(self):
- self.queue.join()
- def func():
- print(current_thread())
- def func2():
- print(current_thread())
- p = T_pool(2)
- p.apply_async(func)
- p.apply_async(func2)
- p.join()
结果:
- <Thread(Thread-1, started daemon -1223324864)>
- <Thread(Thread-1, started daemon -1223324864)>
Python 自带的池
内置线程池
- from multiprocessing.pool import ThreadPool # 线程池
- from multiprocessing import pool # 进程池
- # 内置线程池
- def fun(*args, **kwargs):
- print(args, kwargs)
- p = ThreadPool(2) # 直接使用内置的
- p.apply_async(fun, args=(1,2), kwds={'a':1})
- p.close() # 要求: 在 join 前必须要 close, 这样就不允许再提交任务了
- p.join()
结果:
(1, 2) {'a': 1}
内置进程池
- from multiprocessing import Pool # 进程池
- # 内置进程池
- def fun(*args, **kwargs):
- print(args, kwargs)
- if __name__ == '__main__': # 必须要有一个 main 测试
- p = Pool(2) # pool 的实例化必须在 main 测试之下
- p.apply_async(fun, args=(1,2), kwds={'a':1})
- p.close() # 要求: 在 join 前必须要 close, 这样就不允许再提交任务了
- p.join()
结果:
(1, 2) {'a': 1}
池的其他操作
操作一: close - 关闭提交通道, 不允许再提交任务
操作二: terminate - 中止进程池, 中止所有任务
操作三: 结果操作
结果操作
- from multiprocessing.pool import ThreadPool
- import time
- def func(n):
- if n == 1:
- return 1
- elif n == 2:
- return 2
- return func(n-1) + func(n-2)
- pool = ThreadPool()
- a_result = pool.apply_async(func, args=(35,))
- print("note1:",time.asctime(time.localtime(time.time())))
- result = a_result.get() # 会阻塞, 知道结果产生了
- print("note2:",time.asctime(time.localtime(time.time())))
结果:
- note1: Mon Sep 17 00:07:31 2018
- note2: Mon Sep 17 00:07:34 2018
使用池来实现并发服务器
使用线程池来实现并发服务器
- import socket
- from multiprocessing.pool import ThreadPool # 线程池
- from multiprocessing import Pool, cpu_count
- '''
- 使用线程池来实现
- 并发服务器
- '''
- print(cpu_count())
- server = socket.socket()
- server.bind(('0.0.0.0', 8080))
- server.listen(1000)
- def work_thread(conn):
- while True:
- data = conn.recv(1000)
- if data:
- print(data)
- conn.send(data)
- else:
- conn.close()
- break
- if __name__ == '__main__':
- t_pool = ThreadPool(5) # 使用线程池, 通常分配 2 倍的 cpu 个数
- while True:
- conn,addr = server.accept()
- t_pool.apply_async(work_thread, args=(conn,)) # 接收的是个任务, conn 做为参数
- 使用进程池来实现并发服务器
- import socket
- from multiprocessing.pool import ThreadPool # 线程池
- from multiprocessing import Pool, cpu_count
- '''
- 使用进程池来实现
- 并发服务器
- '''
- print(cpu_count())
- server = socket.socket()
- server.bind(('0.0.0.0', 9000))
- server.listen(1000)
- def work_process(server):
- t_pool = ThreadPool(cpu_count()*2) # 使用线程池, 通常分配 2 倍的 cpu 个数
- while True:
- conn,addr = server.accept()
- t_pool.apply_async(work_thread, args=(conn,)) # 接收的是个任务, conn 做为参数
- def work_thread(conn):
- while True:
- data = conn.recv(1000)
- if data:
- print(data)
- conn.send(data)
- else:
- conn.close()
- break
- n = cpu_count() # 获取当前计算机的 CPU 核心数量
- p = Pool(n)
- for i in range(n): # 充分利用 CPU, 为每个 CPU 分配一个进程
- p.apply_async(work_process, args=(server,))
- p.close()
- p.join()
客户端:
- import socket
- click = socket.socket()
- click.connect(('127.0.0.1', 8888))
- while True:
- data = input("请输入你要发送的数据:")
- click.send(data.encode())
- print("接收到的消息: {}".format(click.recv(1024).decode()))
总结完毕.
来源: http://www.bubuko.com/infodetail-2770637.html