Server : LiteSpeed System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64 User : u595547767 ( 595547767) PHP Version : 7.4.33 Disable Function : NONE Directory : /opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/ |
# -*- coding: UTF-8 -*-
# Copyright 2012-2018 Fumail Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
"""
This plugin allows setting individual message size limits per recipient domain
"""
from postomaat.shared import ScannerPlugin, DUNNO, REJECT, apply_template, strip_address, extract_domain, get_default_cache
from postomaat.extensions.sql import SQL_EXTENSION_ENABLED, get_session, get_domain_setting
class MessageSize(ScannerPlugin):
def __init__(self,config,section=None):
ScannerPlugin.__init__(self,config,section)
self.logger = self._logger()
self.requiredvars={
'dbconnection':{
'default':"mysql://root@localhost/config?charset=utf8",
'description':'SQLAlchemy Connection string. Leave empty to disable SQL lookups',
},
'domain_sql_query':{
'default':"SELECT max_size from domain where domain_name=:domain",
'description':'get from sql database :domain will be replaced with the actual domain name. must return field check_spf',
},
'messagetemplate':{
'default':'message size ${msg_size} exceeds size limit ${max_size} of recipient domain ${to_domain}'
},
}
def _get_domain_limit(self, to_domain):
max_size = None
dbconnection = self.config.get(self.section, 'dbconnection').strip()
sqlquery = self.config.get(self.section, 'domain_sql_query')
if dbconnection!='' and SQL_EXTENSION_ENABLED:
cache = get_default_cache()
max_size = get_domain_setting(to_domain, dbconnection, sqlquery, cache, self.section, False, self.logger)
return max_size
def examine(self, suspect):
if not SQL_EXTENSION_ENABLED:
return DUNNO
rcpt = suspect.get_value('recipient')
if rcpt is None:
self.logger.error('No TO address found')
return DUNNO
try:
msg_size = int(suspect.get_value('size'))
except (ValueError, TypeError):
msg_size = 0
if msg_size == 0:
self.logger.debug('skipped: message size unknown (not specified or not in end-of-data restrictions)')
return DUNNO
to_domain=extract_domain(strip_address(rcpt))
max_size = self._get_domain_limit(to_domain)
if max_size is None or max_size==0:
self.logger.debug('skipped: no max size for domain %s specified' % to_domain)
return DUNNO
if msg_size > max_size:
message = apply_template(self.config.get(self.section, 'messagetemplate'), suspect, dict(msg_size=msg_size, max_size=max_size))
return REJECT, message
def lint(self):
if not SQL_EXTENSION_ENABLED:
print("sqlalchemy is not installed")
return False
if not self.checkConfig():
return False
try:
dbconnection = self.config.get(self.section, 'dbconnection')
conn=get_session(dbconnection)
conn.execute("SELECT 1")
except Exception as e:
print("Failed to connect to SQL database: %s" % str(e))
return False
return True