进程 线程 协程
1)进程是具有一定独立功能的的程序关于某个数据集合上的一次运行活动,是系统进行资源分配和调度的独立单位
2)线程是进程的一个实体,是 CPU 调度和分派的基本单位
3)协程是程序自身产生的一种线程复用机制,作用是让一个线程重复利用,减少系统资源开销,提高程序效率
由于进程、线程都是操作系统的基本概念,比较抽象,我们可以将 CPU 看作是一个时刻在运行中的大型工厂,车间就是工厂里具有独立工作能力的程序进程,每个车间里工作的机器人就是线程:
系统工作模式:
同一时间工厂只能为一个车间供电,供电期间 CPU 调度线程,完成他们自己的工作,一旦供电时间到,即使有线程工作未完成,也会立即停止,各线程会保存当前工作的进度(系统的上下文切换),等待下一次供电时间,继续完成上一次工作;其实我们系统里同一时刻确实只能运行一个程序进程,只是由于 CPU 在各个程序之间切换的速度够快,我们感知不到,所有表面上看所有程序都是同时运行的。
Python 线程
- import threading import time def show(arg) : time.sleep(1) print('thread' + str(arg)) # 创建10个线程 start表示创建成功 等待CPU调度运行
- for i in range(10) : t = threading.Thread(target = show, args = (i, )) t.start() print('main thread stop') # 输出:main thread stop thread3 thread2 thread1 thread0... # Thread 线程的其他方法# start 线程准备就绪,等待CPU调度# setName 为线程设置名称# getName 获取线程名称# setDaemon 设置为后台线程或前台线程(默认)# 如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止# 如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止# join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义# run 线程被cpu调度后自动执行线程对象的run方法
自定义创建线程类
- # 线程被创建准备好后 被cpu调度会去执行threading.Thread里面的 run 方法 class MyThread(threading.Thread) : def __init__(self, func, args) : self.func = func self.args = args super(MyThread, self).__init__() def run(self) : self.func(self.args) def f2(arg) : print(arg) obj = MyThread(f2, 123) obj.start()
线程锁
由于 CPU 调用线程是随机性的,也就是线程被执行顺序不固定,在创建了很多线程同时去执行修改同一数据时就会发生操作顺序混乱,导致出现脏数据
- import threading import time NUM = 10 def func() : global NUM NUM -= 1 # 程序等待1秒钟 10个线程基本都执行到此处 time.sleep(1) # 此时打印出来的 NUM 值就是 0 了 print(NUM)
- for i in range(10) : t = threading.Thread(target = func, ) t.start() # 输出0 0 0...
加线程锁以后
- import threading import time NUM = 10 def func(l) : global NUM l.acquire() NUM -= 1 time.sleep(1) print(NUM) l.release() # RLock 可以加多层线程锁# lock = threading.Lock() lock = threading.RLock()
- for i in range(10) : t = threading.Thread(target = func, args = (lock, )) t.start() # 输出9 8 7 6...
信号量(Semaphore)
线程锁同时只允许一个线程更改数据,而 semaphore 可以允许同时一定数量的线程通过,后面的线程将等待
- import threading,
- time def run(n) : semaphore.acquire() time.sleep(1) print("run the thread: %s" % n) semaphore.release()
- if __name__ == '__main__': num = 0 # 最多允许5个线程同时运行 semaphore = threading.BoundedSemaphore(5)
- for i in range(20) : t = threading.Thread(target = run, args = (i, )) t.start()
事件(event)
Python 线程的事件用于主线程控制子线程的执行,事件主要提供 3 中方法:set、wait、clear
有点像红绿灯:clear 红灯停 set 绿灯行 wait 黄灯等
- import threading def func(i, e) : print(i) # 出现wait后 所有线程将在此阻塞 直到用户输入1设置event.set放行 e.wait() print(i + 100) event = threading.Event()
- for i in range(3) : t = threading.Thread(target = func, args = (i, event, )) t.start()# 默认为设置为clear阻塞event.clear() inp = input(' >>> ')# 手动放行
- if inp == '1': event.set() # 输出:1 2 3 >>> 1 100 200 300
Condition 条件
condition 使得在等待的线程 满足一定条件才会被放行设定数量的线程
- import threading def func(i, con) : print(i) con.acquire() con.wait() print(i + 100) con.release() c = threading.Condition()
- for i in range(10) : t = threading.Thread(target = func, args = (i, c, )) t.start()
- while True: inp = input(' >>> ')
- if inp == 'q':
- break c.acquire() c.notify(int(inp)) c.release()
Timer
指定 n 秒后 执行某操作
- from threading import Timer def hello() : print("hello, world") t = Timer(1, hello) t.start()
Python 自定义线程池
Python 中在 3 之前没有线程池 3 中已有的线程池也是功能非常少的低级线程池
自定义一个线程池
- import queue import threading import contextlib import time # 空任务标识 终止进程StopEvent = object() class ThreadPool(object) : def __init__(self, max_num, max_task_num = None) :
- if max_task_num: # 装任务的队列q self.q = queue.Queue(max_task_num)
- else: self.q = queue.Queue() # 允许的最大大线程数 self.max_num = max_num # 终止线程 self.cancel = False self.terminal = False # 当前已创建的线程数量 self.generate_list = [] # 当前空闲的线程数量 self.free_list = [] def run(self, func, args, callback = None) : """
- 线程池执行一个任务
- :param func: 任务函数
- :param args: 任务函数所需参数
- :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
- :return: 如果线程池已经终止,则返回True否则None
- """
- if self.cancel:
- return # 判断已经创建的线程数量是否小于最大线程数 表示所有的线程在忙 多余任务不再创建线程
- if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: self.generate_thread() # 任务元组放到队列 w = (func, args, callback, ) self.q.put(w) def generate_thread(self) : """
- 创建一个线程
- """ t = threading.Thread(target = self.call) t.start() def call(self) : """
- 循环去获取任务函数并执行任务函数
- """ current_thread = threading.currentThread() # 将已经创建的线程加入线程列表 self.generate_list.append(current_thread) # 取任务元组 event = self.q.get()
- while event != StopEvent: # 得到任务元组的内容 func,
- arguments,
- callback = event
- try: # 执行 action 函数 result = func( * arguments) success = True except Exception as e: success = False result = None
- if callback is not None:
- try: callback(success, result) except Exception as e: pass # 将线程标记为空闲 with self.worker_state(self.free_list, current_thread) :
- if self.terminal: event = StopEvent
- else: # 如果取到的是空任务执行下面的
- else event = self.q.get()
- else: self.generate_list.remove(current_thread) def close(self) : """
- 执行完所有的任务后,所有线程停止
- """ self.cancel = True # 创建了多少线程就传几个空任务 full_size = len(self.generate_list)
- while full_size: self.q.put(StopEvent) full_size -= 1 def terminate(self) : """
- 无论是否还有任务,终止线程
- """ self.terminal = True
- while self.generate_list: self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager def worker_state(self, state_list, worker_thread) : """
- 用于记录线程中正在等待的线程数
- """ state_list.append(worker_thread)
- try: yield
- finally: state_list.remove(worker_thread) # How to use pool = ThreadPool(5) def callback(status, result) : # status,
- execute action status # result,
- execute action
- return value pass def action(i) : print(i) # 30个任务
- for i in range(30) : ret = pool.run(action, (i, ), callback) time.sleep(5) print(len(pool.generate_list), len(pool.free_list)) print(len(pool.generate_list), len(pool.free_list)) pool.close()# pool.terminate()
Python 进程
创建进程类似于创建线程
不同的是,创建进程需要消耗系统不小的开销 而且各进程间数据不共享
- from multiprocessing import Process import threading import time def foo(i) : print 'say hi',
- i
- for i in range(10) : p = Process(target = foo, args = (i, )) p.start()
进程间数据共享方式
1)queues 队列方式
2)Array 数组方式
3)dict 字典方式
- # 方式一 queues from multiprocessing import Process from multiprocessing import queues import multiprocessing def foo(i, arg) : arg.put(i) print('say hi', i, arg.qsize())
- if __name__ == '__main__': li = queues.Queue(20, ctx = multiprocessing)
- for i in range(10) : p = Process(target = foo, args = (i, li, )) p.start() # 方式二 Array数据 from multiprocessing import Process from multiprocessing import Array def foo(i, arg) : arg[i] = i + 100
- for item in arg: print(item) print(' === ======')
- if __name__ == '__main__': # 使用数据Array必须指定数据类型 和长度 li = Array('i', 10)
- for i in range(10) : p = Process(target = foo, args = (i, li, )) p.start() p.join() # 方式三 dict from multiprocessing import Process from multiprocessing import Manager import time def foo(i, arg) : arg[i] = 100 + i print(arg.values())
- if __name__ == '__main__': obj = Manager() li = obj.dict()
- for i in range(10) : p = Process(target = foo, args = (i, li, )) p.start() # 主进程结束后 子进程会被强行终止 # 可以使用join等待所有子进程全部执行完成 或者 time.sleep p.join()
进程锁
- from multiprocessing import Process from multiprocessing import Array from multiprocessing import RLock,
- Lock,
- Event,
- Condition,
- Semaphore import time def foo(i, lis, lc) : lc.acquire() # 将列表中的数字递减1 lis[0] = lis[0] - 1 time.sleep(1) print('say hi', lis[0]) lc.release()
- if __name__ == '__main__': li = Array('i', 1) li[0] = 10 lock = RLock()
- for i in range(10) : p = Process(target = foo, args = (i, li, lock)) p.start()
进程池
进程池内部维护一个进程序列 但使用时 这去进程池中获取一个进程,如果进程中没有,则需要等待,知道进程池中有进程为止
- import time from multiprocessing import Pool def f1(arg) : time.sleep(1) print(arg)
- if __name__ == '__main__': pool = Pool(5)
- for i in range(30) : # pool.apply(func = f1, args = (i, )) # 异步执行 一部到位 pool.apply_async(func = f1, args = (i, )) # close 表示所有的子进程任务执行完毕 pool.close() # time.sleep(1) # terminate 表示立即终止所有子进程 # pool.terminate() pool.join()
协程
进程和协程的操作是系统,而协程的创建和操作是程序的编写者
协程存在的意义:对于多协程的应用,CPU 通过切片的方式来切换协程间的执行,线程切换时需要耗时(保存状态,下次继续)协程则只使用一个线程,在一个线程中规定某个代码块执行顺序
协程使用场景:当程序中存在大量不需要使用 CPU 的操作时,即多 IO 操作适用协程
greelet
- from greenlet import greenlet def test1() : print(12) gr2.
- switch () print(34) gr2.
- switch () def test2() : print(56) gr1.
- switch () print(78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.
- switch ()
gevent
- import gevent def foo() : print('Running in foo') gevent.sleep(0) print('Explicit context
- switch to foo again') def bar() : print('Explicit context to bar') gevent.sleep(0) print('Implicit context
- switch back to bar') gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])
通过 IO 自动切换来完成网站请求
- from gevent import monkey; monkey.patch_all() import gevent import requests def f(url) : print('GET: % s' % url) resp = requests.get(url) data = resp.text print(' % d bytes received from % s.' % (len(data), url)) gevent.joinall([ gevent.spawn(f, 'https: //www.python.org/'),
- gevent.spawn(f, 'https: //www.yahoo.com/'),
- gevent.spawn(f, 'https: //github.com/'),
- ]) # 输出GET: https: //www.python.org/
- GET: https: //www.yahoo.com/
- GET: https: //github.com/
- 462078 bytes received from https: //www.yahoo.com/.
- 25689 bytes received from https: //github.com/.
- 47373 bytes received from https: //www.python.org/.
队列 queue
- import queue # queue.Queue 先进先出# q = queue.LifoQueue() 后进先出 # 权重队列# q = queue.PriorityQueue()# q.put((1, "alex"))# print(q.get()) # 双向队列# q = queue.deque()# q.append(123)# q.append(456)# q.appendleft(789)# q.pop()# q.popleft() # 先进先出队列# 参数10表示最多只接收10个数据# q = queue.Queue(10)# q.put(11)# q.put(22)# 超时时间timeout block是否阻塞# q.put(33, timeout = 2)# q.put(33, block = False) # print(q.qsize())# get默认阻塞 有数据才能取# print(q.get())# print(q.get(block = False)) # join task_done 阻塞进程 当队列中的任务执行完毕后 不再阻塞q = queue.Queue() q.put(123) q.put(456) q.get() q.task_done() q.get() q.task_done() q.join()
来源: http://www.bubuko.com/infodetail-1865207.html