【前言】最近一段时间在学习使用Python进行异步编程,与C++、Java等编译型语言类型类似,Python也提供了一些可用于异步编程的内建模块。例如,支持多线程处理的若处理的任务属于CPU密集型,则应使用多进程;若处理的任务属于I/O密集型,则应使用多线程。本文着重介绍使用Python多线程进行异步编程,以后有时间的话,会另写一篇使用Python多进程进行异步编程的文章。
本篇文章也假设读者已经明白“阻塞”、“异步”、“锁”等术语背后的基本原理,本篇文章的所有示例均基于threading 模块。
本篇文章的解释内容包括:创建线程
线程锁
关闭/中断线程
基于线程实现取消服务。
【创建线程】Python中可通过调用threading.Thread直接创建线程,如下所示,调用threading.Thread通常需要传入两个参数:target为“线程函数”;args为传递给“线程函数”的参数,必须为tuple类型。
import threading
def worker(name):
print("thread %s is running" % name)
worker = threading.Thread(target=worker, args=('worker',))
worker.start()
worker.join()也可通过继承threading.Thread类创建线程,如下所示。其中super(MyThread, self).__init__()表示对继承自父类(threading.Thread)的属性进行初始化,并用父类的初始化方法来初始化继承的属性。此外,继承自threading.Thread类的子类必须具有run()方法。线程启动时,自动调用每个线程实例的run()方法。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, delay):
super(MyThread, self).__init__()
self.delay = delay
def run(self):
while True:
print("sleep %ds" % self.delay)
time.sleep(self.delay)
delays = [2, 4, 6]
threads = []
for delay in delays:
threads.append(MyThread(delay))
for t in threads:
t.start()
for t in threads:
t.join()
运行结果如下所示
如果多个线程同时对某个资源进行“写操作”,则可能会出现不可预料的结果。考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。因此,为了避免这种情况即为了保证资源的正确性,需要引入锁的概念。锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。
使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些每次只允许一个线程操作的资源,可以将对其进行的操作放到 acquire 和 release 方法之间,如下所示。
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
#super(myThread, self).__init__()
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开启线程: " + self.name)
# 获取锁,用于线程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 释放锁,开启下一个线程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 创建新线程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 开启新线程
thread1.start()
thread2.start()
# 添加线程到线程列表
threads.append(thread1)
threads.append(thread2)
# 等待所有线程完成
for t in threads:
t.join()
print ("退出主线程")
运行结果如下所示
当然,Python 的 Queue 模块中提供了同步的线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。此处,不再赘述。
【关闭/中断线程】借助
def _async_raise(tid, exctype):
"""Raises an exception in the threads with id tid"""
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
_async_raise(thread.ident, SystemExit)更合理的方式是使用threading.Event()创建一个事件管理标记flag,然后线程在运行过程中以极短的时间间隔访问flag的状态值,一旦flag的状态值满足“关闭线程”,立刻执行关闭线程操作。
class MyThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(MyThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
while True:
#做任务
print("sleep 1s")
time.sleep(1)
if self.stopped():
# 做一些必要的收尾工作
break
【基于线程实现取消服务】
考虑这样一个场景:调度器A和执行器B之间通过web通信,调度器A给执行器B传入“执行”参数,执行器B基于单线程或多线程开始执行任务;之后,调度器A给执行器B传入了“取消”参数,执行器B取消当前正在执行的任务。那如何实现“取消”服务呢?
解决思路:当调度器A给执行器B传递参数后,执行器B端首先判断参数为“执行”操作还是“取消”操作。若为“执行”操作,执行器B端新建一个“线程池”并开始执行任务;若为“取消”操作,执行器B端将“线程池”中的线程资源利用“threading.Event()”的方式合理关闭,并清理或重新初始化“线程池”。需要注意的是,执行器B端的“线程池”对象需要为全局变量。Demo代码如下所示。
模拟调度器A端代码
import requests
#json_data = {"cancel": "yes"}
json_data = {"nums": 2, "cancel": "no"}
r = requests.post("http://127.0.0.1:5000/access_api", json=json_data)
print(r.text, type(r.text))
模拟执行器B端封装web API代码
import json
from flask import Flask, request
from test_API.test_process import MyThread
app = Flask(__name__)
threads = []
@app.route('/')
def hello_world():
return 'hello world'
@app.route('/access_api', methods=['POST'])
def access_api():
#print(request.headers)
global threads#声明变量类型为全局变量
state = None
try:
print(threads)
cancel = request.json['cancel']
if cancel == "no":
print("for testing of performing")
nums = request.json['nums']
for i in range(nums):
threads.append(MyThread(i+1))
for t in threads:
t.start()
elif cancel == "yes":
print('for testing of canceling')
for t in threads:
t.stop()#MyThread.stop()
threads = []
state = 'Successful'
except:
state = 'Failed'
finally:
return state
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
#本地访问ip为127.0.0.1
#局域网其它电脑访问:提供本电脑的ip地址
模拟执行器B端执行逻辑代码
import json
import time
import threading
class MyThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, delay):
super(MyThread, self).__init__()
self._stop_event = threading.Event()
self.delay = delay
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
print("begin run the child thread")
while True:
print("sleep %ds" % self.delay)
time.sleep(self.delay)
if self.stopped():
# 做一些必要的收尾工作
break
当cancel值为no时,测试结果如下所示
重新传入cancel值为yes时,测试结果如下
再重新传入cancel值为no时,测试结果如下小结1:基于以上测试结果,demo代码可实现了“取消”服务,而且“取消”方式也不是采用抛出异常的方式强制关闭线程,而是采用设置线程状态标记值的方式合理关闭。但请注意,采用设置线程状态标记值的方式关闭线程,只能在当前正在运行的这次循环结束之后起作用。也就是说,在执行下次循环中的print时,线程检测到标记值已被重置,因此退出循环。这种行为的典型应用场景是服务器监听客户端的访问这种类型!
小结2:当然,如果线程之间不涉及到“锁”,采用抛出异常的方式也可!该种方式适用于单次且耗时任务的执行!比如,从mysql数据库迁移数据至oracle数据库。整个过程涉及到数据库读、数据库写,比较耗时。此时,若想立刻结束该服务,应采用抛出异常的方式立即中断线程。但应做好数据库的回滚等后续处理,尽量降低“不可预期行为”发生的可能性。
小结3:对“线程池”资源threads粗暴地进行重新赋值的方式并不可取,应该对“线程池”资源进行合理释放并重新初始化。
就写这么多吧,欢迎讨论指正!!!