Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64
User : u595547767 ( 595547767)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/alt/python27/lib/python2.7/site-packages/postomaat/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/alt/python27/lib/python2.7/site-packages/postomaat/core.py
# -*- coding: utf-8 -*-
#   Copyright 2012-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

import logging
import sys
import os
import socket
import time
import traceback
import re
import inspect
from postomaat.shared import Suspect
from postomaat.scansession import SessionHandler
from postomaat.stats import StatsThread
import threading
import postomaat.threadpool
import postomaat.procpool
import multiprocessing
import multiprocessing.reduction
import code
import datetime

from pickle import loads
from multiprocessing.reduction import ForkingPickler
try:
    import ConfigParser
    from StringIO import StringIO
except ImportError:
    # Python 3
    import configparser as ConfigParser
    from io import BytesIO as StringIO


HOSTNAME=socket.gethostname()
         
   

class MainController(object):
    """main class to startup and control the app"""
    plugins=[]
    config=None
    
    def __init__(self, config, logQueue=None, logProcessFacQueue=None):
        """
        Main controller instance
        Note: The logQueue and logProcessFacQueue keyword args are only needed in the postomaat main process when logging
              to files. For default logging to the screen there is not logQueue needed.

        Args:
            config (configparser.RawConfigParser()): Config file parser (file already read)

        Keyword Args:
            logQueue (multiprocessing.queue or None): Queue where to put log messages (not directly used, only by loggers as defined in logtools.client_configurer)
            logProcessFacQueue (multiprocessing.queue or None): Queue where to put new logging configurations (logtools.logConfig objects)
        """
        
        self.requiredvars={
            #main section
            'identifier':{
                'section':'main',
                'description':"""identifier can be any string that helps you identifying your config file\nthis helps making sure the correct config is loaded. this identifier will be printed out when postomaat is reloading its config""",
                'default':'dist',
            },
            
            'daemonize':{
                'section':'main',
                'description':"run as a daemon? (fork)",
                'default':"1",
                #todo: validator...?
            },

            'scantimelogger':{
                'section':'main',
                'description':"Enable session scantime logger",
                'default':"0",
            },
            
            'user':{
                'section':'main',
                'description':"run as user",
                'default':"nobody",
                #todo: validator, check user...?
            },
            
            'group':{
                'section':'main',
                'description':"run as group",
                'default':"nobody",
                #todo: validator, check user...?
            },
            
            'plugindir':{
                'section':'main',
                'description':"where should postomaat search for additional plugins",
                'default':"",
            },
            
            'plugins':{
                'section':'main',
                'description':"what plugins do we load, comma separated",
                'default':"",
            },
            
            'bindaddress':{
                'section':'main',
                'description':"address postomaat should listen on. usually 127.0.0.1 so connections are accepted from local host only",
                'default':"127.0.0.1",
            },
            
            'incomingport':{
                'section':'main',
                'description':"incoming port",
                'default':"9998",
            },
            
            #performance section
            'minthreads':{
                'default':"2",
                'section':'performance',
                'description':'minimum scanner threads',
            },
            'maxthreads':{
                'default':"40",
                'section':'performance',
                'description':'maximum scanner threads',
            },

            'address_compliance_checker': {
                'section': 'main',
                'description': "Method to check mail address validity (\"Default\",\"LazyLocalPart\")",
                'default': "Default",
            },
            'address_compliance_fail_action': {
                'section': 'main',
                'description': "Action to perform if address validity check fails (\"defer\",\"reject\",\"discard\")",
                'default': "defer",
            },
            'address_compliance_fail_message': {
                'section': 'main',
                'description': "Reply message if address validity check fails",
                'default': "invalid sender or recipient address",
            },
            'backend': {
                'default': "thread",
                'section': 'performance',
                'description': "Method for parallelism, either 'thread' or 'process' ",
            },
            'initialprocs': {
                'default': "0",
                'section': 'performance',
                'description': "Initial number of processes when backend='process'. If 0 (the default), automatically selects twice the number of available virtual cores. Despite its 'initial'-name, this number currently is not adapted automatically.",
            },
            
            #  plugin alias
            'call-ahead':{
                'default':"postomaat.plugins.call-ahead.AddressCheck",
                'section':'PluginAlias',
            },
            
            'dbwriter':{
                'default':"postomaat.plugins.dbwriter.DBWriter",
                'section':'PluginAlias',
            },
        }
        self.config=config
        self.servers=[]
        self.logger=self._logger()
        self.stayalive=True
        self.threadpool=None
        self.procpool=None
        self.controlserver = None
        self.started = datetime.datetime.now()
        self.statsthread = None
        self.debugconsole = False
        self._logQueue = logQueue
        self._logProcessFacQueue = logProcessFacQueue
        self.configFileUpdates = None
        self.logConfigFileUpdates = None

    @property
    def logQueue(self):
        """Read-only access to the log queue passed to the constructor (may be None)."""
        return self._logQueue

    @property
    def logProcessFacQueue(self):
        """Queue for new logging configurations as passed to the constructor (may be None)."""
        return self._logProcessFacQueue

    @logProcessFacQueue.setter
    def logProcessFacQueue(self, lProc):
        """Replace the stored logging-configuration queue."""
        self._logProcessFacQueue = lProc

    def _logger(self):
        myclass=self.__class__.__name__
        loggername="%s.%s"%(__package__, myclass)
        return logging.getLogger(loggername)

    def _start_stats_thread(self):
        """
        Create the StatsThread object and run its writestats() method in a
        daemon thread named 'MRTG-Statswriter'.

        Returns:
            the StatsThread object (not the threading.Thread running it)
        """
        self.logger.info("Init Stat Engine")
        stats = StatsThread(self.config)
        writer = threading.Thread(target=stats.writestats, name='MRTG-Statswriter', args=())
        # daemon thread: must not keep the interpreter alive on exit
        writer.daemon = True
        writer.start()
        return stats

    def _start_threadpool(self):
        """
        Build a new thread pool from the [performance] min/max thread settings.

        If the [performance] section does not exist, 1 and 3 threads are used.
        The queue size is always ten times the maximum thread count.

        Returns:
            the new postomaat.threadpool.ThreadPool
        """
        self.logger.info("Init Threadpool")
        try:
            low = self.config.getint('performance', 'minthreads')
            high = self.config.getint('performance', 'maxthreads')
        except ConfigParser.NoSectionError:
            self.logger.warning(
                'Performance section not configured, using default thread numbers')
            low, high = 1, 3

        return postomaat.threadpool.ThreadPool(self, low, high, high * 10)

    def _start_processpool(self):
        """
        Build a new process pool.

        The worker count comes from [performance] initialprocs; any value below 1
        selects twice the CPU count reported by multiprocessing.

        Returns:
            the new postomaat.procpool.ProcManager
        """
        configured = self.config.getint('performance', 'initialprocs')
        numprocs = configured if configured >= 1 else multiprocessing.cpu_count() * 2
        self.logger.info("Init process pool with %s worker processes"%(numprocs))
        return postomaat.procpool.ProcManager(self._logQueue, numprocs=numprocs, config=self.config)

    def startup(self):
        ok=self.load_plugins()
        if not ok:
            sys.stderr.write("Some plugins failed to load, please check the logs. Aborting.\n")
            self.logger.info('postomaat shut down after fatal error condition')
            sys.exit(1)

        self.statsthread = self._start_stats_thread()
        backend = self.config.get('performance','backend')
        if backend == 'process':
            self.procpool = self._start_processpool()
        elif backend == 'thread':
            self.threadpool = self._start_threadpool()
        else:
            raise ValueError("Input \"%s\" not allowed for backend, valid options are \"thread\" and \"process\""%backend)

        self.start_incoming_servers()

        self.logger.info('Startup complete')
        if self.debugconsole:
            self.run_debugconsole()
        else:
            while self.stayalive:
                try:
                    time.sleep(10)
                except KeyboardInterrupt:
                    self.shutdown()

    def start_incoming_servers(self, listen=True):
        """
        Start severs listening at ports. For multiprocessing
        only the main process should create and listen at these ports.
        The copy of the controller which resides in the worker processes
        should only create the object but not try to listen. The correct
        list of the listening servers is used in other routines, for example
        in plugin_list_by_port.

        Args:
            listen (bool): If true the server will listen, otherwise the object is only created

        """
        ports=self.config.get('main', 'incomingport')
        for portconfig in ports.split():
            #plugins
            plugins=self.plugins
            if ':' in portconfig:
                port,pluginlist=portconfig.split(':')
                port=int(port.strip())
                plugins,ok=self._load_all(pluginlist)
                if not ok:
                    self.logger.error("Could not startup engine on port %s, some plugins failed to load"%port)
                    continue
            else:
                port=int(portconfig.strip())

            server = PolicyServer(self, port=port, address=self.config.get('main', 'bindaddress'),
                                  plugins=plugins, listen=listen)
            if listen:
                tr = threading.Thread(target=server.serve, args=())
                tr.daemon = True
                tr.start()
            self.servers.append(server)

    def plugin_list_by_port(self, port):
        """
        Return the plugin list for a given port where postomaat listens
        Args:
            port (int): incoming port

        Returns:
            plugin list if incoming port has been found, full plugin list otherwise

        """
        try:
            for srv in self.servers:
                if srv.port == port:
                    return srv.plugins
        except Exception as e:
            self.logger.exception(e)
            self.logger.error("Exception looking for port %u, returning full list" % port)
            return self.plugins

        self.logger.error("No incoming server found for port %u, returning full list" % port)
        return self.plugins

    def run_debugconsole(self):
        """
        Start an interactive Python console on the terminal and block until
        it is left. The controller is available in the console as `mc`.
        """
        # do not import readline at the top, it will cause undesired output, for example when generating the default config
        # http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
        import readline

        print("Interactive Console started")
        print("")
        print("pre-defined locals:")

        mc = self
        print("mc : maincontroller")

        # locals() hands every local name defined so far (self, readline, mc) to the console namespace
        terp = code.InteractiveConsole(locals())
        terp.interact("")

    def reload(self):
        """apply config changes"""
        self.logger.info('Applying configuration changes...')

        backend = self.config.get('performance','backend')

        if backend == 'thread':
            if self.threadpool is not None:
                minthreads = self.config.getint('performance', 'minthreads')
                maxthreads = self.config.getint('performance', 'maxthreads')

                # threadpool changes?
                if self.threadpool.minthreads != minthreads or self.threadpool.maxthreads != maxthreads:
                    self.logger.info('Threadpool config changed, initialising new threadpool')
                    currentthreadpool = self.threadpool
                    self.threadpool = self._start_threadpool()
                    currentthreadpool.shutdown(self.threadpool)
                else:
                    self.logger.info('Keep existing threadpool')
            else:
                self.logger.info('Create new threadpool')
                self.threadpool = self._start_threadpool()

            # stop existing procpool
            if self.procpool is not None:
                self.logger.info('Delete old procpool')
                self.procpool.shutdown(self.threadpool)
                self.procpool = None

        elif backend == 'process':
            # start new procpool
            currentProcPool = self.procpool
            self.logger.info('Create new processpool')
            self.procpool = self._start_processpool()

            # stop existing procpool
            # -> the procpool has to be recreated to take configuration changes
            #    into account (each worker process has its own controller unlike using threadpool)
            if currentProcPool is not None:
                self.logger.info('Delete old processpool')
                currentProcPool.shutdown(self.procpool)

            # stop existing threadpool
            if self.threadpool is not None:
                self.logger.info('Delete old threadpool')
                self.threadpool.shutdown(self.procpool)
                self.threadpool = None
        else:
            self.logger.error('backend %s not detected -> ignoring input! (valid options \"thread\" and \"process\")'%backend)

        #smtp engine changes?
        ports=self.config.get('main', 'incomingport')
        portlist = []
        for portconfig in ports.split():
            #plugins
            plugins=self.plugins
            if ':' in portconfig:
                port,pluginlist=portconfig.split(':')
                port=int(port.strip())
                portlist.append(port)

        for port in portlist:
            alreadyRunning=False
            for serv in self.servers:
                if serv.port==port:
                    alreadyRunning=True
                    break
            
            if not alreadyRunning:
                self.logger.info('start new policy server at %s' % str(port))
                server=PolicyServer(self,port=port,address=self.config.get('main', 'bindaddress'))
                tr = threading.Thread(target=server.serve, args=())
                tr.daemon = True
                tr.start()
                self.servers.append(server)
            else:
                self.logger.debug('keep existing policy server at %s' % str(port))

        
        servercopy=self.servers[:] 
        for serv in servercopy:
            if serv.port not in portlist:
                self.logger.info('Closing server socket on port %s' % serv.port)
                serv.shutdown()
                self.servers.remove(serv)
            else:
                self.logger.info('Keep server socket on port %s' % serv.port)

        self.logger.info('Config changes applied')
    
    
    def test(self,valuedict,port=None):
        """
        Dry run without postfix: run the plugins against a suspect built from
        valuedict and return the resulting decision.

        Exits the process with status 1 if the configured plugins fail to load.

        Args:
            valuedict: values handed to Suspect()
            port: optional incoming port (int or string) whose plugin
                configuration from [main] incomingport should be used; with
                None the globally configured plugins are used

        Returns:
            tuple (action, arg) as left on the session handler after the run

        Raises:
            Exception: port is given but not present in [main] incomingport
        """
        suspect=Suspect(valuedict)
        if not self.load_plugins():
            sys.exit(1)

        if port is None:
            plugins=self.plugins
        else:
            # config entries are strings; normalise so that an int port or
            # surrounding whitespace still selects the matching entry
            wanted=str(port).strip()
            plugins=None
            for portconfig in self.config.get('main', 'incomingport').split():
                if ':' in portconfig:
                    pport,pluginlist=portconfig.split(':')
                    if pport.strip()!=wanted:
                        continue
                    # load failures are logged by _load_all; the partial list is used as before
                    plugins,_=self._load_all(pluginlist)
                    break
                elif portconfig.strip()==wanted: #port with default config
                    plugins=self.plugins
                    break

        if plugins is None:
            raise Exception("no plugin configuration for current port selection")
        sesshandler=SessionHandler(None, self.config, plugins)
        sesshandler.run_plugins(suspect, plugins)
        return (sesshandler.action, sesshandler.arg)
         
    def shutdown(self):
        if self.statsthread:
            self.statsthread.stayalive = False
        for server in self.servers:
            self.logger.info('Closing server socket on port %s' % server.port)
            server.shutdown()
        
        # stop existing procpool
        if self.procpool is not None:
            self.logger.info('Delete procpool')
            self.procpool.shutdown()
            self.procpool = None
        # stop existing threadpool
        if self.threadpool is not None:
            self.logger.info('Delete threadpool')
            self.threadpool.shutdown()
            self.threadpool = None

        self.stayalive=False
        self.logger.info('Shutdown complete')
        self.logger.info('Remaining threads: %s' %threading.enumerate())
        
    def _lint_dependencies(self, fc):
        print(fc.strcolor('Checking dependencies...', 'magenta'))
        try:
            import sqlalchemy
            print(fc.strcolor('sqlalchemy: Version %s installed' % sqlalchemy.__version__, 'green'))
        except ImportError:
            print(fc.strcolor('sqlalchemy: not installed', 'yellow') +
                  " Optional dependency, required if you want to enable any database lookups")
        
    def lint(self):
        """
        Print a configuration check report to stdout: dependency check,
        plugin loading, the core configuration (checkConfig) and the lint()
        result of every loaded plugin. A plugin whose lint() raises or returns
        a false value is counted as an error.
        """
        from postomaat.funkyconsole import FunkyConsole
        fc=FunkyConsole()
        self._lint_dependencies(fc)

        print(fc.strcolor('Loading plugins...','magenta'))
        if not self.load_plugins():
            print(fc.strcolor('At least one plugin failed to load','red'))
        print(fc.strcolor('Plugin loading complete','magenta'))

        print("Linting %s" % fc.strcolor("main configuration",'cyan'))
        if self.checkConfig():
            print(fc.strcolor("OK","green"))
        else:
            print(fc.strcolor("ERROR","red"))

        failed=0
        for plugin in self.plugins:
            print("")
            print("Linting Plugin %s Config section: %s" %
                  (fc.strcolor(str(plugin),'cyan'), fc.strcolor(str(plugin.section),'cyan')))
            try:
                passed=plugin.lint()
            except Exception as e:
                print("ERROR: %s"%e)
                passed=False

            if passed:
                print(fc.strcolor("OK","green"))
            else:
                failed+=1
                print(fc.strcolor("ERROR","red"))
        print("%s plugins reported errors."%failed)
        
        
    
    def checkConfig(self):
        """Check if all requred options are in the config file
        Fill missing values with defaults if possible
        """
        allOK=True
        for config,infodic in iter(self.requiredvars.items()):
            section=infodic['section']
            try:
                var=self.config.get(section,config)
    
                if 'validator' in infodic:
                    if not infodic["validator"](var):
                        print("Validation failed for [%s] :: %s"%(section,config))
                        allOK=False
                
            except ConfigParser.NoSectionError:
                print("Missing configuration section [%s] :: %s"%(section,config))
                allOK=False
            except ConfigParser.NoOptionError:
                print("Missing configuration value [%s] :: %s"%(section,config))
                allOK=False
        return allOK

    def load_extensions(self):
        """
        Import every module listed in postomaat.extensions.__all__.

        Returns:
            list of (module __name__, ENABLED, STATUS) tuples, one per extension
        """
        import postomaat.extensions
        result = []
        for extname in postomaat.extensions.__all__:
            # __import__ of a dotted name returns the top level package, walk down from there
            toplevel = __import__('%s.extensions.%s' % (__package__, extname))
            module = getattr(getattr(toplevel, 'extensions'), extname)
            enabled = module.ENABLED
            status = module.STATUS
            result.append((module.__name__, enabled, status))
        return result

    def get_component_by_alias(self,pluginalias):
        """Returns the full plugin component from an alias. if this alias is not configured, return the original string"""
        if not self.config.has_section('PluginAlias'):
            return pluginalias
        
        if not self.config.has_option('PluginAlias', pluginalias):
            return pluginalias
        
        return self.config.get('PluginAlias', pluginalias)
    
    def load_plugins(self):
        """load plugins defined in config"""
        allOK = True
        # checking directories, ignore empty string or None
        # (if plugin dir is not set, this would oterhwise result in an array containing one empty string [""]
        #  which would still be processed and a warning printed)
        plugindirs = [dir for dir in self.config.get('main', 'plugindir').strip().split(',') if dir]
        for plugindir in plugindirs:
            if os.path.isdir(plugindir):
                self.logger.debug('Searching for additional plugins in %s' % plugindir)
                if plugindir not in sys.path:
                    sys.path.insert(0, plugindir)
            else:
                self.logger.warning('Plugin directory %s not found' % plugindir)

        self.logger.debug('Module search path %s' % sys.path)
        self.logger.debug('Loading scanner plugins')
        
        newplugins,loadok=self._load_all(self.config.get('main', 'plugins'))
        if not loadok:
            allOK=False
        
        if allOK:
            self.plugins=newplugins
            self.propagate_plugin_defaults()
            
        return allOK
    
    def _load_all(self,configstring):
        """load all plugins from config string. returns tuple ([list of loaded instances],allOk)"""
        pluglist=[]
        config_re=re.compile("""^(?P<structured_name>[a-zA-Z0-9\.\_\-]+)(?:\((?P<config_override>[a-zA-Z0-9\.\_]+)\))?$""")
        allOK=True
        plugins=configstring.split(',')
        for plug in plugins:
            if plug=="":
                continue
            m=config_re.match(plug)
            if m is None:
                self.logger.error('Invalid Plugin Syntax: %s'%plug)
                allOK=False
                continue
            structured_name,configoverride=m.groups()
            structured_name=self.get_component_by_alias(structured_name)
            try:
                plugininstance=self._load_component(structured_name,configsection=configoverride)
                pluglist.append(plugininstance)
            except Exception as e:
                self.logger.error('Could not load plugin %s : %s'%(structured_name, str(e)))
                exc=traceback.format_exc()
                self.logger.error(exc)
                allOK=False
        
        return pluglist,allOK
    
    
    def _load_component(self,structured_name,configsection=None):
        #from: http://mail.python.org/pipermail/python-list/2003-May/204392.html
        component_names = structured_name.split('.')
        mod = __import__('.'.join(component_names[:-1]))
        for component_name in component_names[1:]:
            mod = getattr(mod, component_name)
        
        if configsection is None:
            plugininstance=mod(self.config)
        else:
            #check if plugin supports config override
            if 'section' in inspect.getargspec(mod.__init__)[0]:
                plugininstance=mod(self.config,section=configsection)
            else:
                raise Exception('Cannot set Config Section %s : Plugin %s does not support config override'%(configsection,mod))
        return plugininstance
    
    def propagate_defaults(self,requiredvars,config,defaultsection=None):
        """propagate defaults from requiredvars if they are missing in config"""
        for option,infodic in iter(requiredvars.items()):
            if 'section' in infodic:
                section=infodic['section']
            else:
                section=defaultsection
                
            default=infodic['default']
            
            if not config.has_section(section):
                config.add_section(section)
                
            if not config.has_option(section,option):
                config.set(section,option,default)
    
    def propagate_core_defaults(self):
        """check for missing core config options and try to fill them with defaults
        must be called before we can do plugin loading stuff
        """
        self.propagate_defaults(self.requiredvars, self.config,'main')
    
    def propagate_plugin_defaults(self):
        """propagate defaults from loaded lugins"""
        for plug in self.plugins:
            if hasattr(plug,'requiredvars'):
                requiredvars=getattr(plug,'requiredvars')
                if type(requiredvars)==dict:
                        self.propagate_defaults(requiredvars, self.config, plug.section)

class PolicyServer(object):
    """
    TCP server for one incoming port. Accepted connections are handed to the
    controller's thread pool or process pool; if the controller has neither,
    the session is handled directly in the serving thread.
    """

    def __init__(self, controller, port=10025, address="127.0.0.1", plugins=None, listen=True):
        """
        Args:
            controller: main controller, provides threadpool, procpool, config and plugins
            port (int): port of this server
            address (str): address to bind to
            plugins: plugin list for this port, None selects controller.plugins
            listen (bool): if true the socket is created, bound and put into
                listening state right away; the process exits with status 1
                if that fails. Otherwise no socket is created.
        """
        self.logger=logging.getLogger("%s.proto.incoming.%s"%(__package__, port))
        self.logger.debug('Starting incoming policy server on Port %s'%port)
        self.port=port
        self.controller=controller
        self.stayalive=True
        if plugins is None:
            self.plugins=controller.plugins
        else:
            self.plugins=plugins

        self._socket = None
        if listen:
            try:
                self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._socket.bind((address, port))
                self._socket.listen(5)
            except Exception as e:
                self.logger.error('Could not start incoming policy server: %s'%e)
                sys.exit(1)

    def shutdown(self):
        """Ask serve() to stop and close the listening socket, if there is one."""
        self.stayalive=False
        if self._socket is not None:
            self._socket.close()
            self._socket = None

    def serve(self):
        """
        Accept connections until shutdown() is called.

        Returns right away (with an error log) if there is no listening
        socket, instead of failing on every loop iteration. Errors while
        accepting or dispatching a connection are logged and the loop goes on,
        unless the server is shutting down - then the loop ends quietly.
        """
        self.logger.info('policy server running on port %s'%self.port)
        while self.stayalive:
            # shutdown() may reset self._socket from another thread, work on a local reference
            listener = self._socket
            if listener is None:
                if self.stayalive:
                    self.logger.error('policy server on port %s has no listening socket, not serving'%self.port)
                break
            try:
                self.logger.debug('Waiting for connection...')
                sock, addr = listener.accept()
                if not self.stayalive:
                    break
                self.logger.debug('Incoming connection from %s' % str(addr))

                if self.controller.threadpool:
                    #this will block if queue is full
                    self.controller.threadpool.add_task_from_socket(sock, self.port)
                elif self.controller.procpool:
                    self.controller.procpool.add_task_from_socket(sock, self.port)
                else:
                    engine = SessionHandler(sock, self.controller.config, self.plugins)
                    engine.handlesession()
            except Exception as e:
                if not self.stayalive:
                    # expected while shutting down (socket closed under us), no traceback needed
                    break
                self.logger.exception(e)

def forking_dumps(obj):
    """Pickle obj with multiprocessing's ForkingPickler and return the pickled data.

    According to the original author this is required to pass a socket to
    another process in multiprocessing.
    """
    stream = StringIO()
    pickler = ForkingPickler(stream)
    pickler.dump(obj)
    return stream.getvalue()

def forking_load(dump):
    """Unpickle data created by forking_dumps and return the resulting object.

    NOTE: this is plain pickle.loads, only feed it data from trusted processes.
    """
    return loads(dump)

Al-HUWAITI Shell