在日常任务中,难免会碰到“大数据”处理,那么并发编程就是一项必须的技能了。对于脚本语言而言,并发是一件的很容易的事情,难的是如何合理利用资源。
在学习的道路上,也是看到了很多种多线程的解决方案,遂做记录,以便日后使用。
0x00 虚拟场景
假设现在有一项任务处理流程fun如下,返回值在多线程中一般用不到,这里不做讨论。
task_list = range(30000)
# 虚拟任务
def fun(task):
task += task // 2
return task
0x10 遍历
0x11 暴力枚举
暴力枚举人如其名,遍历任务,依次创建子线程
import threading # 多线程库
# 暴力枚举
def rangeThreadFun(task_list):
thread_list = []
for task in task_list:
thread = threading.Thread(target=fun, args=(task,))
thread_list.append(thread)
thread.start()
for thread in thread_list:
thread.join()
这里使用了另外两个库,用于对程序的时间消耗和空间使用进行统计
import time # 时间,用于计算耗时
import psutil # 三方库,用于计算空间消耗
这是极其浪费资源的一种写法,运行结果如下:
耗时2.5s,消耗空间8020KB
0x12 限制枚举
原理还是枚举,但是使用BoundedSemaphore方法使用锁限制了线程数,这样可以避免消耗额外资源,但是耗时差不多。
# 需要对锁进行操作,所以需要重写方法fun为fun2
def fun2(task):
task += 1
lock.release()
return task
# 使用锁限制线程数
def lockRangeThreadFun(task_list):
for task in task_list:
lock.acquire()
thread = threading.Thread(target=fun2, args=(task,))
thread.start()
可以看到消耗空间明显变小了,但是时间变化不大
耗时2.47秒,消耗空间104KB
0x13 分组遍历
分组遍历是我最爱用的方法,虽然写起来比较复杂,但是在理想情况下,效率是最高的。它是这样做的
# 首先需要封装一个处理一组任务的线程方法
def threadFun(task_list):
for task in task_list:
fun(task)
# 分组遍历开启多线程
def groupRangeThreadFun(task_list, thread_num):
thread_step = len(task_list) // thread_num + 1
thread_list = []
for i in range(thread_num):
start = i * thread_step
end = start + thread_step
# 将任务分成多组,每组开启个子线程
thread = threading.Thread(target=threadFun, args=(task_list[start:end],))
thread.start()
thread_list.append(thread)
for thread in thread_list:
thread.join()
运行结果如下
很震惊吧,又快又不消耗空间:耗时0.005s,消耗空间108KB
0x20 进阶
0x21 queue队列
队列具有多种模式
这里使用最简单先进先出即可,队列有多种写法,比较普遍的写法如下:
import queue
# 封装队列子方法
def queueFun(q):
while True:
task = q.get()
if task is None:
break
fun(task)
# 使用队列多线程
def queueThreadFun(task_list, thread_num):
q = queue.Queue() # 创建一个队列
thread_list = []
# 创建线程
for i in range(thread_num):
t = threading.Thread(target=queueFun, args=(q,))
t.start()
thread_list.append(t)
# 将任务填入队列
for task in task_list:
q.put(task)
# 完成后关闭线程,传入None跳出死循环
for i in range(thread_num):
q.put(None)
# 等待所有任务完成
for t in thread_list:
t.join()
运行结果如下:
这个方法的消耗空间一会大一会小,就取中间值1036KB,耗时0.12s
0x22 线程池方法
目前使用的较新的版本的Python是自带了多线程的库的。
from concurrent.futures import ThreadPoolExecutor
def threadPoolFun(task_list, thread_num):
pool = ThreadPoolExecutor(max_workers=thread_num)
thread_list = []
for task in task_list:
thread = pool.submit(fun, task)
thread_list.append(thread)
pool.shutdown()
运行结果如下
耗时0.22s,消耗空间12800KB
0x30 总结
对这几种方法进行统计
可以看到,暴力枚举的方式即费时,又消耗空间。限制枚举的方式消耗空间甚至比分组遍历的消耗空间还小,但是用时却和暴力枚举差不多,这是因为限制枚举和分组遍历的同时存在的线程数都限制在了30,因此消耗空间差不多。但是用时方面枚举法远大于分组法则是因为任务函数fun太过简单,创建线程的时间占比过高,前两者均创建了三万次线程,而后者指创建了30次线程,因此时间消耗就小。在实际的使用中,如果任务函数fun很费时,那么这点时间是基本没什么影响了。
再说队列和线程池的方法,队列的方法其实是线程池的雏形,但是线程池会有更多的资源调度以及任务选择方案等等方法,导致了线程池消耗了大量空间。
至于该用什么方法,全凭使用场景和个人喜好了。当然不止这几种姿势,如果你有更骚的操作,欢迎留言讨论。