[elbe-devel] [PATCH v4 01/41] log.py - New logging system

Torben Hohn torben.hohn at linutronix.de
Wed Aug 7 11:43:37 CEST 2019


On Mon, Jul 08, 2019 at 05:11:06PM +0200, dion at linutronix.de wrote:
> From: Olivier Dion <dion at linutronix.de>
> 
> * logging_methods
> 
>   A logging method is registered through the decorator
>   "@logging_method(name)" where name is an unique string that identify
>   the logging method to use.
> 
>   Logging methods are used by elbe_logging to open specific locations
>   of logging for the calling thread.
> 
>   Logging methods should also be wrapped with the "@with_list"
>   decorator that ensure that the first argument passed is always a
>   list.
> 
>   Finally, a logging method should act as an iterator that return a
>   list of handlers to add to the thread for each iteration.
> 
> ** add_stream_handlers
> 
>    This logging method name is "streams" and accepts a list of stream
>    objects to open as logging output.
> 
>    Each created handlers allowed the following logger to log to it:
> 
>      - root
>      - log
>      - report
>      - validation
>      - echo
>      - soap
> 
>    Every messages from those handlers are redirect to the list of
>    streams provided.
> 
> ** add_project_handlers
> 
>    This logging method name is "projects" and accepts a list of
>    project's directory.
> 
>    Every projects open 5 handlers.
> 
> *** validation
> 
>     This handler will only receive messages from the 'validation' logger
>     and output them to "project_dir/validation.txt".
> 
> *** report
> 
>     This handler will only receive messages from the 'report' logger
>     and output them to "project_dir/elbe-report.txt"
> 
> *** log
> 
>     This handler will only receive messages from the 'root' and 'log'
>     loggers and output them to "project_dir/log.txt".
> 
> *** echo
> 
>     This handler will only receive messages from the 'root', 'report'
>     and 'validation' loggers and output them to the project's message
>     queue.
> 
> *** soap
> 
>     This handler will only receive messages from the 'soap' logger and
>     output them to the project's message queue.
> 
> ** add_file_handlers
> 
>    This logging method name is "files" and is similar to the
>    add_stream_handlers logging method, except that it takes a list of
>    filepaths instead of a list of streams.
> 
>    NOTE: If a filepath is None, then an handler to sys.stdout is
>    created instead.
> 
> ** add_projectQ_handlers
> 
>    This logging method name is "projectsQ" and it accepts a list of
>    project's directory and install two handlers per projects.
> 
> *** echo
> 
>     This handler accepts all messages from the 'root', 'report' and
>     'validation' loggers and output them to all project's messages
>     queue.
> 
> *** soap
> 
>     Same as echo but only accepts the 'soap' logger.
> 
> * elbe_logging context manager
> 
>   Since multiple threads can work on multiple projects and since
>   logging is shared between threads, this context manager allows to
>   register handlers for that thread and remove them after.
> 
>   The caller have to pass a dict with keys referencing to registered
>   logging methods.  Each keys in the dict point to either a single or
>   a list of accepted objects by the corresponding logging method.
> 
> ** {open,close}_logging
> 
>    These functions should never be called.  Always use the
>    elbe_logging context manager.
> 
> ** Example
> 
>    ```````````````````````````````````````````````````````````````````
>    with elbe_log({"projectsQ":"/var/cache/elbe/A",
>                   "files:""/mnt/log.txt",
> 		  "streams":pipe})
>    ```````````````````````````````````````````````````````````````````
> 
>    In this example, messages will be logged to the project's messages
>    queue of project "/var/cache/elbe/A", to the file "/mnt/log.txt"
>    and to the stream object 'pipe', according to the policies
>    described above.
> 
> * QHandler
> 
>   A project's messages queue is unique among threads.  The queue as a
>   limit of messages and will overwrite previous one if not readed.
> 
>   The queue is intended to be read by an external client, e.g SOAP.
> 
>   To read from a project's message queue, always use the read_loggingQ
>   function by specifying a project directory.  If no logging message
>   is avaible for that project, the empty string is returned.
> 
> * async_logging
> 
>   This function takes two files descriptors and two loggers.
> 
>   The files descriptors should act as a pipe.  The first file
>   descriptor is the reading end and the second is the writing end.
> 
>   Only the reading end will be used and the writing one will be closed
>   uppon starting the thread.  The reading end is close whenever the
>   pipe is broken (no more messages).
> 
>   The first of the two loggers is a 'streamable' logger, i.e. that it
>   will receive messages from the thread every time a newline is read.
>   This means that messages sent to that logger can be out of order.
> 
>   The second loggers is a 'block' logger.  It will receive everything
>   that the first logger received, but in one big blobs and thus in
>   order.
> 
>   For both loggers, messages are logged with the 'info' level.
> 
> ** atmost
> 
>    By default, async_logging will read from the pipe 80 characters at
>    a time.  This can be changed, by passing atmost=1024 for example,
>    if you know that the output will be heavy and thus reduce the
>    number of system call.
> 
> Signed-off-by: Olivier Dion <dion at linutronix.de>

Reviewed-by: Torben Hohn <torben.hohn at linutronix.de>

> ---
>  elbepack/log.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
>  1 file changed, 224 insertions(+)
>  create mode 100644 elbepack/log.py
> 
> diff --git a/elbepack/log.py b/elbepack/log.py
> new file mode 100644
> index 00000000..f42e91bd
> --- /dev/null
> +++ b/elbepack/log.py
> @@ -0,0 +1,224 @@
> +# ELBE - Debian Based Embedded Rootfilesystem Builder
> +# Copyright (c) 2019 Olivier Dion <dion at linutronix.de>
> +#
> +# SPDX-License-Identifier: GPL-3.0-or-later
> +
> +
> +import collections
> +import logging
> +import os
> +import select
> +import threading
> +from contextlib import contextmanager
> +
> +
> +root = logging.getLogger()
> +root.setLevel(logging.DEBUG)
> +local = threading.local()
> +context_fmt = logging.Formatter("%(context)s%(message)s")
> +logging_methods = []
> +
> +
> +class QHandler(logging.Handler):
> +
> +    queues = {}
> +
> +    def __init__(self, target, *args, **kwargs):
> +        super(QHandler, self).__init__(*args, **kwargs)
> +        if target not in QHandler.queues:
> +            QHandler.queues[target] = collections.deque(maxlen=1024)
> +        self.Q = QHandler.queues[target]
> +
> +    def emit(self, record):
> +        self.Q.append(self.format(record))
> +
> +    @classmethod
> +    def pop(cls, target):
> +        try:
> +            return cls.queues[target].popleft()
> +        except (IndexError, KeyError):
> +            return ''
> +
> +
> +def read_loggingQ(proj):
> +    return QHandler.pop(proj)
> +
> +
> +class ThreadFilter(logging.Filter):
> +
> +    def __init__(self, allowed, *args, **kwargs):
> +        super(ThreadFilter, self).__init__(*args, **kwargs)
> +        self.allowed = allowed
> +        self.thread = threading.current_thread().ident
> +
> +    def filter(self, record):
> +        if hasattr(record, '_thread'):
> +            thread = record._thread
> +        else:
> +            thread = record.thread
> +        retval = record.name in self.allowed and thread == self.thread
> +        if retval and not hasattr(record, 'context'):
> +            record.context = "[%s] " % record.levelname
> +        return retval
> +
> +
> +def with_list(func):
> +    def wrapper(_list):
> +        if not isinstance(_list, list):
> +            _list = [_list]
> +        return func(_list)
> +    return wrapper
> +
> +def logging_method(name):
> +    def decorator(func):
> +        def wrapper(*args, **kwargs):
> +            for handlers in func(*args, **kwargs):
> +                for h in handlers:
> +                    h.setFormatter(context_fmt)
> +                    local.handlers.append(h)
> +                    root.addHandler(h)
> +        logging_methods.append((name, wrapper))
> +        return wrapper
> +    return decorator
> +
> + at logging_method("streams")
> + at with_list
> +def add_stream_handlers(streams):
> +
> +    for stream in streams:
> +        out = logging.StreamHandler(stream)
> +        out.addFilter(ThreadFilter(['root' 'log', 'report', 'validation', 'echo', 'soap']))
> +        yield [out]
> +
> + at logging_method("projects")
> + at with_list
> +def add_project_handlers(projects):
> +
> +    for proj in projects:
> +        validation = logging.FileHandler(os.path.join(proj, "validation.txt"))
> +        report = logging.FileHandler(os.path.join(proj, "elbe-report.txt"))
> +        log = logging.FileHandler(os.path.join(proj, "log.txt"))
> +        echo = QHandler(proj)
> +        soap = QHandler(proj)
> +
> +        validation.addFilter(ThreadFilter(['validation']))
> +        report.addFilter(ThreadFilter(['report']))
> +        log.addFilter(ThreadFilter(['root', 'log']))
> +        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
> +        soap.addFilter(ThreadFilter(['soap']))
> +
> +        yield [validation, report, log, echo, soap]
> +
> + at logging_method("files")
> + at with_list
> +def add_file_handlers(files):
> +
> +    for f in files:
> +        if f is None:
> +            out = logging.StreamHandler(os.sys.stdout)
> +        else:
> +            out = logging.FileHandler(f)
> +        out.addFilter(ThreadFilter(['root' 'log', 'report', 'validation', 'echo', 'soap']))
> +        yield [out]
> +
> +
> + at logging_method("projectsQ")
> + at with_list
> +def add_projectQ_handlers(projects):
> +
> +    for proj in projects:
> +        echo = QHandler(proj)
> +        soap = QHandler(proj)
> +        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
> +        soap.addFilter(ThreadFilter(['soap']))
> +        yield [echo, soap]
> +
> +
> + at contextmanager
> +def elbe_logging(*args, **kwargs):
> +    try:
> +        open_logging(*args, **kwargs)
> +        yield
> +    finally:
> +        close_logging()
> +
> +
> +def open_logging(targets):
> +
> +    close_logging()
> +
> +    for method in logging_methods:
> +        key = method[0]
> +        call = method[1]
> +        if key in targets:
> +            call(targets[key])
> +
> +def close_logging():
> +    if hasattr(local, "handlers"):
> +        for h in local.handlers:
> +            root.removeHandler(h)
> +    local.handlers = []
> +
> +
> +class AsyncLogging(object):
> +
> +    def __init__(self, atmost, stream, block):
> +        self.lines = []
> +        self.epoll = select.epoll()
> +        self.atmost = atmost
> +        self.fd = None
> +        calling_thread = threading.current_thread().ident
> +        extra = {"_thread":calling_thread}
> +        extra["context"] = ""
> +        self.stream = logging.LoggerAdapter(stream, extra)
> +        self.block = logging.LoggerAdapter(block, extra)
> +
> +    def __call__(self, r, w):
> +        os.close(w)
> +        self.epoll.register(r, select.EPOLLIN | select.EPOLLHUP)
> +        self.fd = r
> +        try:
> +            self.run()
> +        finally:
> +            os.close(r)
> +
> +    def run(self):
> +        alive = True
> +        rest = ""
> +        while alive:
> +            events = self.epoll.poll()
> +            for fd, event in events:
> +                if event & select.EPOLLIN:
> +                    rest = self.read(rest)
> +                if event & select.EPOLLHUP:
> +                    alive = False
> +
> +        # Reading rest after pipe hang up
> +        while True:
> +            rest = self.read(rest)
> +            if not rest:
> +                break
> +
> +        if self.lines:
> +            self.lines[-1] += rest
> +            self.block.info("\n".join(self.lines))
> +
> +    def read(self, rest):
> +        buff = rest + os.read(self.fd, self.atmost)
> +        j = 0
> +        count = 0
> +        for i in xrange(len(buff)):
> +            if buff[i] == '\n':
> +                self.lines.append(buff[j:i])
> +                count += 1
> +                j = i + 1
> +        if count:
> +            self.stream.info("\n".join(self.lines[-count:]))
> +        return buff[j:]
> +
> +
> +def async_logging(r, w, stream, block, atmost=80):
> +    t = threading.Thread(target=AsyncLogging(atmost, stream, block),
> +                         args=(r, w))
> +    t.daemon = True
> +    t.start()
> -- 
> 2.11.0
> 
> 
> _______________________________________________
> elbe-devel mailing list
> elbe-devel at linutronix.de
> https://lists.linutronix.de/mailman/listinfo/elbe-devel

-- 
Torben Hohn
Linutronix GmbH | Bahnhofstrasse 3 | D-88690 Uhldingen-Mühlhofen
Phone: +49 7556 25 999 18; Fax.: +49 7556 25 999 99

Hinweise zum Datenschutz finden Sie hier (Informations on data privacy 
can be found here): https://linutronix.de/kontakt/Datenschutz.php

Linutronix GmbH | Firmensitz (Registered Office): Uhldingen-Mühlhofen | 
Registergericht (Registration Court): Amtsgericht Freiburg i.Br., HRB700 
806 | Geschäftsführer (Managing Directors): Heinz Egger, Thomas Gleixner



More information about the elbe-devel mailing list