发布于2019-08-07 09:54 阅读(1010) 评论(0) 点赞(5) 收藏(1)
生产者: 泛指产生数据的一方
消费者: 泛指处理数据的一方
他们之间有什么问题
效率低,因为双方的处理速度不同,一个快一个慢,则双方需要相互等待
具体的解决方法:
from multiprocessing import Process,Queue
import requests
import re
import time,random
# 生产者任务
def product(urls,q):
count = 0
for url in urls:
response = requests.get(url)
data = response.text
time.sleep(random.random())
q.put(data)
count +=1
print(f'生产了{count}个数据')
# 消费者任务
def customer(q):
i = 0
while True:
data = q.get()
res = re.findall('src=//(.*?) width',data)
time.sleep(random.random())
i += 1
print(f'第{i}个任务获取{len(res)}个img')
if __name__ == '__main__':
urls = [
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com',
'http://www.baidu.com'
]
# 创建一个队列
q = Queue()
# 创建生产者
p = Process(target=product,args=(urls,q))
p.start()
# 创建消费者
c = Process(target=customer,args=(q,))
c.start()
这个产生一个问题:
消费者不知道生产者什么时候能结束,所以消费者进程一直在等待,导致程序不能结束
思路
joinableQueue类
继承自Queue类,使用方法一样,并且增加了join方法和taskDone方法
join是个阻塞函数,会阻塞直到taskdone的调用次数等于存入的元素个数,可以用于表示队列任务处理完成.
from multiprocessing import Process,JoinableQueue
import time,random
def product(q,name):
for i in range(10):
hot_dog =f'{name}的{i+1}个热狗'
time.sleep(random.random())
print(f'{name}生产了第{hot_dog}个热狗')
q.put(hot_dog)
def customs(q):
while True:
dog = q.get()
time.sleep(random.random())
print(f'消费了{dog}')
q.task_done() # 标记这个任务处理完成,加一次计数
if __name__ == '__main__':
# 如何判定今天的热狗真的吃完了
# 1.确定生成者任务完成
# 2.确定生出来的数据已经全部处理完成
# 创建一个容器
q = JoinableQueue()
# 创建生产者
p1 = Process(target=product,args=(q,'上海分店'))
p2 = Process(target=product,args=(q,'北京分店'))
p1.start()
p2.start()
# 创建消费者
c = Process(target=customs,args=(q,))
c.start()
p1.join()
p2.join() # 表示生产完成
q.join() # 意味着队列中的任务都处理完成了
c.terminate() # 直接终止消费者进程
线程是操作系统可以运算调度的最小单位,是真正的执行单位,其包含在进程中,一个线程就是一条固定的流程控制,一个进程可以包含多个线程,同一进程中的线程共享进程内的资源.
特点:系统会为每一个进程自动创建一条线程,称之为主线程,后续通过代码开启的线程称之为子线程.
直接实例化Thread类
from threading import Thread
def task():
print('zi over')
#与进程不同之处,不要加main判断,开启线程的代码放哪里都可以
t = Thread(target=task)
t.start()
print('zhu over')
zi over
zhu over # 从这里看出来,线程开启的速度并代码执行速度更快
继承Thread类
from threading import Thread
class MyThread(Thread):
def run(self):
print('zi run')
m = MyThread()
m.start()
print('zhu over')
只要并发访问了同一资源一定会产生安全问题,解决方案和多进程一致,就是给操作公共资源代码加锁
from threading import Thread,Lock
import time
a = 10
l = Lock()
def task():
global a
l.acquire()
temp = a
time.sleep(0.5)
a = temp - 1
l.release()
ts = []
for i in range(10):
t = Thread(target=task)
t.start()
ts.append(t)
for t in ts:
t.join()
print(a) # 如果不加锁,那么a就等于9
0
一个线程a设置为b的守护进程,a会随着b的结束而结束
默认情况下,主线程即使代码执行完毕,也会等待所有非守护线程完毕后程序才能结束,因为多个线程之间是协作关系
from threading import Thread
import time
def task():
print('z1 start')
time.sleep(4)
print('z1 over')
def task2():
print('z2 start')
time.sleep(2)
print('z2 over')
print('zhu start')
t1 = Thread(target=task)
t1.daemon = True
t1.start()
t2 = Thread(target=task2)
t2.start()
print('zhu over')
# t1还没有结束就退出,因为t1是守护线程.
zhu start
z1 start
z2 start
zhu over
z2 over
# t = Thread()
# t.start()
# t.join()
# t.is_alive()
# t.isAlive()
# t.ident # 线程标识符 id
# t.daemon
active_count()
enumerate()
current_thread()
作者:雷神归来
链接:https://www.pythonheidong.com/blog/article/9556/50f2bd4da37397813425/
来源:python黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 python黑洞网 All Rights Reserved 版权所有,并保留所有权利。 京ICP备18063182号-1
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!