天天看点

python ctypes 多线程_Python多线程一览

【前言】最近一段时间在学习使用Python进行异步编程,与C++、Java等编译型语言类型类似,Python也提供了一些可用于异步编程的内建模块。例如,支持多线程处理的若处理的任务属于CPU密集型,则应使用多进程;若处理的任务属于I/O密集型,则应使用多线程。本文着重介绍使用Python多线程进行异步编程,以后有时间的话,会另写一篇使用Python多进程进行异步编程的文章。

本篇文章也假设读者已经明白“阻塞”、“异步”、“锁”等术语背后的基本原理,本篇文章的所有示例均基于threading 模块。

本篇文章的解释内容包括:创建线程

python ctypes 多线程_Python多线程一览

线程锁

python ctypes 多线程_Python多线程一览

关闭/中断线程

python ctypes 多线程_Python多线程一览

基于线程实现取消服务。

【创建线程】Python中可通过调用threading.Thread直接创建线程,如下所示,调用threading.Thread通常需要传入两个参数:target为“线程函数”;args为传递给“线程函数”的参数,必须为tuple类型。

import threading

def worker(name):

print("thread %s is running" % name)

worker = threading.Thread(target=worker, args=('worker',))

worker.start()

worker.join()也可通过继承threading.Thread类创建线程,如下所示。其中super(MyThread, self).__init__()表示对继承自父类(threading.Thread)的属性进行初始化,并用父类的初始化方法来初始化继承的属性。此外,继承自threading.Thread类的子类必须具有run()方法。线程启动时,自动调用每个线程实例的run()方法。

import threading

import time

class MyThread(threading.Thread):

def __init__(self, delay):

super(MyThread, self).__init__()

self.delay = delay

def run(self):

while True:

print("sleep %ds" % self.delay)

time.sleep(self.delay)

delays = [2, 4, 6]

threads = []

for delay in delays:

threads.append(MyThread(delay))

for t in threads:

t.start()

for t in threads:

t.join()

运行结果如下所示

如果多个线程同时对某个资源进行“写操作”,则可能会出现不可预料的结果。考虑这样一种情况:一个列表里所有元素都是0,线程"set"从后向前把所有元素改成1,而线程"print"负责从前往后读取列表并打印。那么,可能线程"set"开始改的时候,线程"print"便来打印列表了,输出就成了一半0一半1,这就是数据的不同步。因此,为了避免这种情况即为了保证资源的正确性,需要引入锁的概念。锁有两种状态——锁定和未锁定。每当一个线程比如"set"要访问共享数据时,必须先获得锁定;如果已经有别的线程比如"print"获得锁定了,那么就让线程"set"暂停,也就是同步阻塞;等到线程"print"访问完毕,释放锁以后,再让线程"set"继续。

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些每次只允许一个线程操作的资源,可以将对其进行的操作放到 acquire 和 release 方法之间,如下所示。

import threading

import time

class myThread (threading.Thread):

def __init__(self, threadID, name, counter):

#super(myThread, self).__init__()

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.counter = counter

def run(self):

print ("开启线程: " + self.name)

# 获取锁,用于线程同步

threadLock.acquire()

print_time(self.name, self.counter, 3)

# 释放锁,开启下一个线程

threadLock.release()

def print_time(threadName, delay, counter):

while counter:

time.sleep(delay)

print ("%s: %s" % (threadName, time.ctime(time.time())))

counter -= 1

threadLock = threading.Lock()

threads = []

# 创建新线程

thread1 = myThread(1, "Thread-1", 1)

thread2 = myThread(2, "Thread-2", 2)

# 开启新线程

thread1.start()

thread2.start()

# 添加线程到线程列表

threads.append(thread1)

threads.append(thread2)

# 等待所有线程完成

for t in threads:

t.join()

print ("退出主线程")

运行结果如下所示

当然,Python 的 Queue 模块中提供了同步的线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。此处,不再赘述。

【关闭/中断线程】借助

def _async_raise(tid, exctype):

"""Raises an exception in the threads with id tid"""

if not inspect.isclass(exctype):

raise TypeError("Only types can be raised (not instances)")

res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype))

if res == 0:

raise ValueError("invalid thread id")

elif res != 1:

# """if it returns a number greater than one, you're in trouble,

# and you should call it again with exc=NULL to revert the effect"""

ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)

raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):

_async_raise(thread.ident, SystemExit)更合理的方式是使用threading.Event()创建一个事件管理标记flag,然后线程在运行过程中以极短的时间间隔访问flag的状态值,一旦flag的状态值满足“关闭线程”,立刻执行关闭线程操作。

class MyThread(threading.Thread):

"""Thread class with a stop() method. The thread itself has to check

regularly for the stopped() condition."""

def __init__(self, *args, **kwargs):

super(MyThread, self).__init__(*args, **kwargs)

self._stop_event = threading.Event()

def stop(self):

self._stop_event.set()

def stopped(self):

return self._stop_event.is_set()

def run(self):

while True:

#做任务

print("sleep 1s")

time.sleep(1)

if self.stopped():

# 做一些必要的收尾工作

break

【基于线程实现取消服务】

考虑这样一个场景:调度器A和执行器B之间通过web通信,调度器A给执行器B传入“执行”参数,执行器B基于单线程或多线程开始执行任务;之后,调度器A给执行器B传入了“取消”参数,执行器B取消当前正在执行的任务。那如何实现“取消”服务呢?

解决思路:当调度器A给执行器B传递参数后,执行器B端首先判断参数为“执行”操作还是“取消”操作。若为“执行”操作,执行器B端新建一个“线程池”并开始执行任务;若为“取消”操作,执行器B端将“线程池”中的线程资源利用“threading.Event()”的方式合理关闭,并清理或重新初始化“线程池”。需要注意的是,执行器B端的“线程池”对象需要为全局变量。Demo代码如下所示。

模拟调度器A端代码

import requests

#json_data = {"cancel": "yes"}

json_data = {"nums": 2, "cancel": "no"}

r = requests.post("http://127.0.0.1:5000/access_api", json=json_data)

print(r.text, type(r.text))

模拟执行器B端封装web API代码

import json

from flask import Flask, request

from test_API.test_process import MyThread

app = Flask(__name__)

threads = []

@app.route('/')

def hello_world():

return 'hello world'

@app.route('/access_api', methods=['POST'])

def access_api():

#print(request.headers)

global threads#声明变量类型为全局变量

state = None

try:

print(threads)

cancel = request.json['cancel']

if cancel == "no":

print("for testing of performing")

nums = request.json['nums']

for i in range(nums):

threads.append(MyThread(i+1))

for t in threads:

t.start()

elif cancel == "yes":

print('for testing of canceling')

for t in threads:

t.stop()#MyThread.stop()

threads = []

state = 'Successful'

except:

state = 'Failed'

finally:

return state

if __name__ == '__main__':

app.run(host='0.0.0.0', port=5000, debug=True)

#本地访问ip为127.0.0.1

#局域网其它电脑访问:提供本电脑的ip地址

模拟执行器B端执行逻辑代码

import json

import time

import threading

class MyThread(threading.Thread):

"""Thread class with a stop() method. The thread itself has to check

regularly for the stopped() condition."""

def __init__(self, delay):

super(MyThread, self).__init__()

self._stop_event = threading.Event()

self.delay = delay

def stop(self):

self._stop_event.set()

def stopped(self):

return self._stop_event.is_set()

def run(self):

print("begin run the child thread")

while True:

print("sleep %ds" % self.delay)

time.sleep(self.delay)

if self.stopped():

# 做一些必要的收尾工作

break

当cancel值为no时,测试结果如下所示

重新传入cancel值为yes时,测试结果如下

再重新传入cancel值为no时,测试结果如下小结1:基于以上测试结果,demo代码可实现了“取消”服务,而且“取消”方式也不是采用抛出异常的方式强制关闭线程,而是采用设置线程状态标记值的方式合理关闭。但请注意,采用设置线程状态标记值的方式关闭线程,只能在当前正在运行的这次循环结束之后起作用。也就是说,在执行下次循环中的print时,线程检测到标记值已被重置,因此退出循环。这种行为的典型应用场景是服务器监听客户端的访问这种类型!

小结2:当然,如果线程之间不涉及到“锁”,采用抛出异常的方式也可!该种方式适用于单次且耗时任务的执行!比如,从mysql数据库迁移数据至oracle数据库。整个过程涉及到数据库读、数据库写,比较耗时。此时,若想立刻结束该服务,应采用抛出异常的方式立即中断线程。但应做好数据库的回滚等后续处理,尽量降低“不可预期行为”发生的可能性。

小结3:对“线程池”资源threads粗暴地进行重新赋值的方式并不可取,应该对“线程池”资源进行合理释放并重新初始化。

就写这么多吧,欢迎讨论指正!!!