博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python自定义线程池
阅读量:5898 次
发布时间:2019-06-19

本文共 4460 字,大约阅读时间需要 14 分钟。

关于python的多线程,由与GIL的存在被广大群主所诟病,说python的多线程不是真正的多线程。但多线程处理IO密集的任务效率还是可以杠杠的。

我实现的这个线程池其实是根据银角的思路来实现的。

主要思路:


 

  任务获取和执行:

  1、任务加入队列,等待线程来获取并执行。

  2、按需生成线程,每个线程循环取任务。

  线程销毁:

  1、获取任务是终止符时,线程停止。

  2、线程池close()时,向任务队列加入和已生成线程等量的终止符。

  3、线程池terminate()时,设置线程下次任务取到为终止符。

流程概要设计:


详细代码:

  


 

import threadingimport contextlibfrom Queue import Queueimport timeclass ThreadPool(object):    def __init__(self, max_num):        self.StopEvent = 0#线程任务终止符,当线程从队列获取到StopEvent时,代表此线程可以销毁。可设置为任意与任务有区别的值。        self.q = Queue()        self.max_num = max_num  #最大线程数        self.terminal = False   #是否设置线程池强制终止        self.created_list = [] #已创建线程的线程列表        self.free_list = [] #空闲线程的线程列表        self.Deamon=False #线程是否是后台线程    def run(self, func, args, callback=None):        """        线程池执行一个任务        :param func: 任务函数        :param args: 任务函数所需参数        :param callback:        :return: 如果线程池已经终止,则返回True否则None        """        if len(self.free_list) == 0 and len(self.created_list) < self.max_num:            self.create_thread()        task = (func, args, callback,)        self.q.put(task)    def create_thread(self):        """        创建一个线程        """        t = threading.Thread(target=self.call)        t.setDaemon(self.Deamon)        t.start()        self.created_list.append(t)#将当前线程加入已创建线程列表created_list    def call(self):        """        循环去获取任务函数并执行任务函数        """        current_thread = threading.current_thread()   #获取当前线程对象·        event = self.q.get()    #从任务队列获取任务        while event != self.StopEvent:   #判断获取到的任务是否是终止符            func, arguments, callback = event#从任务中获取函数名、参数、和回调函数名            try:                result = func(*arguments)                func_excute_status =True#func执行成功状态            except Exception as e:                func_excute_status = False                result =None                print '函数执行产生错误', e#打印错误信息            if func_excute_status:#func执行成功后才能执行回调函数                if callback is not None:#判断回调函数是否是空的                    try:                        callback(result)                    except Exception as e:                        print '回调函数执行产生错误', e  # 打印错误信息            with self.worker_state(self.free_list,current_thread):                #执行完一次任务后,将线程加入空闲列表。然后继续去取任务,如果取到任务就将线程从空闲列表移除                if self.terminal:#判断线程池终止命令,如果需要终止,则使下次取到的任务为StopEvent。                    event = self.StopEvent                else: #否则继续获取任务                    event = self.q.get()  # 当线程等待任务时,q.get()方法阻塞住线程,使其持续等待        else:#若线程取到的任务是终止符,就销毁线程            #将当前线程从已创建线程列表created_list移除            self.created_list.remove(current_thread)    def close(self):        """        执行完所有的任务后,所有线程停止        """        full_size = len(self.created_list)#按已创建的线程数量往线程队列加入终止符。        while full_size:            self.q.put(self.StopEvent)            full_size -= 1    def terminate(self):        """        无论是否还有任务,终止线程        """        self.terminal = True        while self.created_list:            self.q.put(self.StopEvent)        self.q.queue.clear()#清空任务队列    def join(self):        """        阻塞线程池上下文,使所有线程执行完后才能继续        """        for t in self.created_list:            t.join()    @contextlib.contextmanager#上下文处理器,使其可以使用with语句修饰    def worker_state(self, state_list, worker_thread):        """        用于记录线程中正在等待的线程数        """        state_list.append(worker_thread)        try:            yield        finally:            state_list.remove(worker_thread)if __name__ == '__main__':    def Foo(arg):        return arg        # time.sleep(0.1)    def Bar(res):        print res    pool=ThreadPool(5)    # pool.Deamon=True#需在pool.run之前设置    for i in range(1000):        pool.run(func=Foo,args=(i,),callback=Bar)    pool.close()    pool.join()    # pool.terminate()    print "任务队列里任务数%s" %pool.q.qsize()    print "当前存活子线程数量:%d" % threading.activeCount()    print "当前线程创建列表:%s" %pool.created_list    print "当前线程创建列表:%s" %pool.free_list
详细代码

 

 关于上下文处理:


 

来个简单例子说明: 下面的代码手动自定义了一个myopen方法,模拟我们常见的with open() as f:语句。具体的contextlib模块使用,会单独开章来将。
# coding:utf-8import contextlib@contextlib.contextmanager  #定义该函数支持上下文with语句def myopen(filename,mode):    f=open(filename,mode)    try:        yield f.readlines()  #正常执行返回f.readlines()    except Exception as e:        print e    finally:        f.close()  #最后在with代码快执行完毕后返回执行finally下的f.close()实现关闭文件if __name__ == '__main__':    with myopen(r'c:\ip1.txt','r') as f:        for line in f:            print line

总结


  实现这个线程池我吐血三升啊。

转载于:https://www.cnblogs.com/tkqasn/p/5711593.html

你可能感兴趣的文章
php5安装
查看>>
稀疏表达:向量、矩阵与张量(上)
查看>>
java反射(二)
查看>>
非常不错的Jquery图片截取插件
查看>>
OCP终于考完了
查看>>
微软RemoteApp单点登陆(SSO)
查看>>
Oracle性能调优之最小化应用负载
查看>>
ab 测试命令
查看>>
irf
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
正则表达式语法
查看>>
IPython安装过程 @win7 64bit
查看>>
SQL92标准与SQL99标准查询
查看>>
使用SharePoint对象SPFieldCollection获取列表所有字段信息
查看>>
mysql各版本的新特性整理
查看>>
Nginx的ip_hash解析
查看>>
判断浏览器版本
查看>>
我的友情链接
查看>>
AnySDK+GooglePlay对接1
查看>>