Server : LiteSpeed System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64 User : u595547767 ( 595547767) PHP Version : 7.4.33 Disable Function : NONE Directory : /opt/alt/python27/lib/python2.7/site-packages/postomaat/ |
# Copyright 2009-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import threading
import time
import weakref
try:
import queue
except ImportError:
import Queue as queue
import logging
import postomaat.core
from postomaat.scansession import SessionHandler
class ThreadPool(threading.Thread):
def __init__(self, controller, minthreads=1, maxthreads=20, queuesize=100):
self.workers = []
self.queuesize = queuesize
self.tasks = queue.Queue(queuesize)
self.minthreads = minthreads
self.maxthreads = maxthreads
assert self.minthreads > 0
assert self.maxthreads >= self.minthreads
self.logger = logging.getLogger('%s.threadpool' % __package__)
self.threadlistlock = threading.Lock()
self.checkinterval = 10
self.threadcounter = 0
self._stayalive = True
self.laststats = 0
self.statinverval = 60
self.controller = weakref.ref(controller) # keep a weak reference to controller
threading.Thread.__init__(self)
self.name = 'Threadpool'
self.daemon = False
self.start()
@property
def stayalive(self):
return self._stayalive
@stayalive.setter
def stayalive(self, value):
# threadpool is shut down -> send poison pill to workers
if self._stayalive and not value:
self._stayalive = False
self._send_poison_pills()
self._stayalive = value
def _send_poison_pills(self):
"""flood the queue with poison pills to tell all workers to shut down"""
for _ in range(self.maxthreads):
self.tasks.put_nowait(None)
def add_task(self, session):
if self._stayalive:
self.tasks.put(session)
def add_task_from_socket(self, sock, port):
"""
Consistent interface with procpool
Add task to queue compressing the socket which is needed for multiprocessing
Args:
sock (socket): incoming socket
port (int): port where message was received (needed for plugin list)
"""
task = (postomaat.core.forking_dumps(sock), port)
self.add_task(task)
def get_task_sessionhandler(self):
if self._stayalive:
task = self.tasks.get(True)
if task is None:
return None
psock, port = task
sock = postomaat.core.forking_load(psock)
return SessionHandler(sock, self.controller().config, self.controller().plugin_list_by_port(port))
else:
return None
def run(self):
self.logger.debug('Threadpool initializing. minthreads=%s maxthreads=%s maxqueue=%s checkinterval=%s' % (
self.minthreads, self.maxthreads, self.queuesize, self.checkinterval))
while self._stayalive:
curthreads = self.workers
numthreads = len(curthreads)
# check the minimum boundary
requiredminthreads = self.minthreads
if numthreads < requiredminthreads:
diff = requiredminthreads - numthreads
self._add_worker(diff)
continue
# check the maximum boundary
if numthreads > self.maxthreads:
diff = numthreads - self.maxthreads
self._remove_worker(diff)
continue
changed = False
# ok, we are within the boundaries, now check if we can dynamically
# adapt something
queuesize = self.tasks.qsize()
# if there are more tasks than current number of threads, we try to
# increase
workload = float(queuesize) / float(numthreads)
if workload > 1 and numthreads < self.maxthreads:
self._add_worker()
numthreads += 1
changed = True
if workload < 1 and numthreads > self.minthreads:
self._remove_worker()
numthreads -= 1
changed = True
# log current stats
if changed or time.time() - self.laststats > self.statinverval:
workerlist = "\n%s" % '\n'.join(map(repr, self.workers))
self.logger.debug('queuesize=%s workload=%.2f workers=%s workerlist=%s' % (
queuesize, workload, numthreads, workerlist))
self.laststats = time.time()
time.sleep(self.checkinterval)
self.logger.info('Threadpool shut down')
def _remove_worker(self, num=1):
self.logger.debug('Removing %s workerthread(s)' % num)
for bla in range(0, num):
worker = self.workers[0]
worker.stayalive = False
worker.join(120)
del self.workers[0]
def _add_worker(self, num=1):
self.logger.debug('Adding %s workerthread(s)' % num)
for bla in range(0, num):
self.threadcounter += 1
worker = Worker("[%s]" % self.threadcounter, self)
self.workers.append(worker)
worker.start()
def shutdown(self, newmanager=None):
"""
Shutdown manager, transfer queue to a new manager if available. Otherwise
mark messages as defer.
Keyword Args:
newmanager (ProcManager or ThreadPool): has to provide add_task accepting a pickled socket
"""
# set stayalive to False, this will send
# poison pills to the workers
self.stayalive = False
# now remove elements from the queue
# first, put another poison pill for the Threadpool itself
self.tasks.put_nowait(None)
if newmanager:
# new manager available. Transfer tasks
# to new manager
countmessages = 0
while True:
# don't use the get_task_sessionhandler from Threadpool since this will
# not give anything once stayalive is False
task = self.tasks.get(True)
if task is None: # poison pill
break
newmanager.add_task(task)
countmessages += 1
self.logger.info("Moved %u messages to queue of new manager" % countmessages)
else:
# no new manager. Mark messages as defer.
returnMessage = "Temporarily unavailable... Please try again later."
markDeferCounter = 0
while True:
# don't use the get_task_sessionhandler from Threadpool since this will
# not give anything once stayalive is False
sesshandler = self.tasks.get(True)
if sesshandler == None: # poison pill -> shut down
break
markDeferCounter += 1
sesshandler.protohandler.defer(returnMessage)
self.logger.info("Marked %s messages as '%s' to close queue" % (markDeferCounter,returnMessage))
# remove all the workers (joins them also)
for worker in self.workers:
worker.stayalive = False
worker.join(120) # wait 120 seconds max
class Worker(threading.Thread):
def __init__(self, workerid, pool):
threading.Thread.__init__(self, name='Pool worker %s' % workerid)
self.workerid = workerid
self.birth = time.time()
self.pool = pool
self.stayalive = True
self.logger = logging.getLogger('%s.threads.worker.%s' % (__package__, workerid))
self.logger.debug('thread init')
self.noisy = False
self.setDaemon(False)
self.workerstate = 'created'
def __repr__(self):
return "%s: %s" % (self.workerid, self.workerstate)
def run(self):
self.logger.debug('thread start')
while self.stayalive:
self.workerstate = 'waiting for task'
if self.noisy:
self.logger.debug('Getting new task...')
sesshandler = self.pool.get_task_sessionhandler()
if sesshandler == None: # poison pill -> shut down
if self.noisy:
self.logger.debug("got a poison pill .. good bye world")
self.stayalive = False
continue
if self.noisy:
self.logger.debug('Doing work')
try:
sesshandler.handlesession(self)
except Exception as e:
self.logger.error('Unhandled Exception : %s' % e)
self.workerstate = 'task completed'
self.workerstate = 'ending'
self.logger.debug('thread end')