[elbe-devel] [PATCH v4 01/41] log.py - New logging system

dion@linutronix.de
Mon Jul 8 17:11:06 CEST 2019


From: Olivier Dion <dion@linutronix.de>

* logging_methods

  A logging method is registered through the decorator
  "@logging_method(name)", where name is a unique string that
  identifies the logging method.

  Logging methods are used by elbe_logging to open specific logging
  destinations for the calling thread.

  Logging methods should also be wrapped with the "@with_list"
  decorator, which ensures that the first argument passed is always a
  list.

  Finally, a logging method should act as a generator that yields, on
  each iteration, a list of handlers to add to the thread, as sketched
  below.
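
  The following sketch is hypothetical (the "null" name, the function
  and the use of NullHandler are not part of this patch); it only
  illustrates the decorator contract inside elbepack/log.py:

  ```````````````````````````````````````````````````````````````````
  @logging_method("null")
  @with_list
  def add_null_handlers(names):
      # Hypothetical: open one NullHandler per logger name,
      # restricted to the calling thread by ThreadFilter.
      for name in names:
          out = logging.NullHandler()
          out.addFilter(ThreadFilter([name]))
          yield [out]
  ```````````````````````````````````````````````````````````````````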

** add_stream_handlers

   This logging method's name is "streams"; it accepts a list of
   stream objects to use as logging outputs.

   Each created handler allows the following loggers to log to it:

     - root
     - log
     - report
     - validation
     - echo
     - soap

   Every message from those loggers is redirected to the provided
   streams.

** add_project_handlers

   This logging method's name is "projects"; it accepts a list of
   project directories.

   Every project opens five handlers:

*** validation

    This handler will only receive messages from the 'validation' logger
    and output them to "project_dir/validation.txt".

*** report

    This handler will only receive messages from the 'report' logger
    and output them to "project_dir/elbe-report.txt".

*** log

    This handler will only receive messages from the 'root' and 'log'
    loggers and output them to "project_dir/log.txt".

*** echo

    This handler will only receive messages from the 'root', 'report'
    and 'validation' loggers and output them to the project's message
    queue.

*** soap

    This handler will only receive messages from the 'soap' logger and
    output them to the project's message queue.

** add_file_handlers

   This logging method's name is "files"; it is similar to the
   add_stream_handlers logging method, except that it takes a list of
   file paths instead of a list of streams.

   NOTE: If a file path is None, then a handler to sys.stdout is
   created instead.
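
   For instance, the following sketch (the file path is hypothetical)
   logs both to sys.stdout and to a file:

   ```````````````````````````````````````````````````````````````````
   import logging
   from elbepack.log import elbe_logging

   # None opens a sys.stdout handler; the path is only an example.
   with elbe_logging({"files": [None, "/tmp/build.log"]}):
       logging.getLogger("log").info("goes to both outputs")
   ```````````````````````````````````````````````````````````````````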

** add_projectQ_handlers

   This logging method's name is "projectsQ"; it accepts a list of
   project directories and installs two handlers per project.

*** echo

    This handler accepts all messages from the 'root', 'report' and
    'validation' loggers and outputs them to the project's message
    queue.

*** soap

    Same as echo but only accepts the 'soap' logger.

* elbe_logging context manager

  Since multiple threads can work on multiple projects and since
  logging is shared between threads, this context manager allows
  registering handlers for the calling thread and removing them
  afterwards.

  The caller has to pass a dict whose keys reference registered
  logging methods.  Each key in the dict points to either a single
  object or a list of objects accepted by the corresponding logging
  method.

** {open,close}_logging

   These functions should never be called directly.  Always use the
   elbe_logging context manager.

** Example

   ```````````````````````````````````````````````````````````````````
   with elbe_logging({"projectsQ": "/var/cache/elbe/A",
                      "files": "/mnt/log.txt",
                      "streams": pipe}):
       pass  # do some work while logging is open
   ```````````````````````````````````````````````````````````````````

   In this example, messages will be logged to the message queue of
   project "/var/cache/elbe/A", to the file "/mnt/log.txt" and to the
   stream object 'pipe', according to the policies described above.

* QHandler

  A project's message queue is shared by all threads.  The queue has a
  bounded capacity (1024 messages) and drops the oldest messages if
  they are not read.

  The queue is intended to be read by an external client, e.g. SOAP.

  To read from a project's message queue, always use the read_loggingQ
  function, passing a project directory.  If no logging message is
  available for that project, the empty string is returned.
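
  A client could thus drain the queue like this (a minimal sketch; the
  project directory is arbitrary):

  ```````````````````````````````````````````````````````````````````
  from elbepack.log import read_loggingQ

  # Pop queued messages for this project until none is left.
  while True:
      msg = read_loggingQ("/var/cache/elbe/A")
      if not msg:
          break
      print(msg)
  ```````````````````````````````````````````````````````````````````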

* async_logging

  This function takes two file descriptors and two loggers.

  The file descriptors should form a pipe.  The first file descriptor
  is the reading end and the second is the writing end.

  Only the reading end will be used; the writing end is closed upon
  starting the thread.  The reading end is closed whenever the pipe is
  broken (no more messages).

  The first of the two loggers is a 'streamable' logger, i.e. it will
  receive messages from the thread every time a newline is read.  This
  means that messages sent to that logger can arrive out of order.

  The second logger is a 'block' logger.  It will receive everything
  that the first logger received, but in one big blob and thus in
  order.

  For both loggers, messages are logged with the 'info' level.

** atmost

   By default, async_logging will read from the pipe 80 characters at
   a time.  This can be changed, by passing atmost=1024 for example,
   if you know that the output will be heavy and thus reduce the
   number of system call.
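
   A typical usage sketch, modelled on capturing a shell command's
   output (the command and the logger names are only examples):

   ```````````````````````````````````````````````````````````````````
   import logging
   import os
   from subprocess import Popen, STDOUT

   from elbepack.log import async_logging

   r, w = os.pipe()

   # Spawn the producer first so it inherits the write end ...
   p = Popen("echo hello; echo world", shell=True,
             stdout=w, stderr=STDOUT)

   # ... then start the logging thread.  It closes our copy of 'w'
   # and logs each line read from 'r' at the 'info' level.
   async_logging(r, w, logging.getLogger("soap"),
                 logging.getLogger("log"))

   p.communicate()
   ```````````````````````````````````````````````````````````````````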

Signed-off-by: Olivier Dion <dion@linutronix.de>
---
 elbepack/log.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 224 insertions(+)
 create mode 100644 elbepack/log.py

diff --git a/elbepack/log.py b/elbepack/log.py
new file mode 100644
index 00000000..f42e91bd
--- /dev/null
+++ b/elbepack/log.py
@@ -0,0 +1,224 @@
+# ELBE - Debian Based Embedded Rootfilesystem Builder
+# Copyright (c) 2019 Olivier Dion <dion@linutronix.de>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+
+import collections
+import logging
+import os
+import select
+import threading
+from contextlib import contextmanager
+
+
+root = logging.getLogger()
+root.setLevel(logging.DEBUG)
+local = threading.local()
+context_fmt = logging.Formatter("%(context)s%(message)s")
+logging_methods = []
+
+
+class QHandler(logging.Handler):
+
+    queues = {}
+
+    def __init__(self, target, *args, **kwargs):
+        super(QHandler, self).__init__(*args, **kwargs)
+        if target not in QHandler.queues:
+            QHandler.queues[target] = collections.deque(maxlen=1024)
+        self.Q = QHandler.queues[target]
+
+    def emit(self, record):
+        self.Q.append(self.format(record))
+
+    @classmethod
+    def pop(cls, target):
+        try:
+            return cls.queues[target].popleft()
+        except (IndexError, KeyError):
+            return ''
+
+
+def read_loggingQ(proj):
+    return QHandler.pop(proj)
+
+
+class ThreadFilter(logging.Filter):
+
+    def __init__(self, allowed, *args, **kwargs):
+        super(ThreadFilter, self).__init__(*args, **kwargs)
+        self.allowed = allowed
+        self.thread = threading.current_thread().ident
+
+    def filter(self, record):
+        if hasattr(record, '_thread'):
+            thread = record._thread
+        else:
+            thread = record.thread
+        retval = record.name in self.allowed and thread == self.thread
+        if retval and not hasattr(record, 'context'):
+            record.context = "[%s] " % record.levelname
+        return retval
+
+
+def with_list(func):
+    def wrapper(_list):
+        if not isinstance(_list, list):
+            _list = [_list]
+        return func(_list)
+    return wrapper
+
+def logging_method(name):
+    def decorator(func):
+        def wrapper(*args, **kwargs):
+            for handlers in func(*args, **kwargs):
+                for h in handlers:
+                    h.setFormatter(context_fmt)
+                    local.handlers.append(h)
+                    root.addHandler(h)
+        logging_methods.append((name, wrapper))
+        return wrapper
+    return decorator
+
+@logging_method("streams")
+@with_list
+def add_stream_handlers(streams):
+
+    for stream in streams:
+        out = logging.StreamHandler(stream)
+        out.addFilter(ThreadFilter(['root', 'log', 'report', 'validation', 'echo', 'soap']))
+        yield [out]
+
+@logging_method("projects")
+@with_list
+def add_project_handlers(projects):
+
+    for proj in projects:
+        validation = logging.FileHandler(os.path.join(proj, "validation.txt"))
+        report = logging.FileHandler(os.path.join(proj, "elbe-report.txt"))
+        log = logging.FileHandler(os.path.join(proj, "log.txt"))
+        echo = QHandler(proj)
+        soap = QHandler(proj)
+
+        validation.addFilter(ThreadFilter(['validation']))
+        report.addFilter(ThreadFilter(['report']))
+        log.addFilter(ThreadFilter(['root', 'log']))
+        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
+        soap.addFilter(ThreadFilter(['soap']))
+
+        yield [validation, report, log, echo, soap]
+
+@logging_method("files")
+@with_list
+def add_file_handlers(files):
+
+    for f in files:
+        if f is None:
+            out = logging.StreamHandler(os.sys.stdout)
+        else:
+            out = logging.FileHandler(f)
+        out.addFilter(ThreadFilter(['root', 'log', 'report', 'validation', 'echo', 'soap']))
+        yield [out]
+
+
+@logging_method("projectsQ")
+@with_list
+def add_projectQ_handlers(projects):
+
+    for proj in projects:
+        echo = QHandler(proj)
+        soap = QHandler(proj)
+        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
+        soap.addFilter(ThreadFilter(['soap']))
+        yield [echo, soap]
+
+
+@contextmanager
+def elbe_logging(*args, **kwargs):
+    try:
+        open_logging(*args, **kwargs)
+        yield
+    finally:
+        close_logging()
+
+
+def open_logging(targets):
+
+    close_logging()
+
+    for method in logging_methods:
+        key = method[0]
+        call = method[1]
+        if key in targets:
+            call(targets[key])
+
+def close_logging():
+    if hasattr(local, "handlers"):
+        for h in local.handlers:
+            root.removeHandler(h)
+    local.handlers = []
+
+
+class AsyncLogging(object):
+
+    def __init__(self, atmost, stream, block):
+        self.lines = []
+        self.epoll = select.epoll()
+        self.atmost = atmost
+        self.fd = None
+        calling_thread = threading.current_thread().ident
+        extra = {"_thread":calling_thread}
+        extra["context"] = ""
+        self.stream = logging.LoggerAdapter(stream, extra)
+        self.block = logging.LoggerAdapter(block, extra)
+
+    def __call__(self, r, w):
+        os.close(w)
+        self.epoll.register(r, select.EPOLLIN | select.EPOLLHUP)
+        self.fd = r
+        try:
+            self.run()
+        finally:
+            os.close(r)
+
+    def run(self):
+        alive = True
+        rest = ""
+        while alive:
+            events = self.epoll.poll()
+            for fd, event in events:
+                if event & select.EPOLLIN:
+                    rest = self.read(rest)
+                if event & select.EPOLLHUP:
+                    alive = False
+
+        # Reading rest after pipe hang up
+        while True:
+            rest = self.read(rest)
+            if not rest:
+                break
+
+        if self.lines:
+            self.lines[-1] += rest
+            self.block.info("\n".join(self.lines))
+
+    def read(self, rest):
+        buff = rest + os.read(self.fd, self.atmost)
+        j = 0
+        count = 0
+        for i in xrange(len(buff)):
+            if buff[i] == '\n':
+                self.lines.append(buff[j:i])
+                count += 1
+                j = i + 1
+        if count:
+            self.stream.info("\n".join(self.lines[-count:]))
+        return buff[j:]
+
+
+def async_logging(r, w, stream, block, atmost=80):
+    t = threading.Thread(target=AsyncLogging(atmost, stream, block),
+                         args=(r, w))
+    t.daemon = True
+    t.start()
-- 
2.11.0



