Server : LiteSpeed System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64 User : u595547767 ( 595547767) PHP Version : 7.4.33 Disable Function : NONE Directory : /opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/ |
# -*- coding: UTF-8 -*-
# Copyright 2012-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import time
from ConfigParser import NoOptionError
from postomaat.shared import ScannerPlugin, DUNNO
try:
from fluent import sender
FLUENT_AVAILABLE = True
except ImportError:
FLUENT_AVAILABLE = False
class Event(object):
def __init__(self, label, data, **kwargs):
self.data = data
assert isinstance(self.data, dict), 'data must be a dict'
self.label = label
self.sender_ = kwargs.get('sender', sender.get_global_sender())
self.timestamp = kwargs.get('time', int(time.time()))
def send(self):
return self.sender_.emit_with_time(self.label, self.timestamp, self.data)
class FluentWriter(ScannerPlugin):
def __init__(self, config, section=None):
ScannerPlugin.__init__(self, config, section)
self.logger = self._logger()
self.requiredvars = {
'host': {
'default': None,
'description': 'Where to connect to fluentd. Local connecion if not specified.'
},
'port': {
'default': '24224',
'description': 'TCP port to send events to'
},
'tag': {
'default': 'postomaat.stats',
'description': 'Tag events sent to fluetnd'
},
'fields': {
'default': 'from_address to_address from_domain to_domain size',
'description': 'Field names or maps that should be exported'
}
}
self.fieldmap = {}
self.fields_to_lower = [
'from_address',
'from_domain',
'to_address',
'to_domain',
'sender',
'recipient',
'sasl_sender',
'sasl_username',
'sasl_user'
]
try:
host = self.config.get(self.section, 'host')
except NoOptionError:
host = None
self.host = host
try:
port = self.config.getint(self.section, 'port')
except NoOptionError:
port = 24224
self.port = port
try:
tag = self.config.get(self.section, 'tag')
except NoOptionError:
tag = 'postomaat.stats'
self.tag = tag
def connect(self):
if sender.get_global_sender() is not None:
return
kw = {}
kw['host'] = self.host
kw['port'] = self.port
tag = self.tag
if 'host' not in kw:
del kw['port']
if 'host' in kw and kw['host'] is None:
del kw['host']
del kw['port']
sender.setup(tag=tag, **kw)
def get_fieldmap(self):
"""Create the mapping from attribute names to field names based on the config string
by default, fluentd field name is the same as Postfix Policy attribute name,
but, to override the mapping, the config can be in the form:
postfix_attribute:fluentd_field
Attribute is any name that is available in the suspect object.
That is:
- it's an attribute name in Postix Policy protocol
- it was appended to suspect object by previous plugins
eg.
fields=to_address to_domain from_address:sender from_domain:senderdomain size
"""
configstring = self.config.get(self.section, 'fields')
fields = configstring.split()
fieldmap = {}
for field in fields:
if ':' in field:
(tag, column) = field.split(':', 1)
fieldmap[column] = tag
else:
fieldmap[field] = field
return fieldmap
def build_event_data(self, suspect, fieldmap):
event_data = {}
for fluent_field, postfix_attr in fieldmap.items():
if postfix_attr == 'from_address':
suspect_attribute = suspect.from_address
elif postfix_attr == 'from_domain':
suspect_attribute = suspect.from_domain
elif postfix_attr == 'to_address':
suspect_attribute = suspect.to_address
elif postfix_attr == 'to_domain':
suspect_attribute = suspect.to_domain
else:
suspect_attribute = suspect.get_value(postfix_attr)
if suspect_attribute is not None:
event_data[fluent_field] = suspect_attribute
elif postfix_attr in suspect.tags:
event_data[fluent_field] = suspect.tags[fluent_field]
else:
self.logger.warning("Suspect does not have attribute %s. Typo or missing plugin?",
postfix_attr)
event_data[fluent_field] = None
return self.normalized_event_fields(event_data)
def normalized_event_fields(self, event_data):
for field, _ in event_data.items():
if field in self.fields_to_lower:
try:
event_data[field] = event_data[field].lower()
except AttributeError:
pass
return event_data
def lint(self):
lint_ok = True
if not self.check_config():
print('Error checking config')
lint_ok = False
if not FLUENT_AVAILABLE:
print("fluent-logger module is not installed")
lint_ok = False
if FLUENT_AVAILABLE:
try:
self.connect()
gs = sender.get_global_sender()
gs._reconnect()
except Exception as ex:
error = type(ex).__name__, ex.message
print("Failed to connect to FluentD {}".format(str(error)))
lint_ok = False
return lint_ok
def examine(self, suspect):
try:
self.connect()
self.fieldmap = self.get_fieldmap()
data = self.build_event_data(suspect, self.fieldmap)
fleuntd_event = Event(None, data)
# socket.error Exception is tainted in fluent.sender._send_internal
if not fleuntd_event.send():
raise Exception('Failed to send event to fluentd, probably socket error')
except Exception as ex: # pylint: disable=W0703
error = type(ex).__name__, ex.message
self.logger.error("FluentD Writer plugin failed, log not written: %s", str(error))
return DUNNO
def __str__(self):
return "FluentD Writer Plugin"