歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Python線程池實現

Python線程池實現

日期:2017/3/1 9:10:09   编辑:Linux編程

Python 的線程池主要有threadpool,不過它並不是內置的庫,每次使用都需要安裝,而且使用起來也不是那麼好用,所以自己寫了一個線程池實現,每次需要使用直接import即可。其中還可以根據傳入的特征量handlerkey來獲取每個任務的結果。

#!/bin/env python
# -*- coding:utf-8 -*-

"""
@lx
created on 2016-04-14
"""

import Queue
import sys
import threading
import time
import StringIO
import traceback

reload(sys)
sys.setdefaultencoding("utf8")


class MyThread(threading.Thread):
"""Background thread connected to the requests/results queues."""
def __init__(self, workQueue, resultQueue, timeout=0.1, **kwds):
threading.Thread.__init__(self, **kwds)
self.setDaemon(True)
self._workQueue = workQueue
self._resultQueue = resultQueue
self._timeout = timeout
self._dismissed = threading.Event()
self.start()

def run(self):
"""Repeatedly process the job queue until told to exit."""
while True:
if self._dismissed.isSet():
break

handlerKey = None # unique key
code = 0 # callback return code
handlerRet = None
errMsg = ""

try:
callable, args, kwds = self._workQueue.get(True, self._timeout)
except Queue.Empty:
continue
except:
exceptMsg = StringIO.StringIO()
traceback.print_exc(file=exceptMsg)
errMsg = exceptMsg.getvalue()
code = 3301 # system error
self._resultQueue.put(
(handlerKey, code, (callable, args, kwds), errMsg))
break

if self._dismissed.isSet():
self._workQueue.put((callable, args, kwds))
break

try:
if "handlerKey" in kwds:
handlerKey = kwds["handlerKey"]
handlerRet = callable(*args, **kwds) # block
self._resultQueue.put((handlerKey, code, handlerRet, errMsg))
except:
exceptMsg = StringIO.StringIO()
traceback.print_exc(file=exceptMsg)
errMsg = exceptMsg.getvalue()
code = 3303
self._resultQueue.put((handlerKey, code, handlerRet, errMsg))

def dismiss(self):
"""Sets a flag to tell the thread to exit when done with current job."""
self._dismissed.set()


class ThreadPool(object):
def __init__(self, workerNums=3, timeout=0.1):
self._workerNums = workerNums
self._timeout = timeout
self._workQueue = Queue.Queue() # no maximum
self._resultQueue = Queue.Queue()
self.workers = []
self.dismissedWorkers = []
self._createWorkers(self._workerNums)

def _createWorkers(self, workerNums):
"""Add num_workers worker threads to the pool."""
for i in range(workerNums):
worker = MyThread(self._workQueue, self._resultQueue,
timeout=self._timeout)
self.workers.append(worker)

def _dismissWorkers(self, workerNums, _join=False):
"""Tell num_workers worker threads to quit after their current task."""
dismissList = []
for i in range(min(workerNums, len(self.workers))):
worker = self.workers.pop()
worker.dismiss()
dismissList.append(worker)

if _join:
for worker in dismissList:
worker.join()
else:
self.dismissedWorkers.extend(dismissList)

def _joinAllDissmissedWorkers(self):
"""
Perform Thread.join() on all
worker threads that have been dismissed.
"""
for worker in self.dismissedWorkers:
worker.join()
self.dismissedWorkers = []

def addJob(self, callable, *args, **kwds):
self._workQueue.put((callable, args, kwds))

def getResult(self, block=False, timeout=0.1):
try:
item = self._resultQueue.get(block, timeout)
return item
except Queue.Empty, e:
return None
except:
raise

def waitForComplete(self, timeout=0.1):
"""
Last function. To dismiss all worker threads. Delete ThreadPool.
:param timeout
"""
while True:
workerNums = self._workQueue.qsize() # 釋放掉所有線程
runWorkers = len(self.workers)

if 0 == workerNums:
time.sleep(timeout) # waiting for thread to do job
self._dismissWorkers(runWorkers)
break
# if workerNums < runWorkers: # 不能這樣子亂取消
# self._dismissWorkers(runWorkers - workerNums)
time.sleep(timeout)
self._joinAllDissmissedWorkers()


if "__main__" == __name__:
test1 = """
def doSomething(*args, **kwds):
if "sleep" in kwds:
sleep = kwds["sleep"]
msgTxt = "sleep %fs.." % sleep
time.sleep(sleep)
return msgTxt

for i in range(10):
print doSomething(sleep=0.1, handlerKey="key-%d"%i)

wm = ThreadPool(10)
for i in range(10):
wm.addJob(doSomething, sleep=1, handlerKey="key-%d"%i)
wm.waitForComplete()
for i in range(10):
print wm.getResult()
del wm
"""
# test2 = """

def doSomething_(*args, **kwds):
sleep = int(args[0])
msgTxt = "sleep %ds.." % sleep
time.sleep(sleep)
return msgTxt


wm = ThreadPool(10)
result = []
for i in range(10):
data = 5
wm.addJob(doSomething_, data)

while 1:
res = wm.getResult()
if res:
result.append(res)
if 10 == len(result):
break
print "sleep 0.1"
time.sleep(0.1)
print time.time()
wm.waitForComplete()
print time.time()
# """

Copyright © Linux教程網 All Rights Reserved