博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python 进程
阅读量:6885 次
发布时间:2019-06-27

本文共 11725 字,大约阅读时间需要 39 分钟。

进程 Process

  • 正在执行中的程序称为进程。进程的执行会占用内存等资源。多个进程同时执行时,每个进程的执行都需要由操作系统按一定的算法(RR调度、优先数调度算法等)分配内存空间

创建一个进程第一种方式

# process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。from multiprocessing import Processdef func(n):    # 子进程函数        # 获取当前进程id    print("当前进程id:", os.getpid())    # 获取父进程id    print("父进程id:", os.getppid())    for i in range(10):        time.sleep(2)        print("子进程", n)# 子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印print("-----------------")# Windows下写代码开启子进程时,必须写上if __name__ == ‘__main__’:if __name__ == "__main__":    # 首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程    # 将函数注册到一个进程中,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的。    p = Process(target=func, args=("传参",))  # args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号        # 告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,    # 而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。    p.start()  # start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。    p.json() # 等待子进程执行结束 主进程 才能继续往下执行    # 获取当前进程id    print("当前进程id:", os.getpid())    # 获取父进程id  Pycharm 进程的ID    print("父进程id Pycharm 进程的ID:", os.getppid())    # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步    for i in range(10):        time.sleep(1)        print("父进程")

process 类中的 参数

group   # 未使用 值始终为 Nonetarget  # 表示调用对象,即子进程要执行的任务args   #  表示调用对象的 参数元祖  args=(..., )kwargs  # 表示调用对象的字典 kwargs={"name":'张',}name  # 为子进程的名字 定义子进程的名字

进程对象的 方法

from multiprocessing import Processp = Process(target=func,)p.start()  # 启动进程  p.json()   # 等待子进程执行结束 主进程 才能继续往下执行p.Terminate()  # 关闭进程 不是自己关闭 而是 给操作系统 发送了一个关闭进程的信号p.is_alive()  # 查看进程是否还活着p.daemon = True  # 设置进程为守护进程 写在 start()之前 子进程会跟父进程一起结束p.name  # 进程的名字p.pid  # 进程的 id

创建进行的第二种方式:

  • 自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法
from multiprocessing import Processimport os# 自定义类名  继承Processclass Myprocess(Process):    def __init__(self, name):        # 调用父类的__init__()方法        super().__init__()        self.name = name    # 进程类 必须重写run方法    def run(self):        print("我是子进程的 id", os.getpid())        print(self.name)  # W 下必须写 __name__ == "__main__"if __name__ == "__main__":    # 实现 多进程类对象    p = Myprocess("张飞")    # 开启进程    #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法    p.start()    print('p1.name',p1.name)    print('p1.pid',p1.pid)        print('主进程结束')print("我是父进程", os.getpid())

验证进程间的空间隔离

import timefrom multiprocessing import Process# 进程之间是空间隔离的,不共享资源global_num = 100def func1():    global global_num    global_num = 0    print('子进程全局变量>>>', global_num)if __name__ == '__main__':    p1 = Process(target=func1, )    p1.start()    time.sleep(1)    print('主进程的全局变量>>>', global_num)

僵和孤儿进程

  • 进程结束后资源回收 主进程不会管子进程 自己结束
  • 使用 p.json() 主进程会等待子进程结束后 才结束
import timeimport osfrom multiprocessing import Processdef func1():    time.sleep(30)    print(os.getpid())    print('子进程')if __name__ == '__main__':    p1 = Process(target=func1,)    p1.start()    # p1.join()    # time.sleep(2)    # print(p1.pid)    print('主进程的ID',os.getpid())    print('主进程结束')

同步锁 Lock

  • 同步效率低,但是保证了数据安全 重点

  • 抢票案例

    import randomimport jsonimport timefrom multiprocessing import Process, Lockdef quang(i, lock):    print("等待抢票")    time.sleep(1)    lock.acquire()  # 锁头 只有一把      with open("aaa", "r") as f:        dic = json.load(f)        if dic["piao"] > 0:            time.sleep(random.random())            dic["piao"] -= 1            with open("aaa", "w") as f1:                json.dump(dic, f1)            print("%s强盗了" % i)        else:            print("%s没票了" % i)    lock.release()  # 还锁 if __name__ == "__main__":    lo = Lock()    for i in range(10):        p = Process(target=quang, args=(i, lo))        p.start()

信号量 Semaphore

  • 阿斯蒂芬

    import randomimport timefrom multiprocessing import Process, Semaphoredef dbj(i, s):    # 信息 入口    s.acquire()      print("%s号顾客来洗脚了" % i)    time.sleep(random.randrange(2, 7))    # 信息 出口    s.release()if __name__ == "__main__":    # 信息    # 创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.    s = Semaphore(5)    for i in range(20):        p = Process(target=dbj, args=(i, s))        p.start()

事件 Event

  • 通过事件来完成红路灯

    import randomimport timefrom multiprocessing import Process, Eventdef hld(e):    while 1:        print("红灯了!!")        time.sleep(3)        # 将事件状态改为 True        e.set()        print("绿灯了")        time.sleep(5)        # 将事件状态改为 False        e.clear()def car(i, e):    # e.is_set() 查看事件状态 True False    if e.is_set():        print("%s号车直接通过" % i)    else:        print("%s号车等红灯" % i)        # 等待 如果 状态为 True 时 向后执行        e.wait()        print("%s号车绿灯通过" % i)if __name__ == "__main__":    e = Event()    p1 = Process(target=hld, args=(e,))    p1.start()    for i in range(1000):        time.sleep(random.random())        p = Process(target=car, args=(i, e))        p.start()# 方法e.is_set()  # 查看事件状态(通过改变事件状态来控制事件其他进程的运行)e.set()  # 将事件 改为 Truee.clear()  # 将事件改为  Falsee.wait()  # 等待 如果 状态为 True 时 向后执行

进程间通信IPC

  • 水电费

    import timefrom multiprocessing import Process,Queuedef girl(q):    print('来自boy的信息',q.get())    print('来自校领导的凝视',q.get())def boy(q):    q.put('约吗')if __name__ == '__main__':    q = Queue(5)    boy_p = Process(target=boy,args=(q,))    girl_p = Process(target=girl,args=(q,))    boy_p.start()    girl_p.start()    time.sleep(1)    q.put('好好工作,别乱搞')

队列 Queue #重点

  • 先进先出

    import randomimport timefrom multiprocessing import Process, Event, Queueq = Queue(3)q.put(1)  # 将对象放入队列中  会有细微的 延迟q.put(2)print("队列是否已经满了", q.full())  # 队列是否已经满了q.put(3)print("队列是否已经满了", q.full())# q.put(3)print(q.get())  # 取数据print("队列是否空了", q.empty())  # 队列是否空了print(q.get())print(q.get())print("队列是否空了", q.empty())print(q.get())print(q.get(False)) # 判断队列是否已经空了  空了就报错  queue.Emptyq.qsize()   # 获取队列当前大小 就是已存在的数据个数  不可靠while 1:    try:        print(q.get(False))    except:        print("11111")        breakdef boy(q):    q.put("约吗")def girl(q):    while 1:        try:            print(q.get(False)) #  == q.get_nowait()  如果队列空则报错        except:            passif __name__ == "__main__":    q = Queue(5)    boy = Process(target=boy, args=(q,))    girl = Process(target=girl, args=(q,))    boy.start()    girl.start()    time.sleep(1)    q.put("好好学习")

生产者消费者模型

  • 解耦 缓冲 降低生产者与消费者之间的 耦合性

    import timefrom multiprocessing import Process, Queuedef producer(p):    for i in range(1, 11):        time.sleep(1)        print("生产了包子%s" % i)        # 将生产的包子添加到队列中        p.put("包子%s" % i)    # 生产结束后在队列末尾 添加一个空信号    p.put(None)def consumer(p,i):    while 1:        time.sleep(1.5)        # 循环 取出所有元素        pp = p.get()        if pp:            print("%s吃了" % i, pp)        else:            print("%s吃完了" % i)            # 将空信息还回去            p.put(None)            breakif __name__ == "__main__":    q = Queue(10)    # 创建    pro_p = Process(target=producer, args=(q,))    pro_p.start()    for i in range(2):        con_p = Process(target=consumer, args=(q,i))        con_p.start()    # p2.join()    # p.put(None)

JoinableQueue

  • JoinableQueue的生产者消费者模型

  • import timefrom multiprocessing import Process, JoinableQueue# 生产者def producer(q):    for i in range(10):        time.sleep(0.5)        # 创建10 个包子装进队列中        q.put("包子%s号" % i)        print("生产了 包子%s" % i)    # 等待队列中所有内容被取走后 关闭本进程    q.join()    print("包子卖完了")# 消费者def consumer(q):    while 1:        time.sleep(1)        # 循环吃包子        print("吃了 %s" % q.get())        # 每取 一个元素 就给 q.join 传递一个信号 记录取出个数        q.task_done() if __name__ == "__main__":    # 实现 JoinableQueue 队列 对象    q = JoinableQueue(20)    # 将 生产者加入进程    pro_p = Process(target=producer, args=(q,))    pro_p.start()    # 将 消费者 加入进程    con_p = Process(target=consumer, args=(q,))    # 将消费者设置成守护进程  随 住程序一起关闭    con_p.daemon = True    con_p.start()    # 等待进程 执行完闭 主程序才能关闭    pro_p.join()    print("关闭程序!")

管道 pipe

  • 管道是不安全的

  • from multiprocessing import Process, Pipe, Manager, Lock, Pooldef func(conn2):    try:        print(conn2.recv())        print(conn2.recv())    except EOFError:        print("管道已经关闭了")        conn2.close()if __name__ == '__main__':    try:        conn1, conn2 = Pipe()  # 在创建 Process 对象之前创建管道        p = Process(target=func, args=(conn2,))        p.start()        conn1.send("asdad")        conn1.close()        conn1.recv()        p.join()    except OSError:        print("管道关闭>>>>>>>>>>")# 方法 recv() 接收  send() 发送#- 管道默认是双工的  #  设置为 单工  参数:  duplex=False  改为单工  conn1 发送  conn2 接收  如果另一端已经关闭 则 recv() 接收会报错

数据共享 Manager

  • 多进程同时操作一个文件的数据 不加锁就会出现错误数据

  • 共享: 可以将一个数据传递到进程中 在不同的 作用于中 进程可对其进行修改

    import time, osfrom multiprocessing import Process, Manager, Lock'''资源共享'''def func(mm):    mm["name"] = "张飞"if __name__ == '__main__':    m = Manager()    mm = m.dict({"name": "aaaa"})    print(mm)    p = Process(target=func, args=(mm,))    p.start()    p.join()    print(mm)def func(m_d, ml):    with ml:        m_d["count"] -= 1if __name__ == '__main__':    m = Manager()    ml = Lock()    m_d = m.dict({"count": 100})    lis = []    for i in range(20):        p = Process(target=func, args=(m_d, ml))        p.start()        lis.append(p)    [i.join() for i in lis]    print(m_d)

进程池 Pool

  • import timefrom multiprocessing import Process,Pooldef func(n):    for i in range(5):        n = n + i    print(n)if __name__ == '__main__':    #验证一下传参    pool = Pool(4)    pool.map(func,range(100))  #map自带join功能,异步执行任务,参数是可迭代的    p_list = []    for i in range(200):        p1 = Process(target=func,args=(i,))        p1.start()        p_list.append(p1)    [p.join() for p in p_list]
  • def func(n):    print(n)    time.sleep(1)    return n * nif __name__ == '__main__':    pool = Pool(4)  # 进程池 的个数    lis = []    for i in range(10):        # res = p.apply(fun,args=(i,))  #同步执行的方法,他会等待你的任务的返回结果,        # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了。        # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务        # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束        # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。        ret = pool.apply_async(func, args=(i,))  # 异步执行的方法,他会等待你的任务的返回结果,        # print(ret.get())        lis.append(ret)    # print(lis)    pool.close()  # 不是关闭进程池,而是不允许再有其他任务来使用进程池    pool.join()  # 这是感知进程池中任务的方法,等待进程池的任务全部执行完  pool.ready()  # 如果调用完成 返回 True    pool.terminate()  # 立即终止所有进程    for i in lis:        print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get    print("主程序结束")

    map 传参

    import timefrom multiprocessing import Process,Pooldef func(n):    print(n)if __name__ == '__main__':    pool = Pool(4)    # pool.map(func,range(100)) #参数是可迭代的    pool.map(func,['sb',(1,2)]) #参数是可迭代的    # pool.map(func,range(100))  #map自带join功能,异步执行任务,参数是可迭代的

    回调函数 callback

    • 回调函数的形参只能有一个 如果执行函数有多个返回值 那么 回调函数 接收的是一个元祖 包含所有执行函数的返回值

      import time, osfrom multiprocessing import Process, Pooldef func1(n):    print(os.getpid())    n += 10    return ndef func2(nn):    # 回调函数 接收 func1 的返回值    print(os.getpid())    print(nn)    return 10if __name__ == '__main__':    pool = Pool()    print(os.getpid())    lis = []    c = pool.apply_async(func1, args=(1,), callback=func2)  # 将func1 反回的结果交给func2 执行    pool.close()  # 不是关闭进程池,而是不允许再有其他任务来使用进程池    pool.join()  # 这是感知进程池中任务的方法,等待进程池的任务全部执行完

多进程爬虫

from multiprocessing import Process, Poolfrom urllib.request import urlopenimport ssl, re# ⼲掉数字签名证书ssl._create_default_https_context = ssl._create_unverified_contexturls = [    'https://www.baidu.com',    'https://www.python.org',    'https://www.openstack.org',    'https://help.github.com/',    'http://www.sina.com.cn/']def func1(path):    # 打开网址    condent = urlopen(path)    # 返回网页源代码    return condent.read().decode("utf-8")def func2(content):    com = re.compile(r"
.*?)") cont = re.findall(com, content) print(cont)if __name__ == '__main__': pool = Pool() for path in urls: tar = pool.apply_async(func1, args=(path,), callback=func2) pool.close() pool.join()

转载于:https://www.cnblogs.com/zhang-zi-yi/p/10755810.html

你可能感兴趣的文章
powershell
查看>>
bzoj1925(SCOI2010)地精部落
查看>>
Yii PHP 框架分析 (一)
查看>>
Axure RP-->如何使生成的原型页面左侧不带框架(转)
查看>>
(1)安装----anaconda3下配置pyspark【单机】
查看>>
怎么提高点数据加载速度呢?
查看>>
CentOS 7 systemd的坑
查看>>
WPF学习笔记:获取ListBox的选中项
查看>>
FeatureLayer.MODE_SNAPSHOT限制数量问题
查看>>
Go数据结构之Queue
查看>>
例题7-6 UVa140 Bandwidth(枚举+剪枝)
查看>>
java中static关键字的作用
查看>>
使用UIPageControl UIScrollView制作APP引导界面
查看>>
PyQt4 ShowHMDB show sqlite3 with QTableWidget summary
查看>>
你所能用到的BMP格式介绍(一)
查看>>
题目1489:计算两个矩阵的乘积
查看>>
raid
查看>>
LoadRunner学习第三天
查看>>
[转]缺少 ; (在标识符 PhysicalMediumType 的前面)
查看>>
redis
查看>>