发布于2019-08-07 11:03 阅读(1093) 评论(0) 点赞(3) 收藏(5)
进程是cpu资源分配的最小单位,进程是正在进行的一个过程或者说一个任务。
线程是cpu调度的最小单位。
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。(单线程下的并发)
进程线程区别:
子进程结束后,子进程的状态信息仍然保存在系统中,直到父进程结束。(所有进程都会经历僵尸进程)
父进程已经退出,但它的一个或多个子进程还在运行, 这些进程就会成为孤儿进程,由init进程收养。
守护进程会在主进程代码执行结束后就终止。
方式一:
import time
from multiprocessing import Process
def task(name): # 子进程执行的任务
print('%s is running' % name)
time.sleep(3)
print('%s is done' % name)
if __name__ == '__main__':
# 实例化得到对象
p1 = Process(target=task, args=('子进程1',)) # 必须加逗号
p2 = Process(target=task, kwargs={'name': '子进程2'})
# 调用对象下的方法,开启进程
p1.start() # 仅是给操作系统发送信号, 由操作系统开启子进程
p2.start()
print('主')
方式二:
import time
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self): # 实际上是start,但是必须叫run
print('%s is running' % self.name)
time.sleep(3)
print('%s is done' % self.name)
if __name__ == '__main__':
p1 = MyProcess('子进程1')
p2 = MyProcess('子进程2')
p1.start() # 启动进程,并调用该子进程中的p.run()
p2.start()
print('主')
进程名:print(p.name)
进程PID: print(os.getpid)
print(p.pid)
父进程PID:print(os.getppid)
等待进程运行结束:p.join()
查看进程是否在运行:print(p.is_alive())
终止进程:p.terminate()
进程结束时间由操作系统决定。
终止进程但不进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程;如果p还保存了一个锁那么也将不会被释放,进而导致死锁。
开启守护进程:p.daemon = True
必须在p.start(
)之前设置,守护进程将被禁止创建子进程。
例子:
from multiprocessing import Process, Lock
import time
def task(name, lock):
lock.acquire()
print('%s 1' % name)
time.sleep(1)
print('%s 2' % name)
time.sleep(1)
print('%s 3' % name)
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p = Process(target=task, args=('进程%s' % i, lock))
p.start()
例子:
from multiprocessing import Queue
que = Queue(3)
que.put(1)
que.put({'a': 'hello'})
que.put([1, 2, 3])
print(que.full()) # 判断队列是否已满
# que.put(4) #再放就阻塞住了
print(que.get())
print(que.get())
print(que.get())
print(que.empty()) # 队列是否空了
# print(que.get()) # 再取就阻塞住了
例子:
from multiprocessing import Process, JoinableQueue
import time
import random
def producer(q, name, food):
for i in range(2):
res = '%s%s' % (food, i)
time.sleep(random.randint(1, 3))
print('\033[34m%s 生产了 %s\033[0m' % (name, res))
q.put(res) # 入队
q.join() # 等到消费者把队列中的所有的数据都取走之后,生产者才结束
def consumer(q, name):
while True:
res = q.get() # 出队
time.sleep(random.randint(1, 3))
print('\033[32m%s 吃 %s\033[0m' % (name, res))
q.task_done() # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了
if __name__ == '__main__':
que = JoinableQueue()
# 3个生产者
p1 = Process(target=producer, args=(que, '生产者1', '包子'))
p2 = Process(target=producer, args=(que, '生产者2', '饺子'))
p3 = Process(target=producer, args=(que, '生产者3', '月饼'))
# 2个消费者
c1 = Process(target=consumer, args=(que, '消费者1'))
c2 = Process(target=consumer, args=(que, '消费者2'))
c1.daemon = True # 将消费者设为守护进程
c2.daemon = True # 消费者在生产者结束后, 随主进程一起结束
l1 = [p1, p2, p3]
l2 = [c1, c2]
# 开始生产
for p in l1:
p.start()
# 开始消费
for c in l2:
c.start()
# 主进程等生产者p1、p2、p3结束
# 而p1、p2、p3在消费者把所有数据都取干净之后结束
for p in l1:
p.join()
print('主')
使用greenlet
库:
from greenlet import greenlet
def eat(name):
print('%s eat 1' % name)
g2.switch('egon') # 传入第一次参数, 之后不用再传
print('%s eat 2' % name)
g2.switch()
def play(name):
print('%s play 1' % name)
g1.switch()
print('%s play 2' % name)
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch('egon') # 在第一次switch时传入参数,以后都不需要
使用gevent
库 (无法识别socket
time
等模块的阻塞,需要使用gevent自带的阻塞):
import gevent
def eat(name):
print('%s eat 1' % name)
gevent.sleep(2) # gevent识别到阻塞,进行切换
print('%s eat 2' % name)
def play(name):
print('%s play 1' %name)
gevent.sleep(1)
print('%s play 2' %name)
g1 = gevent.spawn(eat, 'egon')
g2 = gevent.spawn(play, name='egon')
g1.join()
g2.join()
# gevent.joinall([g1,g2]) # 等待列表中所有协程对象结束
print('主')
导入monkey
可识别socket
time
等阻塞:
from gevent import monkey; monkey.patch_all()
# patch_all()必须放在导入socket、time等模块前,否则gevent无法识别socket、time的阻塞
import gevent
import time
def eat():
print('eat food 1')
time.sleep(2)
print('eat food 2')
def play():
print('play 1')
time.sleep(1)
print('play 2')
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1, g2])
print('主')
导入模块:
from concurrent.futures import ProcessPoolExecutor # 进程池
from concurrent.futures import ThreadPoolExecutor # 线程池
创建进程/线程池:
executor = ProcessPoolExecutor(max_worker=3) # 进程池
executor = ThreadPoolExecutor(max_workers=3) #线程池
将进程/线程放入池内:
future = exector.submit(task, parm)
"""
executor.map(task, range(1,12))
相当于:
for i in range(11):
exector.submit(task, i)
"""
关闭进程/线程池:
exector.shutdown() # 默认wait参数为True
wait=True
等待池内所有任务执行完毕回收完资源后再执行后续代码。
wait=False
不join,直接执行后续代码。
关闭进程/线程池后,不允许再向已关闭的进程/线程池内加入进程/线程。
拿到进程/线程运行结果:
print(future.result())
回调函数:
future.add_done_callback(func)
: future
的task
结束后,会自动把future对象当做参数传给回调函数func。
例子:
from concurrent.futures import ProcessPoolExecutor
import time
import random
def task(name):
print("%s is running" % name)
time.sleep(random.randint(3, 5))
return name
def func(future):
name = future.result()
print("%s's callback function" % name)
if __name__ == '__main__':
executor = ProcessPoolExecutor(5)
futures = []
for i in range(3):
future = executor.submit(task, "task"+str(i))
future.add_done_callback(func)
futures.append(future)
executor.shutdown(True)
print("主")
阻塞程度:阻塞IO>非阻塞IO>多路转接IO>信号驱动IO>异步IO,效率是由低到高。
信号量也是一把锁,但同一时间可以被指定大小的任务获取。
模块导入:
from threading import Semaphore
设置计数器大小:
sm = Semaphore(value=1) # 计数器大小默认为1
两个主要的方法:
acquire() # 内置计数器-1, 当计数器为0时阻塞,等待其他线程调用release()
release() # 内置计数器+1
例子:
from threading import Thread, Semaphore
import threading
import time
def func():
sm.acquire()
print('%s get sm' % threading.current_thread().getName())
time.sleep(3)
sm.release()
if __name__ == '__main__':
sm = Semaphore(5)
for _ in range(23):
t = Thread(target=func)
t.start()
Event对象:
用于线程间通信,即程序中的其一个线程需要通过判断某个线程的状态来确定自己下一步的操作,就用到了Event对象。
模块导入:
from threading import Event
创建Event对象:
event = Event()
相关方法:
event.isSet() # 返回event的状态值
event.wait([maxtime]) # 如果 event.isSet() == False将阻塞线程, [maxtime]-超时时间
event.set() # 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度
event.clear() # 恢复event的状态值为False
例子:
from threading import Thread, Event
import threading
import time
import random
def conn_mysql():
count = 1
while not event.is_set():
if count > 3:
print('\033[31m[%s]连接失败\033[0m' % threading.current_thread().getName())
exit(0)
print('<%s>第%s次尝试连接' % (threading.current_thread().getName(), count))
event.wait(1)
count += 1
print('<%s>连接成功' % threading.current_thread().getName())
def check_mysql():
print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(1, 4))
event.set()
if __name__ == '__main__':
event = Event()
conn1 = Thread(target=conn_mysql)
conn2 = Thread(target=conn_mysql)
check = Thread(target=check_mysql)
conn1.start()
conn2.start()
check.start()
定时器Timer:指定n秒后执行某项操作(不会像sleep一样阻塞)
例子:
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello) // 一秒后执行hello函数
t.start()
死锁现象: 指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,这种永远在互相等待的进程称为死锁进程。
递归锁: RLock
可以连续acquire多次。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
例子:
from threading import Thread, RLock
import time
mutexA = mutexB = RLock()
class MyThread(Thread):
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire()
print('\033[41m%s 拿到A锁\033[0m' % self.name)
mutexB.acquire()
print('\033[42m%s 拿到B锁\033[0m' % self.name)
mutexB.release()
print('\033[41m%s 释放A锁\033[0m' % self.name)
mutexA.release()
print('\033[42m%s 释放B锁\033[0m' % self.name)
def func2(self):
mutexB.acquire()
print('\033[43m%s 拿到B锁\033[0m' % self.name)
time.sleep(2)
mutexA.acquire()
print('\033[44m%s 拿到A锁\033[0m' % self.name)
mutexA.release()
print('\033[43m%s 释放B锁\033[0m' % self.name)
mutexB.release()
print('\033[44m%s 释放A锁\033[0m' % self.name)
if __name__ == '__main__':
for _ in range(4):
t = MyThread()
t.start()
cpython中引进GIL,保证同一时刻同一进程中只有一个线程被执行,获取锁并获取资源,避免了多线程并发执行,保证了线程的安全,但无法使用多核优势。
结论:
多线程用于IO密集型
多进程用于计算密集型
作者:我是女王
链接:https://www.pythonheidong.com/blog/article/10145/9607f2bec6a9f0d926dd/
来源:python黑洞网
任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任
昵称:
评论内容:(最多支持255个字符)
---无人问津也好,技不如人也罢,你都要试着安静下来,去做自己该做的事,而不是让内心的烦躁、焦虑,坏掉你本来就不多的热情和定力
Copyright © 2018-2021 python黑洞网 All Rights Reserved 版权所有,并保留所有权利。 京ICP备18063182号-1
投诉与举报,广告合作请联系vgs_info@163.com或QQ3083709327
免责声明:网站文章均由用户上传,仅供读者学习交流使用,禁止用做商业用途。若文章涉及色情,反动,侵权等违法信息,请向我们举报,一经核实我们会立即删除!