Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64
User : u595547767 ( 595547767)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/alt/python27/lib/python2.7/site-packages/postomaat/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/alt/python27/lib/python2.7/site-packages/postomaat/threadpool.py
#   Copyright 2009-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import threading
import time
import weakref
try:
    import queue
except ImportError:
    import Queue as queue
import logging
import postomaat.core
from postomaat.scansession import SessionHandler


class ThreadPool(threading.Thread):
    """Manager thread owning a bounded task queue and a resizable set of workers.

    The pool starts itself at the end of ``__init__``. Its ``run`` loop keeps
    the number of ``Worker`` threads between ``minthreads`` and ``maxthreads``
    and adds or removes one worker per check, depending on the ratio of queued
    tasks to workers. ``None`` on the queue acts as a poison pill that tells a
    worker to stop.
    """

    def __init__(self, controller, minthreads=1, maxthreads=20, queuesize=100):
        """Set up queue and bookkeeping, then start the manager thread.

        Args:
            controller: object whose ``config`` and ``plugin_list_by_port(port)``
                are used when building session handlers; only a weak
                reference to it is kept
            minthreads (int): lower bound for the number of workers (> 0)
            maxthreads (int): upper bound for the number of workers (>= minthreads)
            queuesize (int): ``maxsize`` of the task queue
        """
        self.workers = []
        self.queuesize = queuesize
        self.tasks = queue.Queue(queuesize)
        self.minthreads = minthreads
        self.maxthreads = maxthreads
        assert self.minthreads > 0
        assert self.maxthreads >= self.minthreads

        self.logger = logging.getLogger('%s.threadpool' % __package__)
        # NOTE(review): this lock is created but never acquired in this class
        self.threadlistlock = threading.Lock()
        self.checkinterval = 10  # seconds slept between two checks in run()
        self.threadcounter = 0  # running number used to label new workers
        self._stayalive = True
        self.laststats = 0  # time.time() of the last stats log line
        self.statinverval = 60  # seconds between stats log lines (attribute name kept as is)
        self.controller = weakref.ref(controller)  # keep a weak reference to controller
        threading.Thread.__init__(self)
        self.name = 'Threadpool'
        self.daemon = False
        self.start()

    @property
    def stayalive(self):
        """False once the pool has been told to shut down."""
        return self._stayalive

    @stayalive.setter
    def stayalive(self, value):
        # threadpool is shut down -> send poison pill to workers
        shutting_down = self._stayalive and not value
        self._stayalive = value
        if shutting_down:
            self._send_poison_pills()

    def _send_poison_pills(self):
        """Queue up to ``maxthreads`` poison pills to tell all workers to shut down.

        The queue is bounded, so this stops quietly as soon as it is full
        instead of raising ``queue.Full``: a worker only blocks on the queue
        while it is empty, and a busy worker notices ``_stayalive`` being
        False the next time it calls ``get_task_sessionhandler``.
        """
        for _ in range(self.maxthreads):
            try:
                self.tasks.put_nowait(None)
            except queue.Full:
                break

    def add_task(self, session):
        """Put a task on the queue (blocks while the queue is full).

        Tasks are silently dropped once the pool is shutting down.
        """
        if self._stayalive:
            self.tasks.put(session)

    def add_task_from_socket(self, sock, port):
        """
        Consistent interface with procpool
        Add task to queue compressing the socket which is needed for multiprocessing

        Args:
            sock (socket): incoming socket
            port (int): port where message was received (needed for plugin list)
        """
        task = (postomaat.core.forking_dumps(sock), port)
        self.add_task(task)

    def get_task_sessionhandler(self):
        """Block until a task is available and wrap it in a ``SessionHandler``.

        Returns:
            SessionHandler for the next ``(pickled socket, port)`` task, or
            None if the pool is shutting down or a poison pill was received.
        """
        if not self._stayalive:
            return None

        task = self.tasks.get(True)
        if task is None:  # poison pill
            return None

        psock, port = task
        sock = postomaat.core.forking_load(psock)
        # dereference the weak reference once so config and plugin list
        # come from the same controller object
        controller = self.controller()
        return SessionHandler(sock, controller.config, controller.plugin_list_by_port(port))

    def run(self):
        """Manager loop: keep the worker count within bounds and adapt it to the load."""
        self.logger.debug(
            'Threadpool initializing. minthreads=%s maxthreads=%s maxqueue=%s checkinterval=%s',
            self.minthreads, self.maxthreads, self.queuesize, self.checkinterval)

        while self._stayalive:
            numthreads = len(self.workers)

            # check the minimum boundary
            if numthreads < self.minthreads:
                self._add_worker(self.minthreads - numthreads)
                continue

            # check the maximum boundary
            if numthreads > self.maxthreads:
                self._remove_worker(numthreads - self.maxthreads)
                continue

            # we are within the boundaries, now check if we can dynamically
            # adapt something
            changed = False
            queuesize = self.tasks.qsize()

            # numthreads >= minthreads > 0 here, so the division is safe
            workload = float(queuesize) / float(numthreads)

            if workload > 1 and numthreads < self.maxthreads:
                # more queued tasks than workers -> grow by one
                self._add_worker()
                numthreads += 1
                changed = True
            elif workload < 1 and numthreads > self.minthreads:
                # fewer queued tasks than workers -> shrink by one
                self._remove_worker()
                numthreads -= 1
                changed = True

            # log current stats
            if changed or time.time() - self.laststats > self.statinverval:
                workerlist = "\n%s" % '\n'.join(map(repr, self.workers))
                self.logger.debug('queuesize=%s workload=%.2f workers=%s workerlist=%s',
                                  queuesize, workload, numthreads, workerlist)
                self.laststats = time.time()

            time.sleep(self.checkinterval)

        self.logger.info('Threadpool shut down')

    def _remove_worker(self, num=1):
        """Stop and forget the ``num`` oldest workers.

        NOTE(review): a worker only checks its ``stayalive`` flag between
        tasks. A worker blocked on an empty queue will therefore not exit
        before it receives its next task, so the join below can take the
        full 120 seconds; the worker is dropped from the list either way.
        """
        self.logger.debug('Removing %s workerthread(s)', num)
        for _ in range(num):
            worker = self.workers[0]
            worker.stayalive = False
            worker.join(120)
            del self.workers[0]

    def _add_worker(self, num=1):
        """Create, register and start ``num`` new workers."""
        self.logger.debug('Adding %s workerthread(s)', num)
        for _ in range(num):
            self.threadcounter += 1
            worker = Worker("[%s]" % self.threadcounter, self)
            self.workers.append(worker)
            worker.start()

    def _drain_tasks(self):
        """Yield queued tasks without blocking, stopping at the first poison pill.

        get_task_sessionhandler is not used here since it does not hand out
        anything once stayalive is False.
        """
        while True:
            try:
                task = self.tasks.get_nowait()
            except queue.Empty:
                return
            if task is None:  # poison pill
                return
            yield task

    def shutdown(self, newmanager=None):
        """
        Shutdown manager, transfer queue to a new manager if available. Otherwise
        mark messages as defer.

        Keyword Args:
            newmanager (ProcManager or ThreadPool): has to provide add_task accepting a pickled socket
        """

        # set stayalive to False, this will send
        # poison pills to the workers
        self.stayalive = False

        if newmanager:
            # new manager available. Transfer tasks
            # to new manager
            countmessages = 0
            for task in self._drain_tasks():
                newmanager.add_task(task)
                countmessages += 1
            self.logger.info("Moved %u messages to queue of new manager", countmessages)
        else:
            # no new manager. Mark messages as defer.
            returnMessage = "Temporarily unavailable... Please try again later."
            markDeferCounter = 0
            skipped = 0
            for task in self._drain_tasks():
                # NOTE(review): add_task_from_socket queues (pickled socket, port)
                # tuples, which have no 'protohandler'. Such tasks cannot be
                # deferred here; they are counted and dropped so that shutdown
                # still completes and the workers get joined.
                protohandler = getattr(task, 'protohandler', None)
                if protohandler is None:
                    skipped += 1
                    continue
                try:
                    protohandler.defer(returnMessage)
                except Exception as e:
                    # best effort: one broken session must not abort the shutdown
                    self.logger.error('Could not defer queued message: %s', e)
                    skipped += 1
                else:
                    markDeferCounter += 1
            self.logger.info("Marked %s messages as '%s' to close queue",
                             markDeferCounter, returnMessage)
            if skipped:
                self.logger.warning('Dropped %s queued task(s) that could not be deferred', skipped)

        # The queue may have been full when the first pills were sent and is
        # drained now; top the pills up so a worker that started waiting in
        # the meantime is woken instead of blocking forever.
        self._send_poison_pills()

        # remove all the workers (joins them also); iterate over a snapshot
        # since the manager thread may still be modifying the list
        for worker in list(self.workers):
            worker.stayalive = False
            worker.join(120)  # wait 120 seconds max

class Worker(threading.Thread):
    """Pool thread that fetches session handlers from its pool and runs them."""

    def __init__(self, workerid, pool):
        """
        Args:
            workerid: label used in the thread name, logger name and repr
            pool: object providing ``get_task_sessionhandler()``
        """
        threading.Thread.__init__(self, name='Pool worker %s' % workerid)
        self.workerid = workerid
        self.birth = time.time()
        self.pool = pool
        self.stayalive = True  # set to False to stop the worker after its current task
        self.logger = logging.getLogger('%s.threads.worker.%s' % (__package__, workerid))
        self.logger.debug('thread init')
        self.noisy = False  # enables per-task debug logging
        # use the attribute instead of the deprecated setDaemon()
        self.daemon = False
        self.workerstate = 'created'

    def __repr__(self):
        return "%s: %s" % (self.workerid, self.workerstate)

    def run(self):
        """Process tasks until ``stayalive`` is cleared or a poison pill arrives."""
        self.logger.debug('thread start')

        while self.stayalive:
            self.workerstate = 'waiting for task'
            if self.noisy:
                self.logger.debug('Getting new task...')
            sesshandler = self.pool.get_task_sessionhandler()
            if sesshandler is None:  # poison pill -> shut down
                if self.noisy:
                    self.logger.debug("got a poison pill .. good bye world")
                self.stayalive = False
                continue

            if self.noisy:
                self.logger.debug('Doing work')
            try:
                sesshandler.handlesession(self)
            except Exception as e:
                # keep the worker alive, but record the traceback for diagnosis
                self.logger.error('Unhandled Exception : %s', e, exc_info=True)
            self.workerstate = 'task completed'

        self.workerstate = 'ending'
        self.logger.debug('thread end')

Al-HUWAITI Shell