Server : LiteSpeed System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64 User : u595547767 ( 595547767) PHP Version : 7.4.33 Disable Function : NONE Directory : /opt/alt/python27/lib/python2.7/site-packages/postomaat/ |
# -*- coding: UTF-8 -*-
# Copyright 2009-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import postomaat.core
import postomaat.logtools as logtools
from postomaat.scansession import SessionHandler
from postomaat.stats import Statskeeper, StatDelta
from postomaat.addrcheck import Addrcheck
import multiprocessing
import multiprocessing.queues
import signal
import logging
import traceback
import threading
import importlib
class ProcManager(object):
    """
    Pool of worker processes fed through a shared, bounded task queue.

    The constructor creates the queues, the shared state dict and the
    child-to-parent message listener, then immediately starts the workers.
    """

    def __init__(self, logQueue, numprocs=None, queuesize=100, config=None):
        """
        Args:
            logQueue: queue object handed unchanged to every worker process

        Keyword Args:
            numprocs (int): number of worker processes to start. When None the
                number of CPUs is used.
            queuesize (int): maximum size of the task queue
            config: configuration object handed unchanged to every worker process
        """
        self._child_id_counter = 0
        self._logQueue = logQueue
        self.manager = multiprocessing.Manager()
        self.shared_state = self._init_shared_state()
        self.config = config
        # range(None) would raise a TypeError in start(), so resolve the default here
        self.numprocs = self._resolve_numprocs(numprocs)
        self.workers = []
        self.queuesize = queuesize
        self.tasks = multiprocessing.Queue(queuesize)
        self.child_to_server_messages = multiprocessing.Queue()

        self.logger = logging.getLogger('%s.procpool' % __package__)
        self._stayalive = True
        self.name = 'ProcessPool'
        self.message_listener = MessageListener(self.child_to_server_messages)
        self.start()

    @staticmethod
    def _resolve_numprocs(numprocs):
        """Return the worker count to use; None falls back to the CPU count (1 if unknown)."""
        if numprocs is not None:
            return numprocs
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            return 1

    def _init_shared_state(self):
        """Create the manager-backed dict which is passed to all workers."""
        shared_state = self.manager.dict()
        return shared_state

    @property
    def stayalive(self):
        """False once the pool has been told to shut down."""
        return self._stayalive

    @stayalive.setter
    def stayalive(self, value):
        # procpool is shut down -> send poison pill to workers
        # (only on the transition from alive to not alive)
        if self._stayalive and not value:
            self._stayalive = False
            self._send_poison_pills()
        self._stayalive = value

    def _send_poison_pills(self):
        """flood the queue with poison pills to tell all workers to shut down"""
        # NOTE(review): put_nowait raises Full when the bounded task queue has
        # no free slot -- confirm this is acceptable for a shutdown under load.
        for _ in self.workers:
            # tasks queue is FIFO queue. As long as nothing is added to the queue
            # anymore the poison pills will be the last elements taken from the queue
            self.tasks.put_nowait(None)

    def add_task(self, session):
        """Put a task on the queue; the task is ignored once the pool is shutting down."""
        if self._stayalive:
            self.tasks.put(session)

    def add_task_from_socket(self, sock, port):
        """
        Add task to queue compressing the socket which is needed for multiprocessing

        Args:
            sock (socket): incoming socket
            port (int): port where message was received (needed for plugin list)
        """
        # in multi processing, the other process manages configs and plugins itself, we only pass the minimum required information:
        # a pickled version of the socket (this is no longer required in python 3.4, but in python 2 the multiprocessing queue can not handle sockets
        # see https://stackoverflow.com/questions/36370724/python-passing-a-tcp-socket-object-to-a-multiprocessing-queue
        task = (postomaat.core.forking_dumps(sock), port)
        self.add_task(task)

    def _create_worker(self):
        """Create (but do not start) the next worker process, named "Worker-<n>"."""
        self._child_id_counter += 1
        worker_name = "Worker-%s" % self._child_id_counter
        worker = multiprocessing.Process(target=postomaat_process_worker, name=worker_name,
                                         args=(self.tasks, self.config, self.shared_state,
                                               self.child_to_server_messages, self._logQueue))
        return worker

    def start(self):
        """Start all worker processes and the child-to-parent message listener."""
        for _ in range(self.numprocs):
            worker = self._create_worker()
            worker.start()
            self.workers.append(worker)

        # Start the child-to-parent message listener
        self.message_listener.start()

    def shutdown(self, newmanager=None):
        """
        Shutdown manager, transfer queue to a new manager if available. Otherwise
        mark messages as defer.

        Keyword Args:
            newmanager (ProcManager or ThreadPool): has to provide add_task accepting a pickled socket
        """
        # setting stayalive equal to False
        # will send poison pills to all processors
        self.logger.debug("Shutdown procpool -> send poison pills")
        self.stayalive = False

        # add another poison pill for the ProcManager itself removing tasks...
        self.tasks.put_nowait(None)

        if newmanager:
            # new manager available. Transfer tasks
            # to new manager
            countmessages = 0
            while True:
                task = self.tasks.get()
                if task is None:  # poison pill
                    break
                newmanager.add_task(task)
                countmessages += 1
            self.logger.info("Moved %u messages to queue of new manager" % countmessages)
        else:
            # no new manager. Mark messages as defer.
            returnmessage = "Temporarily unavailable... Please try again later."
            markdefercounter = 0
            while True:
                task = self.tasks.get()
                if task is None:  # poison pill
                    break
                markdefercounter += 1
                try:
                    psock, port = task
                    sock = postomaat.core.forking_load(psock)
                    handler = SessionHandler(sock, self.config, [])
                    handler.defer(returnmessage)
                except Exception:
                    # a single broken task must not abort the shutdown, otherwise
                    # workers are never joined and the manager keeps running
                    self.logger.error("Could not defer queued task: %s" % traceback.format_exc())
            self.logger.info("Marked %s messages as '%s' to close queue" % (markdefercounter, returnmessage))

        # join the workers
        self.logger.debug("Join workers")
        for worker in self.workers:
            worker.join(120)

        self.logger.debug("Join message listener")
        self.message_listener.stayalive = False
        # put poison pill into queue otherwise the process will not stop
        # since "stayalive" is only checked after receiving a message from the queue
        self.child_to_server_messages.put_nowait(None)
        self.message_listener.join(120)

        self.logger.debug("Close tasks queue")
        self.tasks.close()
        self.child_to_server_messages.close()

        self.logger.debug("Shutdown multiprocessing manager")
        self.manager.shutdown()
        self.logger.debug("done...")
class MessageListener(threading.Thread):
    """
    Daemon thread which consumes the messages worker processes send to the
    parent process and applies 'statsdelta' events to the Statskeeper.
    """

    def __init__(self, message_queue):
        """
        Args:
            message_queue: queue the messages are read from; a None entry stops the thread
        """
        threading.Thread.__init__(self)
        self.name = "Process Message Listener"
        self.message_queue = message_queue
        self.stayalive = True
        self.statskeeper = Statskeeper()
        self.daemon = True

    def run(self):
        """Process messages until stayalive is False or a None (poison pill) is received."""
        while self.stayalive:
            # blocking read: stayalive is only re-checked after a message arrived
            message = self.message_queue.get()
            if message is None:
                break
            try:
                # the lookup is part of the guarded section: a message without
                # 'event_type' (or a non-dict message) must not kill the listener
                event_type = message['event_type']
                if event_type == 'statsdelta':  # increase statistics counters
                    delta = StatDelta(**message)
                    self.statskeeper.increase_counter_values(delta)
            except Exception:
                print(traceback.format_exc())
def postomaat_process_worker(queue, config, shared_state, child_to_server_messages, logQueue):
    """
    Main function of a worker process.

    Sets up logging, the address compliance checker and its own MainController,
    then takes tasks from the queue until a poison pill (None) is received.

    Args:
        queue: task queue delivering (pickled socket, port) tuples or None
        config: configuration object used for the controller and the sessions
        shared_state: shared dict the worker publishes its state in
        child_to_server_messages: queue used to send statistics events to the parent
        logQueue: queue passed to the log client setup and to the controller
    """
    # SIGHUP is ignored in the worker process
    signal.signal(signal.SIGHUP, signal.SIG_IGN)

    logtools.client_configurer(logQueue)
    logging.basicConfig(level=logging.DEBUG)
    workerstate = WorkerStateWrapper(shared_state, 'loading configuration')
    # NOTE(review): the logger name says 'fuglu', not 'postomaat' -- confirm this is intended
    logger = logging.getLogger('fuglu.process')
    logger.debug("New worker: %s" % logtools.createPIDinfo())

    # Setup address compliance checker
    # -> Due to default linux forking behavior this should already
    #    have the correct setup but it's better not to rely on this
    try:
        checker_name = config.get('main', 'address_compliance_checker')
    except Exception:
        # might happen for some tests which do not propagate defaults
        checker_name = "Default"
    Addrcheck().set(checker_name)

    # load config and plugins
    controller = postomaat.core.MainController(config, logQueue)
    controller.load_extensions()
    controller.load_plugins()

    # create the incoming server objects. This will also create the port-specific
    # plugin lists which are used for the SessionHandler
    controller.start_incoming_servers(listen=False)

    # forward statistics counters to parent process
    def forward_to_parent(event):
        child_to_server_messages.put(event.as_message())

    statskeeper = Statskeeper()
    statskeeper.stat_listener_callback.append(forward_to_parent)

    logger.debug("%s: Enter service loop..." % logtools.createPIDinfo())

    try:
        while True:
            workerstate.workerstate = 'waiting for task'
            logger.debug("%s: Child process waiting for task" % logtools.createPIDinfo())
            job = queue.get()

            if job is not None:
                workerstate.workerstate = 'starting scan session'
                logger.debug("%s: Child process starting scan session" % logtools.createPIDinfo())
                pickled_sock, port = job
                sock = postomaat.core.forking_load(pickled_sock)
                session = SessionHandler(sock, config, controller.plugin_list_by_port(port))
                session.handlesession(workerstate)
                continue

            # poison pill
            logger.debug("%s: Child process received poison pill - shut down" % logtools.createPIDinfo())
            try:
                # it might be possible it does not work to properly set the workerstate
                # since this is a shared variable -> prevent exceptions
                workerstate.workerstate = 'ended'
            except Exception:
                pass
            finally:
                # leave the worker no matter what happened while publishing the state
                return
    except KeyboardInterrupt:
        workerstate.workerstate = 'ended'
    except Exception:
        trace_text = traceback.format_exc()
        logger.error("Exception in child process: %s" % trace_text)
        print(trace_text)
        workerstate.workerstate = 'crashed'
    finally:
        # runs on every exit path of the service loop, including the poison pill return
        controller.shutdown()
class WorkerStateWrapper(object):
    """
    Holds the state string of a worker and mirrors every change into a
    shared dict, using the process name as key.
    """

    def __init__(self, shared_state_dict, initial_state='created', process=None):
        """
        Args:
            shared_state_dict: dict-like object receiving the state under the process name

        Keyword Args:
            initial_state (str): state published right after creation
            process: object providing a "name" attribute; the current process is used if not given
        """
        self.shared_state_dict = shared_state_dict
        self.process = process or multiprocessing.current_process()
        self._state = initial_state
        self._publish_state()

    def _publish_state(self):
        """Write the current state to the shared dict; an EOFError while doing so is ignored."""
        try:
            self.shared_state_dict[self.process.name] = self._state
        except EOFError:
            pass

    def _get_workerstate(self):
        return self._state

    def _set_workerstate(self, value):
        # keep the local copy first, then mirror it to the shared dict
        self._state = value
        self._publish_state()

    # reading returns the local state, assigning also publishes it
    workerstate = property(_get_workerstate, _set_workerstate)