[elbe-devel] [PATCH v4 01/41] log.py - New logging system
Torben Hohn
torben.hohn at linutronix.de
Wed Aug 7 11:43:37 CEST 2019
On Mon, Jul 08, 2019 at 05:11:06PM +0200, dion at linutronix.de wrote:
> From: Olivier Dion <dion at linutronix.de>
>
> * logging_methods
>
> A logging method is registered through the decorator
> "@logging_method(name)" where name is an unique string that identify
> the logging method to use.
>
> Logging methods are used by elbe_logging to open specific locations
> of logging for the calling thread.
>
> Logging methods should also be wrapped with the "@with_list"
> decorator that ensure that the first argument passed is always a
> list.
>
> Finally, a logging method should act as an iterator that return a
> list of handlers to add to the thread for each iteration.
>
> ** add_stream_handlers
>
> This logging method name is "streams" and accepts a list of stream
> objects to open as logging output.
>
> Each created handlers allowed the following logger to log to it:
>
> - root
> - log
> - report
> - validation
> - echo
> - soap
>
> Every messages from those handlers are redirect to the list of
> streams provided.
>
> ** add_project_handlers
>
> This logging method name is "projects" and accepts a list of
> project's directory.
>
> Every projects open 5 handlers.
>
> *** validation
>
> This handler will only receive messages from the 'validation' logger
> and output them to "project_dir/validation.txt".
>
> *** report
>
> This handler will only receive messages from the 'report' logger
> and output them to "project_dir/elbe-report.txt"
>
> *** log
>
> This handler will only receive messages from the 'root' and 'log'
> loggers and output them to "project_dir/log.txt".
>
> *** echo
>
> This handler will only receive messages from the 'root', 'report'
> and 'validation' loggers and output them to the project's message
> queue.
>
> *** soap
>
> This handler will only receive messages from the 'soap' logger and
> output them to the project's message queue.
>
> ** add_file_handlers
>
> This logging method name is "files" and is similar to the
> add_stream_handlers logging method, except that it takes a list of
> filepaths instead of a list of streams.
>
> NOTE: If a filepath is None, then an handler to sys.stdout is
> created instead.
>
> ** add_projectQ_handlers
>
> This logging method name is "projectsQ" and it accepts a list of
> project's directory and install two handlers per projects.
>
> *** echo
>
> This handler accepts all messages from the 'root', 'report' and
> 'validation' loggers and output them to all project's messages
> queue.
>
> *** soap
>
> Same as echo but only accepts the 'soap' logger.
>
> * elbe_logging context manager
>
> Since multiple threads can work on multiple projects and since
> logging is shared between threads, this context manager allows to
> register handlers for that thread and remove them after.
>
> The caller have to pass a dict with keys referencing to registered
> logging methods. Each keys in the dict point to either a single or
> a list of accepted objects by the corresponding logging method.
>
> ** {open,close}_logging
>
> These functions should never be called. Always use the
> elbe_logging context manager.
>
> ** Example
>
> ```````````````````````````````````````````````````````````````````
> with elbe_log({"projectsQ":"/var/cache/elbe/A",
> "files:""/mnt/log.txt",
> "streams":pipe})
> ```````````````````````````````````````````````````````````````````
>
> In this example, messages will be logged to the project's messages
> queue of project "/var/cache/elbe/A", to the file "/mnt/log.txt"
> and to the stream object 'pipe', according to the policies
> described above.
>
> * QHandler
>
> A project's messages queue is unique among threads. The queue as a
> limit of messages and will overwrite previous one if not readed.
>
> The queue is intended to be read by an external client, e.g SOAP.
>
> To read from a project's message queue, always use the read_loggingQ
> function by specifying a project directory. If no logging message
> is avaible for that project, the empty string is returned.
>
> * async_logging
>
> This function takes two files descriptors and two loggers.
>
> The files descriptors should act as a pipe. The first file
> descriptor is the reading end and the second is the writing end.
>
> Only the reading end will be used and the writing one will be closed
> uppon starting the thread. The reading end is close whenever the
> pipe is broken (no more messages).
>
> The first of the two loggers is a 'streamable' logger, i.e. that it
> will receive messages from the thread every time a newline is read.
> This means that messages sent to that logger can be out of order.
>
> The second loggers is a 'block' logger. It will receive everything
> that the first logger received, but in one big blobs and thus in
> order.
>
> For both loggers, messages are logged with the 'info' level.
>
> ** atmost
>
> By default, async_logging will read from the pipe 80 characters at
> a time. This can be changed, by passing atmost=1024 for example,
> if you know that the output will be heavy and thus reduce the
> number of system call.
>
> Signed-off-by: Olivier Dion <dion at linutronix.de>
Reviewed-by: Torben Hohn <torben.hohn at linutronix.de>
> ---
> elbepack/log.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 224 insertions(+)
> create mode 100644 elbepack/log.py
>
> diff --git a/elbepack/log.py b/elbepack/log.py
> new file mode 100644
> index 00000000..f42e91bd
> --- /dev/null
> +++ b/elbepack/log.py
> @@ -0,0 +1,224 @@
> +# ELBE - Debian Based Embedded Rootfilesystem Builder
> +# Copyright (c) 2019 Olivier Dion <dion at linutronix.de>
> +#
> +# SPDX-License-Identifier: GPL-3.0-or-later
> +
> +
> +import collections
> +import logging
> +import os
> +import select
> +import threading
> +from contextlib import contextmanager
> +
> +
> +root = logging.getLogger()
> +root.setLevel(logging.DEBUG)
> +local = threading.local()
> +context_fmt = logging.Formatter("%(context)s%(message)s")
> +logging_methods = []
> +
> +
> +class QHandler(logging.Handler):
> +
> + queues = {}
> +
> + def __init__(self, target, *args, **kwargs):
> + super(QHandler, self).__init__(*args, **kwargs)
> + if target not in QHandler.queues:
> + QHandler.queues[target] = collections.deque(maxlen=1024)
> + self.Q = QHandler.queues[target]
> +
> + def emit(self, record):
> + self.Q.append(self.format(record))
> +
> + @classmethod
> + def pop(cls, target):
> + try:
> + return cls.queues[target].popleft()
> + except (IndexError, KeyError):
> + return ''
> +
> +
> +def read_loggingQ(proj):
> + return QHandler.pop(proj)
> +
> +
> +class ThreadFilter(logging.Filter):
> +
> + def __init__(self, allowed, *args, **kwargs):
> + super(ThreadFilter, self).__init__(*args, **kwargs)
> + self.allowed = allowed
> + self.thread = threading.current_thread().ident
> +
> + def filter(self, record):
> + if hasattr(record, '_thread'):
> + thread = record._thread
> + else:
> + thread = record.thread
> + retval = record.name in self.allowed and thread == self.thread
> + if retval and not hasattr(record, 'context'):
> + record.context = "[%s] " % record.levelname
> + return retval
> +
> +
> +def with_list(func):
> + def wrapper(_list):
> + if not isinstance(_list, list):
> + _list = [_list]
> + return func(_list)
> + return wrapper
> +
> +def logging_method(name):
> + def decorator(func):
> + def wrapper(*args, **kwargs):
> + for handlers in func(*args, **kwargs):
> + for h in handlers:
> + h.setFormatter(context_fmt)
> + local.handlers.append(h)
> + root.addHandler(h)
> + logging_methods.append((name, wrapper))
> + return wrapper
> + return decorator
> +
> + at logging_method("streams")
> + at with_list
> +def add_stream_handlers(streams):
> +
> + for stream in streams:
> + out = logging.StreamHandler(stream)
> + out.addFilter(ThreadFilter(['root' 'log', 'report', 'validation', 'echo', 'soap']))
> + yield [out]
> +
> + at logging_method("projects")
> + at with_list
> +def add_project_handlers(projects):
> +
> + for proj in projects:
> + validation = logging.FileHandler(os.path.join(proj, "validation.txt"))
> + report = logging.FileHandler(os.path.join(proj, "elbe-report.txt"))
> + log = logging.FileHandler(os.path.join(proj, "log.txt"))
> + echo = QHandler(proj)
> + soap = QHandler(proj)
> +
> + validation.addFilter(ThreadFilter(['validation']))
> + report.addFilter(ThreadFilter(['report']))
> + log.addFilter(ThreadFilter(['root', 'log']))
> + echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
> + soap.addFilter(ThreadFilter(['soap']))
> +
> + yield [validation, report, log, echo, soap]
> +
> + at logging_method("files")
> + at with_list
> +def add_file_handlers(files):
> +
> + for f in files:
> + if f is None:
> + out = logging.StreamHandler(os.sys.stdout)
> + else:
> + out = logging.FileHandler(f)
> + out.addFilter(ThreadFilter(['root' 'log', 'report', 'validation', 'echo', 'soap']))
> + yield [out]
> +
> +
> + at logging_method("projectsQ")
> + at with_list
> +def add_projectQ_handlers(projects):
> +
> + for proj in projects:
> + echo = QHandler(proj)
> + soap = QHandler(proj)
> + echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
> + soap.addFilter(ThreadFilter(['soap']))
> + yield [echo, soap]
> +
> +
> + at contextmanager
> +def elbe_logging(*args, **kwargs):
> + try:
> + open_logging(*args, **kwargs)
> + yield
> + finally:
> + close_logging()
> +
> +
> +def open_logging(targets):
> +
> + close_logging()
> +
> + for method in logging_methods:
> + key = method[0]
> + call = method[1]
> + if key in targets:
> + call(targets[key])
> +
> +def close_logging():
> + if hasattr(local, "handlers"):
> + for h in local.handlers:
> + root.removeHandler(h)
> + local.handlers = []
> +
> +
> +class AsyncLogging(object):
> +
> + def __init__(self, atmost, stream, block):
> + self.lines = []
> + self.epoll = select.epoll()
> + self.atmost = atmost
> + self.fd = None
> + calling_thread = threading.current_thread().ident
> + extra = {"_thread":calling_thread}
> + extra["context"] = ""
> + self.stream = logging.LoggerAdapter(stream, extra)
> + self.block = logging.LoggerAdapter(block, extra)
> +
> + def __call__(self, r, w):
> + os.close(w)
> + self.epoll.register(r, select.EPOLLIN | select.EPOLLHUP)
> + self.fd = r
> + try:
> + self.run()
> + finally:
> + os.close(r)
> +
> + def run(self):
> + alive = True
> + rest = ""
> + while alive:
> + events = self.epoll.poll()
> + for fd, event in events:
> + if event & select.EPOLLIN:
> + rest = self.read(rest)
> + if event & select.EPOLLHUP:
> + alive = False
> +
> + # Reading rest after pipe hang up
> + while True:
> + rest = self.read(rest)
> + if not rest:
> + break
> +
> + if self.lines:
> + self.lines[-1] += rest
> + self.block.info("\n".join(self.lines))
> +
> + def read(self, rest):
> + buff = rest + os.read(self.fd, self.atmost)
> + j = 0
> + count = 0
> + for i in xrange(len(buff)):
> + if buff[i] == '\n':
> + self.lines.append(buff[j:i])
> + count += 1
> + j = i + 1
> + if count:
> + self.stream.info("\n".join(self.lines[-count:]))
> + return buff[j:]
> +
> +
> +def async_logging(r, w, stream, block, atmost=80):
> + t = threading.Thread(target=AsyncLogging(atmost, stream, block),
> + args=(r, w))
> + t.daemon = True
> + t.start()
> --
> 2.11.0
>
>
> _______________________________________________
> elbe-devel mailing list
> elbe-devel at linutronix.de
> https://lists.linutronix.de/mailman/listinfo/elbe-devel
--
Torben Hohn
Linutronix GmbH | Bahnhofstrasse 3 | D-88690 Uhldingen-Mühlhofen
Phone: +49 7556 25 999 18; Fax.: +49 7556 25 999 99
Hinweise zum Datenschutz finden Sie hier (Informations on data privacy
can be found here): https://linutronix.de/kontakt/Datenschutz.php
Linutronix GmbH | Firmensitz (Registered Office): Uhldingen-Mühlhofen |
Registergericht (Registration Court): Amtsgericht Freiburg i.Br., HRB700
806 | Geschäftsführer (Managing Directors): Heinz Egger, Thomas Gleixner
More information about the elbe-devel
mailing list