在日常任务中,难免会碰到“大数据”处理,那么并发编程就是一项必须的技能了。对于脚本语言而言,并发是一件的很容易的事情,难的是如何合理利用资源。

在学习的道路上,也是看到了很多种多线程的解决方案,遂做记录,以便日后使用。

0x00 虚拟场景

假设现在有一项任务处理流程fun如下,返回值在多线程中一般用不到,这里不做讨论。

task_list = range(30000)

# 虚拟任务
def fun(task):
    task += task // 2
    return task

0x10 遍历

0x11 暴力枚举

暴力枚举人如其名,遍历任务,依次创建子线程

import threading	# 多线程库
# 暴力枚举
def rangeThreadFun(task_list):
    thread_list = []
    for task in task_list:
        thread = threading.Thread(target=fun, args=(task,))
        thread_list.append(thread)
        thread.start()

    for thread in thread_list:
        thread.join()

这里使用了另外两个库,用于对程序的时间消耗和空间使用进行统计

import time			# 时间,用于计算耗时
import psutil		# 三方库,用于计算空间消耗

这是极其浪费资源的一种写法,运行结果如下:

耗时2.5s,消耗空间8020KB

0x12 限制枚举

原理还是枚举,但是使用BoundedSemaphore方法使用锁限制了线程数,这样可以避免消耗额外资源,但是耗时差不多。

# 需要对锁进行操作,所以需要重写方法fun为fun2
def fun2(task):
    task += 1
    lock.release()
    return task

# 使用锁限制线程数
def lockRangeThreadFun(task_list):
    for task in task_list:
        lock.acquire()
        thread = threading.Thread(target=fun2, args=(task,))
        thread.start()

可以看到消耗空间明显变小了,但是时间变化不大

耗时2.47秒,消耗空间104KB

0x13 分组遍历

分组遍历是我最爱用的方法,虽然写起来比较复杂,但是在理想情况下,效率是最高的。它是这样做的

# 首先需要封装一个处理一组任务的线程方法
def threadFun(task_list):
    for task in task_list:
        fun(task)

# 分组遍历开启多线程
def groupRangeThreadFun(task_list, thread_num):
    thread_step = len(task_list) // thread_num + 1
    thread_list = []

    for i in range(thread_num):
        start = i * thread_step
        end = start + thread_step
		# 将任务分成多组,每组开启个子线程
        thread = threading.Thread(target=threadFun, args=(task_list[start:end],))
        thread.start()
        thread_list.append(thread)

    for thread in thread_list:
        thread.join()

运行结果如下

很震惊吧,又快又不消耗空间:耗时0.005s,消耗空间108KB

0x20 进阶

0x21 queue队列

队列具有多种模式

模式

解释

queue.Queue

先进先出 FIFO

queue.LifoQueue

后进先出

queue.PriorityQueue

自定义进出顺序

queue.SimpleQueue

简单的FIFO 队列,缺少任务跟踪等高级功能。

这里使用最简单先进先出即可,队列有多种写法,比较普遍的写法如下:

import queue
# 封装队列子方法
def queueFun(q):
    while True:
        task = q.get()
        if task is None:
            break
        fun(task)

# 使用队列多线程
def queueThreadFun(task_list, thread_num):
    q = queue.Queue()   # 创建一个队列
    thread_list = []
    # 创建线程
    for i in range(thread_num):
        t = threading.Thread(target=queueFun, args=(q,))
        t.start()
        thread_list.append(t)
    # 将任务填入队列
    for task in task_list:
        q.put(task)
    # 完成后关闭线程,传入None跳出死循环
    for i in range(thread_num):
        q.put(None)
    # 等待所有任务完成
    for t in thread_list:
        t.join()

运行结果如下:

这个方法的消耗空间一会大一会小,就取中间值1036KB,耗时0.12s

0x22 线程池方法

目前使用的较新的版本的Python是自带了多线程的库的。

from concurrent.futures import ThreadPoolExecutor
def threadPoolFun(task_list, thread_num):
    pool = ThreadPoolExecutor(max_workers=thread_num)
    thread_list = []
    for task in task_list:
        thread = pool.submit(fun, task)
        thread_list.append(thread)

    pool.shutdown()

运行结果如下

耗时0.22s,消耗空间12800KB

0x30 总结

对这几种方法进行统计

方法

时间(s)

空间(KB)

暴力枚举

2.54

8020

限制枚举

2.47

104

分组遍历

0.005

108

queue队列

0.12

1036

线程池

0.22

12800

可以看到,暴力枚举的方式即费时,又消耗空间。限制枚举的方式消耗空间甚至比分组遍历的消耗空间还小,但是用时却和暴力枚举差不多,这是因为限制枚举和分组遍历的同时存在的线程数都限制在了30,因此消耗空间差不多。但是用时方面枚举法远大于分组法则是因为任务函数fun太过简单,创建线程的时间占比过高,前两者均创建了三万次线程,而后者指创建了30次线程,因此时间消耗就小。在实际的使用中,如果任务函数fun很费时,那么这点时间是基本没什么影响了。

再说队列和线程池的方法,队列的方法其实是线程池的雏形,但是线程池会有更多的资源调度以及任务选择方案等等方法,导致了线程池消耗了大量空间。

至于该用什么方法,全凭使用场景和个人喜好了。当然不止这几种姿势,如果你有更骚的操作,欢迎留言讨论。