1, 多线程
线程是进程的一个实体, 是 CPU 进行调度的最小单位, 他是比进程更小能独立运行的基本单位.
线程基本不拥有系统资源, 只占用一点运行中的资源(如程序计数器, 一组寄存器和栈), 但是它可以与同属于一个进程的其他线程共享全部的资源.
提高程序的运行速率, 上下文切换快, 开销比较少, 但是不够稳定, 容易丢失数据, 形成死锁.
直接上代码:
- import time
- import threading
- # 函数 1 用时 2 秒
- def fun1():
- time.sleep(2)
- print(threading.current_thread().name, time.ctime())
- # 函数 2 用时 4 秒
- def fun2():
- time.sleep(4)
- print(threading.current_thread().name, time.ctime())
- # 函数 3 用时 6 秒
- def fun3():
- time.sleep(6)
- print('hello python', time.ctime())
- th1 = threading.Thread(target=fun1)
- th2 = threading.Thread(target=fun2)
- th3 = threading.Thread(target=fun3)
- th1.start()
- th2.start()
- th3.start()
打印结果:
- Thread-1 Mon Jan 7 11:01:52 2019
- Thread-2 Mon Jan 7 11:01:54 2019
- hello python Mon Jan 7 11:01:56 2019
解析: 从结果看出, 他们同一时间 11:01:50 开始执行, 分别用了不同的时间结束
接着往下看, 添加 join 阻塞线程
- ''''''
- th1.start()
- th1.join()
- th2.start()
- th2.join()
- th3.start()
- th3.join()
打印结果:
- Thread-1 Mon Jan 7 11:19:00 2019
- Thread-2 Mon Jan 7 11:19:04 2019
- hello python Mon Jan 7 11:19:10 2019
我们看到这三线程按顺序依次执行.
我们接着看看线程的方法使用:
- threading.enumerate() #列举线程, 返回列表, 其中里面会有一条主线程
- threading.activeCount() #查看线程运行个数
- threading.current_thread().name #查看当前运行线程名称
- join() #阻塞线程运行
我们接着看第二种开线程的方式:
- import threading
- import time
- class MyThread(threading.Thread):
- def run(self):
- for i in range(3):
- time.sleep(1)
- msg = "I'm "+self.name+' @ '+str(i) #name 属性中保存的是当前线程的名字
- print(msg)
- if __name__ == '__main__':
- t = MyThread()
- t.setName('yangzhenyu')
- a = t.isAlive()
- print(a)
- print(t.getName())
- t.start()
- b = t.isAlive()
- print(b)
打印结果:
- False
- yanzghenyu
- True
- I'm yanzghenyu @ 0
- I'm yanzghenyu @ 1
- I'm yanzghenyu @ 2
方法总结:
- t.setName() #设置运行线程名称, 不指定默认 Thread-1
- t.getName() #获取线程名称
- t.isAlive() #判断线程是否运行, 返回布尔类型
线程间共享全局变量:
- import threading
- import time
- n = 100
- def work01():
- global n
- for i in range(3):
- n += 1
- print(n) //103
- def work02():
- global n
- print(n) //103
- print(n) //100
- t1 = threading.Thread(target=work01)
- t1.start()
- time.sleep(1)
- t2 = threading.Thread(target=work02)
- t2.start()
关于线程锁
用 threading.Lock()创建锁, 用 acquire()申请锁, 每次只有一个线程获得锁, 其他线程必须等此线程 release()后才能获得锁
RLock 允许在同一线程中被多次 acquire. 而 Lock 却不允许这种情况.
注意: 如果使用 RLock, 那么 acquire 和 release 必须成对出现, 即同一线程中调用了 n 次 acquire, 必须调用 n 次的 release 才能真正释放所占用的琐
下面例子中我们用到的是 Lock(), 当加完锁之后, 该方法同一时间内只能被一个线程调用.
- import threading
- mylock=threading.Lock()# 创建锁
- num = 0
- def add_num(name):
- global num
- while True:
- mylock.acquire()# 申请锁也就是加锁
- print('thread %s locked! num=%d'%(name,num))
- if num>=5:
- print('thread %s release! num=%d'%(name,num))
- mylock.release()# 释放锁
- return
- num += 1
- print('thread %s release! num = %d'%(name,num))
- mylock.release()
- t1 = threading.Thread(target=add_num,args=('A',))
- t2 = threading.Thread(target=add_num,args=('B',))
- t1.start()
- t2.start()
打印结果:
- thread A locked! num=0
- thread A release! num = 1
- thread A locked! num=1
- thread A release! num = 2
- thread A locked! num=2
- thread A release! num = 3
- thread A locked! num=3
- thread A release! num = 4
- thread A locked! num=4
- thread A release! num = 5
- thread A locked! num=5
- thread A release! num=5
- thread B locked! num=5
- thread B release! num=5
关于进程:
进程是系统进行资源分配的最小单位, 每个进程都有自己的独立内存空间, 不用进程通过进程间通信来通信.
但是进程占据独立空间, 比较重量级, 所以上下文进程间的切换开销比较大, 但是比较稳定安全.
进程创建:
第一种创建进程的方式:
- from multiprocessing import Process
- import time
- import random
- import os
- def piao(name):
- print("%s is piaoping"%name)
- time.sleep(random.randint(0,1))
- print("%s is piao end"%name)
- if __name__ == '__main__':
- print("CPU 的个数是:%d"%os.cpu_count())
- p1 = Process(target=piao,args=("alex",),name="进程 1")
- print(p1.name)
- p1.start()
- print("父进程!") #执行速度要远快于建立新进程的时间
打印结果:
CPU 的个数是: 2
进程 1
父进程!
- alex is piaoping
- alex is piao end
第二种创建进程的方式:
- from multiprocessing import Process
- import time
- import random
- # 继承 Process 类, 并实现自己的 run 方法
- class Piao(Process):
- def __init__(self,name):
- #必须调用父类的 init 方法
- super().__init__()
- self.name = name
- def run(self):
- print("%s is piaoing"%self.name)
- time.sleep(random.randint(1,3))
- print("%s is piaoeng"%self.name)
- if __name__ == '__main__':
- p1 = Piao("Alex")
- #开辟一个新的进程实际上就是执行本进程所对应的 run()方法
- p1.start()
- print("主进程!")
结果:
主进程!
- Alex is piaoing
- Alex is piaoeng
解析: join 括号中不携带参数, 表示父进程在这个位置要等待 p1 进程执行完成后, 如果指定参数, 也就是等待时间 s, 那么主进程将在这个时间内结束,
用 is_active() 方法即可检测进程的状态, 不加 join() 返回 True, 表示进程还在进行.
进程的方法,
start() 启动进程实例(创建子进程);
terminate(): 不管任务是否完成, 立即终止;
name: 当前进程实例别名, 默认为 Process-N,N 为从 1 开始递增的整数;
pid: 当前进程实例的 PID 值; os.getpid()
is_alive(): 判断进程实例是否还在执行;
join([timeout]): 是否等待进程实例执行结束, 或等待多少秒;
进程池:
在程序实际处理问题时, 忙时会有成千上万个任务需要执行, 闲时有零星任务, 创建时需要消耗时间, 销毁也需要时间,
即使开启成千上万个进程, 操作系统也不能 让他同时执行. 这里就用到了进程池, 用于管理小块内存的申请与释放.
,
1, 上代码:
- from multiprocessing.pool import Pool
- from time import sleep
- def fun(a):
- sleep(1)
- print(a)
- if __name__ == '__main__':
- p = Pool() # 这里不加参数, 但是进程池的默认大小, 等于电脑 CPU 的核数
- # 也是创建子进程的个数, 也是每次打印的数字的个数
- for i in range(10):
- p.apply_async(fun, args=(i,))
- p.close()
- p.join() # 等待所有子进程结束, 再往后执行
- print("end")
2,callback 举例:
- from multiprocessing import Process,Pool
- def func(i):
- i+=1
- return i# 普通进程处理过的数据返回给主进程 p1
- def call_back(p1):
- p1+=1
- print(p1)
- if __name__ == '__main__':
- p = Pool()
- for i in range(10):
- p1 = p.apply_async(func,args=(i,),callback = call_back)#p 调用普通进程并且接受其返回值, 将返回值给要执行的回调函数处理
- p.close()
- p.join()
解析: 1,p.apply ( func,args = ()) 同步的效率, 也就是说池中的进程一个一个的去执行任务
p.apply_async( func,args = () , callback = None) : 异步的效率, 也就是池中的进程一次性都去执行任务.
2, 异步处理任务时 : 必须要加上 close 和 join. 进程池的所有进程都是守护进程(主进程代码执行结束, 守护进程就结束).
3,func : 进程池中的进程执行的任务函数
4,args : 可迭代对象性的参数, 是传给任务函数的参数
5,callback : 回调函数, 就是每当进程池中有进程处理完任务了, 返回的结果可以交给回调函数,
由回调函数进行进一步处理, 回调函数只异步才有, 同步没有. 回调函数是父进程调用.
- 3. map( func,iterable) (该方法经常用到爬虫)
- from multiprocessing import Pool
- def func(num):
- num += 1
- print(num)
- return num
- if __name__ == '__main__':
- p = Pool(2)
- res = p.map(func,[i for i in range(100)])
- # p.close()#map 方法自带这两种功能
- # p.join()
- print('主进程中 map 的返回值',res)
func : 进程池中的进程执行的任务函数
iterable : 可迭代对象, 是把可迭代对象那个中的每个元素一次传给任务函数当参数.
map 方法自带 close 和 join
进程间的通信:
1)队列
- from multiprocessing import Queue,Process
- import os,time,random
- # 添加数据函数
- def proc_write(queue,urls):
- print("进程 (%s) 正在写入..."%(os.getpid()))
- for url in urls:
- queue.put(url)
- print("%s 被写入到队列中"%(url))
- time.sleep(random.random()*3)
- # 读取数据函数
- def proc_read(queue):
- print("进程 (%s) 正在读取..."%(os.getpid()))
- while True:
- url = queue.get()
- print("从队列中提取到:%s"%(url))
- if __name__ =="__main__":
- queue = Queue()
- proc_writer1 = Process(target=proc_write,args=(queue,["ur1","ur2","ur3","ur4"]))
- proc_writer2 = Process(target=proc_write,args=(queue,["ur5","ur6","ur7","ur8"]))
- proc_reader = Process(target=proc_read,args=(queue,))
- proc_writer1.start()
- proc_writer1.join()
- proc_writer2.start()
- proc_writer2.join()
- proc_reader.start()
- proc_reader.terminate()
生产者与消费者模式(线程间的通信):
- from queue import Queue
- import threading,time
- class Producer(threading.Thread):
- def run(self):
- global queue
- count = 0
- while True:
- if queue.qsize() <1000:
- for i in range(100):
- count = count +1
- msg = '生成产品'+str(count)
- queue.put(msg)
- print(msg)
- time.sleep(0.5)
- class Consumer(threading.Thread):
- def run(self):
- global queue
- while True:
- if queue.qsize()> 100:
- for i in range(3):
- msg = self.name + '消费了'+queue.get()
- print(msg)
- time.sleep(1)
- if __name__ == '__main__':
- queue = Queue()
- for i in range(500):
- queue.put('初始产品'+str(i))
- for i in range(2):
- p = Producer()
- p.start()
- for i in range(5):
- c = Consumer()
- c.start()
2) 进程间的通信(管道)
- from multiprocessing import Pipe,Process
- import random,time,os
- def proc_send(pipe,urls):
- for url in urls:
- print("进程 (%s) 发送:%s"%(os.getpid(),url))
- pipe.send(url)
- time.sleep(random.random())
- def proc_recv(pipe):
- while True:
- print("进程 (%s) 接收到:%s"%(os.getpid(),pipe.recv()))
- time.sleep(random.random())
- if __name__ == "__main__":
- pipe = Pipe()
- p1 = Process(target=proc_send,args=(pipe[0],["url_"+str(i) for i in range(10)],))
- p2 = Process(target=proc_recv,args=(pipe[1],))
- p1.start()
- p2.start()
- p1.join()
- p2.terminate()
解析:
pipe 用于两个进程间的通信, 两个进程分别位于管道的两端, Pipe 方法返回 (conn1,conn2) 代表一个管道的两端,
Pipe 方法有 dumplex 参数, 若该参数为 True, 管道为全双工模式,
若为 Fasle,conn1 只负责接收消息, conn2 只负责发送消息. send 和 recv 方法分别是发送和接收消息的方法
协程:
协程: 是更小的执行单位, 是一种轻量级的线程, 协程的切换只是单纯的操作 CPU 的上下文, 所以切换速度特别快, 且耗能小.
gevent 是第三方库, 通过 greenlet 实现协程, 其基本思想是:
当一个 greenlet 遇到 IO 操作时, 比如访问网络, 就自动切换到其他的 greenlet, 等到 IO 操作完成, 再在适当的时候切换回来继续执行. 由于 IO 操作非常耗时, 经常使程序处于等待状态, 有了 gevent 为我们自动切换协程, 就保证总有 greenlet 在运行, 而不是等待 IO.
由于切换是在 IO 操作时自动完成, 所以 gevent 需要修改 Python 自带的一些标准库, 这一过程在启动时通过 monkey patch 完成:
- from gevent import monkey
- monkey.patch_all() # 用来在运行时动态修改已有的代码, 而不需要修改原始代码.
- import gevent
- import requests
- def f(url):
- print('GET: %s' % url)
- HTML = requests.get(url).text
- print(url, len(HTML))
- gevent.joinall([
- gevent.spawn(f, 'http://i.maxthon.cn/'), # 先执行这个函数, 发送请求, 等待的时候发送第二个请求
- gevent.spawn(f, 'http://www.jianshu.com/u/3cfeb3395a95'),
- gevent.spawn(f, 'http://edu.51cto.com/?jydh')])
运行结果:
- GET: http://i.maxthon.cn/
- GET: http://www.jianshu.com/u/3cfeb3395a95
- GET: http://edu.51cto.com/?jydh
- http://i.maxthon.cn/ 461786
- http://edu.51cto.com/?jydh 353858
- http://www.jianshu.com/u/3cfeb3395a95 597
从结果看, 3 个网络操作是并发执行的, 而且结束顺序不同, 但只有一个线程.
使用 gevent, 可以获得极高的并发性能, 但 gevent 只能在 Unix/Linux 下运行, 在 Windows 下不保证正常安装和运行.
来源: https://www.cnblogs.com/lvye001/p/10232377.html