Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64
User : u595547767 ( 595547767)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/messagesize.py
# -*- coding: UTF-8 -*-
#   Copyright 2012-2018 Fumail Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#

"""
This plugin allows setting individual message size limits per recipient domain

"""

from postomaat.shared import ScannerPlugin, DUNNO, REJECT, apply_template, strip_address, extract_domain, get_default_cache
from postomaat.extensions.sql import SQL_EXTENSION_ENABLED, get_session, get_domain_setting



class MessageSize(ScannerPlugin):
    
    def __init__(self,config,section=None):
        ScannerPlugin.__init__(self,config,section)
        self.logger = self._logger()
        self.requiredvars={
            'dbconnection':{
                'default':"mysql://root@localhost/config?charset=utf8",
                'description':'SQLAlchemy Connection string. Leave empty to disable SQL lookups',
            },
            'domain_sql_query':{
                'default':"SELECT max_size from domain where domain_name=:domain",
                'description':'get from sql database :domain will be replaced with the actual domain name. must return field check_spf',
            },
            'messagetemplate':{
                'default':'message size ${msg_size} exceeds size limit ${max_size} of recipient domain ${to_domain}'
            },
        }
        
        
        
    def _get_domain_limit(self, to_domain):
        max_size = None
        dbconnection = self.config.get(self.section, 'dbconnection').strip()
        sqlquery = self.config.get(self.section, 'domain_sql_query')
        
        if dbconnection!='' and SQL_EXTENSION_ENABLED:
            cache = get_default_cache()
            max_size = get_domain_setting(to_domain, dbconnection, sqlquery, cache, self.section, False, self.logger)
        
        return max_size
        
        
        
    def examine(self, suspect):
        if not SQL_EXTENSION_ENABLED:
            return DUNNO
        
        rcpt = suspect.get_value('recipient')
        if rcpt is None:
            self.logger.error('No TO address found')
            return DUNNO
        
        try:
            msg_size = int(suspect.get_value('size'))
        except (ValueError, TypeError):
            msg_size = 0
        
        if msg_size == 0:
            self.logger.debug('skipped: message size unknown (not specified or not in end-of-data restrictions)')
            return DUNNO
        
        to_domain=extract_domain(strip_address(rcpt))
        max_size = self._get_domain_limit(to_domain)
        
        if max_size is None or max_size==0:
            self.logger.debug('skipped: no max size for domain %s specified' % to_domain)
            return DUNNO
        
        if msg_size > max_size:
            message = apply_template(self.config.get(self.section, 'messagetemplate'), suspect, dict(msg_size=msg_size, max_size=max_size))
            return REJECT, message
    
    
    
    def lint(self):
        if not SQL_EXTENSION_ENABLED:
            print("sqlalchemy is not installed")
            return False
    
        if not self.checkConfig():
            return False
        
        try:
            dbconnection = self.config.get(self.section, 'dbconnection')
            conn=get_session(dbconnection)
            conn.execute("SELECT 1")
        except Exception as e:
            print("Failed to connect to SQL database: %s" % str(e))
            return False
            
        return True
        
        
        
        
    

Al-HUWAITI Shell