python-进程

并发: 任务数大于cpu个数

并行: cpu个数和任务数相同

GIL 锁: 任何python进程中,一次永远只有一个线程运行

一个python进程 只能执行一个线程

创建进程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
## 不同操作系统创建进程的区别:

#linux上:   fork() 
import os
import time

pid = os.fork() # 这个地方会创建一个子进程, 他的pid号值永远为0
if pid == 0:
    print("我是子进程")
    time.sleep(3)
else:
    print("我是父进程")
    time.sleep(3)




# win上创建进程 类似于导入机制   都通用的
import multiprocessing

def func():
    print("000")

if __name__ == '__main__':
    p = multiprocessing.Process(target=func)  # 生成进程, 传参target= ,args=
    p.start()       # 开启进程, 相当于一个子进程
进程标识
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 进程标识  pid

import time
import os
import multiprocessing

def func():
    time.sleep(5)

if __name__ == "__main__":
    print("main pid ", os.getpid())   # 当前进程的
	print(multiprocessing.current_process().pid)
    p = multiprocessing.Process(target=func)
    print(p.pid)
    p.start()     # start之后才有pid号
    print(p.pid)
    time.sleep(10)
# 线程的标识是 ident


# 操作系统调用的是进程
"""
注意!
操作系统并不能看到线程的标识。
因为,线程是由Python解释器
来负责调度的。

操作系统仅需要调度进程就行了
"""
守护进程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 守护进程  daemon = True  主进程结束之后,子进程跟着结束(某子进程的生命周期随着主进程)
import time
import multiprocessing

def func():
    time.sleep(20)
    print("子进程结束")

if __name__ == "__main__":
    p = multiprocessing.Process(target=func, daemon = True)
    p.daemon = True   # 设置成守护进程 p True会随着主进程结束而结束, 主进程不会等待子进程结束
    p.start()
    time.sleep(3)
终止进程
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# 终止进程
import multiprocessing
import time

def func():
    time.sleep(5)
    print(multiprocessing.current_process())


if __name__ == "__main__":
    p = multiprocessing.Process(target=func)
    p.start()
    time.sleep(2) # 主进程2s
    p.terminate()   # 结束,主进程结束,不管子进程有没有结束,就终止子进程(线程没有这个)
面向对象
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 面向对象 类继承创建进程
# start() --> run(已经是在新的进程了) --> target   # target是由默认的run运行
import multiprocessing
import time

class My_Process(multiprocessing.Process):

    def __init__(self, *args, **kwargs):
        super().__init__(args, kwargs)
        print("初始化...")

    def run(self):
        """
        start 默认调用的方法   重写啦在这里
        :return:
        """
        print("run...")
        self.task()
        time.sleep(5)

    def task(self):
        print("task...")


if __name__ == "__main__":
    p = My_Process()
    p.start()   # start 方法会调用run

# 创建进程的方式 : multiprocessing.Process     类继承,重写run     linux下:fork
# 如果换成是线程的话:换掉继承类就行
进程通信
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# -*- coding:utf-8 -*-

import multiprocessing
from multiprocessing import Manager  # 管理器


def func(l):
    l.append(111)

if __name__ == '__main__':
    manager = Manager()  # 实例化  先开启一个公共进程,并返回一个管理器
    l = manager.list()   # 开启空间,左边就是代理
    # l = manager.dict()
    """
    一般常用的空间类型是:
    1.  mgr.list()
    2.  mgr.dict()
    3.  mgr.Queue()
    """
    print(l)
    p = multiprocessing.Process(target=func, args=(l,))
    p.start()
    p.join()
    print(l)   # 这样使用manager之后 l就是【共享】的了
    

进程池 & 线程池

ps:有点乱

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# from multiprocessing import Pool   # 进程池
from multiprocessing.dummy import Pool  #线程池
from multiprocessing.pool import ThreadPool # 线程池
import threading
import time


def func(i):
    print("{}-------555".format(i))
    time.sleep(2)
    return i

def print_back(*args, **kwargs):
    print("处理数据完成",args, kwargs)

pool = Pool(6)   # 不写的话 默认是cpu的个数
# print(threading.active_count())

for i in range(5):
    pool.apply_async(func=func, args=(i,), callback=print_back)  ## 添加任务   不阻塞    主要使用的方法
#     pool.apply(func=func, )  ## 添加任务   阻塞
# pool.map(func, [i for i in range(5)])   #添加任务  不阻塞

pool.close()   #关闭线程池  不在提交新的任务
pool.join()    #等待进程池中的任务执行完毕
print("任务结束")


########### 线程池的步骤
p = ThreadPool(3) # 实例化
p.apply_async(func) # 函数      # 可以将返回值.get() 但是get也会i阻塞
p.close()
p.join()   # join()语句要放在close()语句后面


# 进程池比线程池耗费资源


# 可以将返回值.get() 但是get也会i阻塞
async_result = p.apply_async(func) # 函数
print(async_result.get())
使用进程池来实现并发服务器
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 使用池来实现并发服务器
# 先开一个进程池, 每个进程下面再开一个线程

import socket
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool


def woeker_thread(conn):  # 使用线程池来
    while True:
        recv_data =conn.recv(1000)
        if recv_data:
            print(recv_data)
            conn.send(recv_data)
        else:
            conn.close()
            break

def worker_process(server): # 使用进程池来接收套接字
    # pool = Pool(cpu_count()*2)     # 通常可以分配2倍的cpu个数
    pool = ThreadPool(cpu_count()) # 获取电脑核心数
    while True:
        conn, addr = server.accept()
        pool.apply_async(woeker_thread, args=(conn,))


if __name__ == '__main__':
    server = socket.socket()
    server.bind(('127.0.0.1', 8888))
    server.listen(1000)
    n = cpu_count()  # 获得当前计算机的cpu核心数量
    pool = Pool(n)
    for i in range(n):  # 充分利用cpu,为每个cpu分配一个进程
        # conn, addr = server.accept()
        pool.apply_async(func=worker_process, args=(server,))
        pool.apply_async(func=worker_process, args=(server,))
    pool.close()
    pool.join()
0%