进程和线程
什么是线程 (thread) 什么是进程
线程: 操作系统能够进行运算调度的最小单位. 它被包含在进程中, 是进程中的实际运作单位. 是一串指令的集合
一个线程指的是进程中一个单一顺序的控制流, 一个进程是中可以并发多个线程, 每个线程并行执行不同的任务
进程: 以一个整体的形式暴露给操作系统管理, 里面包含对各种资源的调用, 内存的管理, 网络接口的调用等, 对各种资源管理的集合, 就可以称为进程
进程要操作 cpu, 必须先创建一个线程, 所有在同一个进程里的线程是共享同一块内存空间的
注意: 1. 进程本身不能够执行
2. 进程和线程不能比较谁快谁慢, 两个没有可比性, 进程是资源的集合, 线程是真正执行任务的, 进程要执行任务也要通过线程
3. 启动一个线程比启动一个进程快
进程和线程的区别
1. 线程共享内存空间, 进程的内存是独立的
2.thread have direct access to data segment of its process;process have their own copy of the data segment of the parent process
3. 同一个进程之间线程可以互相交流, 两个进程想要通信, 必须通过一个中间代理来实现
4. 创建一个线程很简单, 创建一个新进程需要对其父进程做一次克隆
5. 一个线程可以控制和操作同一个进程里的其他的线程, 但是进程只能操作子进程
6. 改变一个主线程可能会影响到其他线程的运行, 对父进程的修改不会影响到子进程
Python GIL(全局解释器锁)
在 python 中无论你启用多少个线程, 你有多少个 cpu, 在 python 执行的时候都会在同一时刻只准许一个线程运行
原因: python 的线程是调用操作系统的原生线程
线程锁:
import threading
import time
def run(n):
lock.acquire()
global num
num += 1
#time.sleep(1) 在上锁的情况不要使用 sleep, 不然会等 50s 才会完成
lock.release()
lock = threading.Lock()
num = 0
start_time = time.time()
t_objs = []
for i in range(50):
t = threading.Thread(target = run,args = ("t-%s" %i,))
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
print("-----all threads has finished....",threading.current_thread(),threading.active_count())
print("num =",num)
给线程上锁, 使程序变成串行, 保证 num 在 + 1 的时候能够准确无误, 不然可能会导致, 一个线程正在执行 + 1 还没有结束, 另外一个线程也开始 + 1, 最后达不到准确结果
递归锁: 在一个大锁中包含一个小锁
场景: 学校放学, 学生离校了, 突然发现自己的文具盒落在了教师中, 学校每次只准许一个人进学校(其他东西丢了就是这个人干的), 进教室拿的时候也要开教室门(防止有其他人在学校没有走), 最后拿到文具盒, 离开学校. 这个期间学校大门相当于一把锁, 教室的门相当于另一把锁. 保证了整个过程没有其他人的干扰
import threading,time
def run1():
print("grab the first part data")
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("grab the second part data")
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
lock.acquire()
res = run1()
print('------between run1 and run2')
res2 = run2()
lock.release()
print(res,res2)
num,num2 = 0,0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print("----all threads done---")
print(num,num2)
相当于先进入 run3 这个大门, 然后在进入 run1 和 run2 这个两个小门, 然后进行 num1 和 num2 的加 1, 保证了每个线程的执行都是串行的
Python threading 模块
线程有 2 中调用方式
直接调用:
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
t1 = threading.Thread(target = run,args = ("t1",))
t2 = threading.Thread(target = run,args = ("t2",))
t1.start()
t2.start()
运行结果: 一下子出来两个结果, 但是程序还会等待 2s 才会结束, 一共两秒, 因为他们是并行的
继承式调用:
import threading
import time
#用类的方式启动线程
class MyThread(threading.Thread):
def __init__(self,n):
super(MyThread,self).__init__()
self.n = n
def run(self):
print("running task",self.n)
t1 = MyThread("t1")
t2 = MyThread("t2")
t1.start()
t2.start()
上面只启动了 2 个线程, 我们下面启动 50 个线程:
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
start_time = time.time()
for i in range(50):
t = threading.Thread(target = run,args = ("t-%s" %i,))
t.start()
print("cost :",time.time()-start_time)
运行结果:
问题: 创建了 50 个进程之后只用了这点时间, 主程序没有等其他的子线程就往下走
原因: 是多线程的原因, 一个程序至少有一个线程, 程序本身就是一个线程, 主线程, 主线程启动了子线程之后, 子线程就和主线程没有关系了, 两个互不影响
解决方法: join() 方法, 等子线程运行完成之后, 主程序才往下走
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task has done....",n)
start_time = time.time()
t_objs = []
for i in range(50):
t = threading.Thread(target = run,args = ("t-%s" %i,))
t.start()
t_objs.append(t)
for t in t_objs:
t.join()
print("------all threads has finished")
print("cost :",time.time()-start_time)
先自己创建一个临时列表, 然后存储创建的线程, 然后一个一个的用 join() 方法
创建了 50 个线程, 主线程就是程序的本身, 也就是说上面一共有 51 个线程
#threading.current_thread() 显示当前线程是主线程还是子线程
#threading.active_count() 显示当前程序运行的线程的个数
print("-----all threads has finished....",threading.current_thread(),threading.active_count())
守护进程
t.setDaemon(True) # 把当前线程设置成守护线程, 要在 start() 之前
非守护线程结束, 所有守护线程就退出
import threading
import time
def run(n):
print("task",n)
time.sleep(2)
print("task has done....",n)
start_time = time.time()
t_objs = []
for i in range(50):
t = threading.Thread(target = run,args = ("t-%s" %i,))
t.setDaemon(True)# 把当前线程设置成守护线程, 要在 start() 之前
t.start()
t_objs.append(t)
print("-----all threads has finished....",threading.current_thread(),threading.active_count())
print("cost :",time.time()-start_time)
运行结果: task has done 都没有运行, 主线程结束之后, 所有子线程都强制结束
Semaphore(信号量)
互斥锁同时只允许一个线程更改数据, 而 Semaphore 是同时允许一定数量的线程更改数据, 如有 3 个座位, 最多允许 3 个人同时在座位上, 后面的人只能等到有人起来才能够坐进去
import threading,time
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread:%s\n" %n)
semaphore.release()
if __name__ == '__main__':
10 semaphore = threading.BoundedSemaphore(5)# 最多允许 5 个线程同时运行
for i in range(20):
t = threading.Thread(target = run,args=(i,))
t.start()
while threading.active_count() != 1:
pass#print(threading.active_count())
else:
print("----all threads done---")
21 #一次性执行了 5 个线程
和线程锁差不多, 都要 acquire() 和 release(), 区别就是信号量有多把锁,
threading.BoundedSemaphore(5) 最多允许 5 个线程同时运行, 那运行结果就是 5 个结果一起出来, 但是实质是这 5 个要是有 3 个先完成就会立刻再送进去 3 个, 它不会等 5 个都完成, 它是每出来一个就放进去一个
这 5 个如果同时改数据就有可能改错, 这个主要用于: 链接池 , 线程池. mysql 链接有链接池的概念, 同一时刻最多有几个并发; socketserver, 为了保证系统不会被太多线程拉慢, 可以用信号量弄同一时刻最多有 100 个链接进来
事件(event)
a events is a simple synchronization object;
the event represents an internal flag,and threads can wait for the flag to be set,or set or clear the flag themselves
事件是一个简单的同步对象, 事件就相当于设置一个全部变量, 然后线程不断的检测这个变量的变化然后进行不同的操作
方法:
event = threading.Event() 生成一个 event 对象
event.set()
event.clear()
event.wait()
if the flag is set,the wait method doesn't do anything
标志位设定, 代表绿灯, 直接通行
if the flag is cleared,wait will block until it becomes set again
标志位被清空, 代表红灯, wait 等待变绿灯
any number of threads my wait for the same event
红绿灯案例:
import threading
import time
event = threading.Event()
def lighter():
count = 0
event.set()# 一开始先设成绿灯
while True:
if count > 5 and count <=10 :# 改成红灯
event.clear()# 清空标志位
print("\033[41;1mred light is on ...\033[0m")
elif count > 10:
event.set()# 变绿灯
print("\033[42;1mgreen light is on ...\033[0m")
count = 0
else:
print("\033[42;1mgreen light is on ...\033[0m")
time.sleep(1)
count += 1
def car(name):
while True:
if event.is_set():# 代表绿灯
print("[%s] running..." %name)
time.sleep(1)
else:
print("[%s] sees red light,waiting..." %name)
event.wait()
print("\033[34;1m[%s] green light is on,start going ...\033[0m" %name)
light = threading.Thread(target = lighter,)
light.start()
car1 = threading.Thread(target = car,args=("Tesla",))
car1.start()
运行结果:
queue 队列
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads
class queue.Queue(maxsize = 0) 先入先出
class queue.LifeQuene(maxsize = 0)last in first out
class queue.PriorityQueue(maxsize = 0) 存储数据时可设置优先级的队列
队列就是一个有顺序的容器
列表和队列的区别: 列表取出数据后, 数据还在列表中, 相当如复制数据; 队列的数据只有一份, 取走就没有了
class queue.Queue:
利用 put(),get() 方法往队列里加减数据, 用 qsize() 方法得到此刻队列中还有多少数据.
但是有一个问题: 当数据都取出来之后, 如果还用 get() 方法去取数据的话, 程序就会卡主
可以用 get_nowait() 方法, 如果 queue 中没有了数据, 就会抛出一个异常, 这样就不会被卡住, 同时也可以用 qsize() 方法进行判断, 如果没有了数据就不要取了
Queue.get(block = True,timeout=None) block 参数, 如果取不到数据, 默认就会卡住, 改成 false 就不会卡住; timeout 设置卡住几秒
class queue.LifoQueue(maxsize = 0)#last in first out
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
运行结果:
class queue.PriorityQueue(maxsize = 0)# 存储数据室可以设置优先级队列
import queue
q = queue.PriorityQueue()
q.put((10,"d1"))
q.put((-1,"d2"))
q.put((3,"d4"))
q.put((6,"d5"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())
运行结果:
生产者消费者模型:
在并发编程中使用生产者和消费者模式能够解决大多数并发问题, 该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度
例子:
import threading,time
import queue
q = queue.Queue(maxsize = 10)
def Producer(name):
count = 1
while True:
q.put("骨头:%s" %count)
print("生产了骨头:",count)
count += 1
time.sleep(0.5)
def Consumer(name):
while True:
if q.qsize() > 0:
print("[%s] 取到 [%s], 并且吃到了它..." %(name,q.get()))
time.sleep(1)
p = threading.Thread(target = Producer,args=("xiaoming",))
c = threading.Thread(target = Consumer,args=("xiaohong",))
c1 = threading.Thread(target = Consumer,args=("liangliang",))
p.start()
c.start()
c1.start()
运行结果:
来源: https://www.cnblogs.com/charles8866/p/8402291.html