多进程queue·pipe·manager·进程锁

Queue (队列)
作用
1:解耦,使程序实现松耦合。
2:提高运行效率
相当于一个有顺序的容器。
数据只有一份,取出就没了。

判断队列为空的方法汇总

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#  Author: Diedline
import queue
q = queue.Queue()
q.put("d1")
q.put("d2")
q.put("d3")
print(q.qsize())
print(q.get())
print(q.get())
print(q.get())
"""
这些方法都可以用来判断队列是否为空了
print(q.get()) #使用get 获取空队列时会卡死
print(q.get_nowait()) #抛出queue.Empty的异常 用try 获取
print(q.qsize()) # 可以 当q,size等于0 时放弃继续取值
print(q.get(block= False)) #同样抛出 queue.Empty的异常
print(q.get(timeout = 1)) #等待一秒结束卡死状态 抛出 queue.Empty的异常
"""

后进先出队列
q = queue.LifoQueue()

1
2
3
4
5
6
7
8
9
#  Author: Diedline
import queue
q = queue.LifoQueue()
q.put("d1")
q.put("d2")
q.put("d3")
print(q.get())
print(q.get())
print(q.get())

能设定优先级的队列

1
2
3
4
5
6
7
8
9
10
11
#  Author: Diedline
import queue
q= queue.PriorityQueue()
q.put((10,"wch"))
q.put((2,"hxd"))
q.put((4,"111"))
q.put((20,"lss"))
print(q.get())
print(q.get())
print(q.get())
print(q.get())

生产消费模型:
在并发编程中通过生产者消费者模型解决大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体数据的处理速度
生产者消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#  Author: Diedline
import threading
import time
import queue
q = queue.Queue(maxsize=10)
def Producer(name):
count =1
while True:
q.put("骨头%s"%count)
print("生产了骨头%s"%count)
count+=1
time.sleep(0.4)
def Comsumer(name):
while True:
print("%s 取到了%s并吃了它"%(name,q.get()))
time.sleep(1)
p = threading.Thread(target=Producer,args=("王某某",))
c = threading.Thread(target=Comsumer,args=("李某某",))
wh = threading.Thread(target=Comsumer,args=("湖某某",))
p.start()
c.start()
wh.start()

Io 操作不占用cpu
计算占用cpu
上下文切换也需要消耗资源,所以大量的计算如果使用多线程可能不一定快,不一定会比单线程快。所以python的多线程不适合cpu密集操作型的任务,适合io密集型的任务。
两个进程都是独立的,无法互相访问,必须找一个中级件。

两个进程数据的传递实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#  Author: Diedline

from multiprocessing import Process, Pipe


def f(conn):
conn.send([42, None, 'hello from child'])
conn.send([42, None, 'hello from child2'])
print("from parent:",conn.recv())
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
print(parent_conn.recv()) # prints "[42, None, 'hello']"
parent_conn.send("张洋可好") # prints "[42, None, 'hello']"
p.join()

实现进程真正的数据共享,manager已经帮你加锁了,无法两个进程同时修改一个数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#  Author: Diedline
__author__ = "Alex Li"

from multiprocessing import Process, Manager
import os
def f(d, l):
d[os.getpid()] =os.getpid()
l.append(os.getpid())
print(l)

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() #{} #生成一个字典,可在多个进程间共享和传递

l = manager.list(range(5))#生成一个列表,可在多个进程间共享和传递
p_list = []
for i in range(10):
p = Process(target=f, args=(d, l))
p.start()
p_list.append(p)
for res in p_list: #等待结果
res.join()

print(d)
print(l)

name =’main’ 作用
意思就是说让你写的脚本模块既可以导入到别的模块中用,另外该模块自己也可执行。就是导入到其他模块中的时候,
不会执行 if name =’main’:下面的内容
进程池中有两个方法
apply 同步执行
apply_async 并行

callback参数 回调即执行完前面的进程之后执行callback中的参数 注意回调是主进程的回调,不是子进程。
进程里也有锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#  Author: Diedline
from multiprocessing import Process, Pool,freeze_support
import time
import os

def Foo(i):
time.sleep(2)
print("in process",os.getpid())
return i + 100

def Bar(arg):
print('-->exec done:', arg,os.getpid())

if __name__ == '__main__':
#freeze_support()
pool = Pool(processes=3) #允许进程池同时放入5个进程
print("主进程",os.getpid())
for i in range(10):
pool.apply_async(func=Foo, args=(i,), callback=Bar) #callback=回调
#pool.apply(func=Foo, args=(i,)) #串行
#pool.apply_async(func=Foo, args=(i,)) #串行
print('end')
pool.close()
pool.join() #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。.join()