[elbe-devel] [PATCH v4 01/41] log.py - New logging system
dion at linutronix.de
Mon Jul 8 17:11:06 CEST 2019
From: Olivier Dion <dion at linutronix.de>
* logging_methods
A logging method is registered through the decorator
"@logging_method(name)", where name is a unique string that
identifies the logging method to use.
Logging methods are used by elbe_logging to open specific logging
locations for the calling thread.
Logging methods should also be wrapped with the "@with_list"
decorator, which ensures that the first argument passed is always a
list.
Finally, a logging method should act as an iterator that returns a
list of handlers to add to the thread on each iteration.
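As an illustration only, the registration scheme described above can
be sketched as follows. The names "registry", "installed" and
"add_null_handlers" are hypothetical stand-ins for this sketch and are
not part of the patch; the sketch also skips formatter and per-thread
handling.

```python
import logging

# Hypothetical stand-ins for this sketch; not the names used in the patch.
registry = {}     # name -> wrapped logging method
installed = []    # every handler yielded so far


def with_list(func):
    """Ensure the wrapped function always receives a list."""
    def wrapper(arg):
        if not isinstance(arg, list):
            arg = [arg]
        return func(arg)
    return wrapper


def logging_method(name):
    """Register func under name; func must yield lists of handlers."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            for handlers in func(*args, **kwargs):
                installed.extend(handlers)
        registry[name] = wrapper
        return wrapper
    return decorator


@logging_method("null")
@with_list
def add_null_handlers(targets):
    # One iteration per target, each yielding the handlers for that target.
    for _ in targets:
        yield [logging.NullHandler()]


registry["null"]("a")          # a single object is wrapped into a list
registry["null"](["b", "c"])   # a list is passed through unchanged
```

After both calls, "installed" holds one handler per target.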
** add_stream_handlers
This logging method is named "streams" and accepts a list of stream
objects to open as logging output.
Each created handler allows the following loggers to log to it:
- root
- log
- report
- validation
- echo
- soap
Every message from those loggers is redirected to the list of
streams provided.
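A minimal sketch of the name-based filtering described above, assuming
a simplified filter. "NameFilter" is a hypothetical class for this
sketch; unlike the ThreadFilter in the patch, it does not check the
calling thread.

```python
import io
import logging

ALLOWED = ['root', 'log', 'report', 'validation', 'echo', 'soap']


class NameFilter(logging.Filter):
    """Hypothetical filter: pass only records from the listed logger names."""

    def __init__(self, allowed):
        super().__init__()
        self.allowed = allowed

    def filter(self, record):
        return record.name in self.allowed


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(NameFilter(ALLOWED))

root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(handler)

# Both loggers propagate to the root logger, where the filter decides.
logging.getLogger('report').info("kept")
logging.getLogger('other').info("dropped")

root.removeHandler(handler)
```

Only the message from the 'report' logger ends up in the stream.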
** add_project_handlers
This logging method is named "projects" and accepts a list of
project directories.
Every project opens 5 handlers.
*** validation
This handler will only receive messages from the 'validation' logger
and output them to "project_dir/validation.txt".
*** report
This handler will only receive messages from the 'report' logger
and output them to "project_dir/elbe-report.txt".
*** log
This handler will only receive messages from the 'root' and 'log'
loggers and output them to "project_dir/log.txt".
*** echo
This handler will only receive messages from the 'root', 'report'
and 'validation' loggers and output them to the project's message
queue.
*** soap
This handler will only receive messages from the 'soap' logger and
output them to the project's message queue.
** add_file_handlers
This logging method is named "files" and is similar to the
add_stream_handlers logging method, except that it takes a list of
filepaths instead of a list of streams.
NOTE: If a filepath is None, then a handler to sys.stdout is
created instead.
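The None convention can be sketched as below. "make_file_handler" and
the "files_demo" logger are hypothetical names for this sketch, and
the temporary directory only stands in for a real log location.

```python
import logging
import os
import sys
import tempfile


def make_file_handler(path):
    """Hypothetical helper: None means sys.stdout, anything else is a path."""
    if path is None:
        return logging.StreamHandler(sys.stdout)
    return logging.FileHandler(path)


tmpdir = tempfile.mkdtemp()
logfile = os.path.join(tmpdir, "log.txt")
handlers = [make_file_handler(p) for p in [logfile, None]]

demo = logging.getLogger("files_demo")
demo.setLevel(logging.INFO)
demo.propagate = False
for h in handlers:
    demo.addHandler(h)

demo.info("hello")   # written to the file and to sys.stdout

for h in handlers:
    demo.removeHandler(h)
    h.flush()
handlers[0].close()  # close the file; sys.stdout stays open

with open(logfile) as f:
    content = f.read()
```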
** add_projectQ_handlers
This logging method is named "projectsQ". It accepts a list of
project directories and installs two handlers per project.
*** echo
This handler accepts all messages from the 'root', 'report' and
'validation' loggers and outputs them to the project's message
queue.
*** soap
Same as echo, but only accepts messages from the 'soap' logger.
* elbe_logging context manager
Since multiple threads can work on multiple projects and since
logging is shared between threads, this context manager registers
handlers for the calling thread and removes them afterwards.
The caller has to pass a dict whose keys refer to registered
logging methods. Each key in the dict maps to either a single object
or a list of objects accepted by the corresponding logging method.
** {open,close}_logging
These functions should never be called directly. Always use the
elbe_logging context manager.
** Example
```````````````````````````````````````````````````````````````````
with elbe_logging({"projectsQ":"/var/cache/elbe/A",
                   "files":"/mnt/log.txt",
                   "streams":pipe}):
    pass  # code that logs goes here
```````````````````````````````````````````````````````````````````
In this example, messages will be logged to the message queue of
project "/var/cache/elbe/A", to the file "/mnt/log.txt" and to the
stream object 'pipe', according to the policies described above.
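The register-then-remove pattern behind such a context manager can be
sketched as follows. "thread_logging" is a hypothetical stand-in that
only handles stream targets and does no per-thread filtering; it is
not the elbe_logging implementation.

```python
import io
import logging
import threading
from contextlib import contextmanager

local = threading.local()
root = logging.getLogger()
root.setLevel(logging.DEBUG)


@contextmanager
def thread_logging(streams):
    """Hypothetical stand-in for elbe_logging, limited to stream targets."""
    local.handlers = [logging.StreamHandler(s) for s in streams]
    for h in local.handlers:
        root.addHandler(h)
    try:
        yield
    finally:
        # Always remove the handlers, even if the body raised.
        for h in local.handlers:
            root.removeHandler(h)
        local.handlers = []


buf = io.StringIO()
with thread_logging([buf]):
    root.info("inside")
root.info("outside")   # the handler is gone, so buf does not see this
```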
* QHandler
A project's message queue is shared by all threads: there is exactly
one queue per project. The queue holds at most 1024 messages and
discards the oldest ones if they are not read in time.
The queue is intended to be read by an external client, e.g. SOAP.
To read from a project's message queue, always use the read_loggingQ
function, specifying a project directory. If no logging message
is available for that project, the empty string is returned.
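The overwrite and empty-string behaviour can be sketched with a
bounded deque. "BoundedQHandler", the "queue_demo" logger and the
small maxlen of 2 are assumptions chosen to keep this sketch short.

```python
import collections
import logging


class BoundedQHandler(logging.Handler):
    """Hypothetical bounded queue handler, one shared queue per target."""

    queues = {}

    def __init__(self, target, maxlen):
        super().__init__()
        if target not in self.queues:
            self.queues[target] = collections.deque(maxlen=maxlen)
        self.q = self.queues[target]

    def emit(self, record):
        # A full deque silently drops its oldest entry on append.
        self.q.append(self.format(record))

    @classmethod
    def pop(cls, target):
        try:
            return cls.queues[target].popleft()
        except (IndexError, KeyError):
            return ''


demo = logging.getLogger("queue_demo")
demo.setLevel(logging.INFO)
demo.propagate = False
demo.addHandler(BoundedQHandler("/tmp/proj", maxlen=2))

for i in range(3):
    demo.info("msg %d", i)      # "msg 0" is pushed out by "msg 2"

first = BoundedQHandler.pop("/tmp/proj")
second = BoundedQHandler.pop("/tmp/proj")
third = BoundedQHandler.pop("/tmp/proj")            # queue is empty
unknown = BoundedQHandler.pop("/no/such/project")   # no such queue
```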
* async_logging
This function takes two file descriptors and two loggers.
The file descriptors should act as a pipe. The first file
descriptor is the reading end and the second is the writing end.
Only the reading end will be used and the writing one will be closed
upon starting the thread. The reading end is closed whenever the
pipe is broken (no more messages).
The first of the two loggers is a 'streamable' logger, i.e. it
will receive messages from the thread every time a newline is read.
This means that messages sent to that logger can be out of order.
The second logger is a 'block' logger. It will receive everything
that the first logger received, but in one big blob and thus in
order.
For both loggers, messages are logged with the 'info' level.
** atmost
By default, async_logging reads from the pipe at most 80 characters
at a time. This can be changed, for example by passing atmost=1024,
if you know that the output will be heavy, which reduces the number
of system calls.
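A simplified, single-threaded sketch of the line splitting described
above. "drain" is a hypothetical helper: it uses neither a thread nor
epoll, it returns the messages instead of logging them, and it assumes
ASCII output so that a chunk boundary never splits a character.

```python
import os


def drain(fd, atmost=80):
    """Hypothetical helper: read fd until EOF in chunks of atmost bytes."""
    lines, streamed, rest = [], [], ""
    while True:
        data = os.read(fd, atmost)
        if not data:                 # EOF: the writing end was closed
            break
        buff = rest + data.decode()
        # Everything before the last newline is complete; keep the tail.
        *complete, rest = buff.split("\n")
        if complete:
            lines.extend(complete)
            streamed.append("\n".join(complete))  # 'stream' logger input
    if rest:
        lines.append(rest)
    return streamed, "\n".join(lines)             # 'block' logger input


r, w = os.pipe()
os.write(w, b"first line\nsecond line\nthird")
os.close(w)
stream_msgs, block_msg = drain(r, atmost=8)
os.close(r)
```

A larger atmost value means fewer os.read calls for the same output.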
Signed-off-by: Olivier Dion <dion at linutronix.de>
---
elbepack/log.py | 224 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 224 insertions(+)
create mode 100644 elbepack/log.py
diff --git a/elbepack/log.py b/elbepack/log.py
new file mode 100644
index 00000000..f42e91bd
--- /dev/null
+++ b/elbepack/log.py
@@ -0,0 +1,224 @@
+# ELBE - Debian Based Embedded Rootfilesystem Builder
+# Copyright (c) 2019 Olivier Dion <dion at linutronix.de>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+
+import collections
+import logging
+import os
+import select
+import threading
+from contextlib import contextmanager
+
+
+root = logging.getLogger()
+root.setLevel(logging.DEBUG)
+local = threading.local()
+context_fmt = logging.Formatter("%(context)s%(message)s")
+logging_methods = []
+
+
+class QHandler(logging.Handler):
+
+    queues = {}
+
+    def __init__(self, target, *args, **kwargs):
+        super(QHandler, self).__init__(*args, **kwargs)
+        if target not in QHandler.queues:
+            QHandler.queues[target] = collections.deque(maxlen=1024)
+        self.Q = QHandler.queues[target]
+
+    def emit(self, record):
+        self.Q.append(self.format(record))
+
+    @classmethod
+    def pop(cls, target):
+        try:
+            return cls.queues[target].popleft()
+        except (IndexError, KeyError):
+            return ''
+
+
+def read_loggingQ(proj):
+    return QHandler.pop(proj)
+
+
+class ThreadFilter(logging.Filter):
+
+    def __init__(self, allowed, *args, **kwargs):
+        super(ThreadFilter, self).__init__(*args, **kwargs)
+        self.allowed = allowed
+        self.thread = threading.current_thread().ident
+
+    def filter(self, record):
+        if hasattr(record, '_thread'):
+            thread = record._thread
+        else:
+            thread = record.thread
+        retval = record.name in self.allowed and thread == self.thread
+        if retval and not hasattr(record, 'context'):
+            record.context = "[%s] " % record.levelname
+        return retval
+
+
+def with_list(func):
+    def wrapper(_list):
+        if not isinstance(_list, list):
+            _list = [_list]
+        return func(_list)
+    return wrapper
+
+def logging_method(name):
+    def decorator(func):
+        def wrapper(*args, **kwargs):
+            for handlers in func(*args, **kwargs):
+                for h in handlers:
+                    h.setFormatter(context_fmt)
+                    local.handlers.append(h)
+                    root.addHandler(h)
+        logging_methods.append((name, wrapper))
+        return wrapper
+    return decorator
+
+@logging_method("streams")
+@with_list
+def add_stream_handlers(streams):
+
+    for stream in streams:
+        out = logging.StreamHandler(stream)
+        out.addFilter(ThreadFilter(['root', 'log', 'report', 'validation', 'echo', 'soap']))
+        yield [out]
+
+@logging_method("projects")
+@with_list
+def add_project_handlers(projects):
+
+    for proj in projects:
+        validation = logging.FileHandler(os.path.join(proj, "validation.txt"))
+        report = logging.FileHandler(os.path.join(proj, "elbe-report.txt"))
+        log = logging.FileHandler(os.path.join(proj, "log.txt"))
+        echo = QHandler(proj)
+        soap = QHandler(proj)
+
+        validation.addFilter(ThreadFilter(['validation']))
+        report.addFilter(ThreadFilter(['report']))
+        log.addFilter(ThreadFilter(['root', 'log']))
+        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
+        soap.addFilter(ThreadFilter(['soap']))
+
+        yield [validation, report, log, echo, soap]
+
+@logging_method("files")
+@with_list
+def add_file_handlers(files):
+
+    for f in files:
+        if f is None:
+            out = logging.StreamHandler(os.sys.stdout)
+        else:
+            out = logging.FileHandler(f)
+        out.addFilter(ThreadFilter(['root', 'log', 'report', 'validation', 'echo', 'soap']))
+        yield [out]
+
+
+@logging_method("projectsQ")
+@with_list
+def add_projectQ_handlers(projects):
+
+    for proj in projects:
+        echo = QHandler(proj)
+        soap = QHandler(proj)
+        echo.addFilter(ThreadFilter(['root', 'report', 'validation']))
+        soap.addFilter(ThreadFilter(['soap']))
+        yield [echo, soap]
+
+
+@contextmanager
+def elbe_logging(*args, **kwargs):
+    try:
+        open_logging(*args, **kwargs)
+        yield
+    finally:
+        close_logging()
+
+
+def open_logging(targets):
+
+    close_logging()
+
+    for method in logging_methods:
+        key = method[0]
+        call = method[1]
+        if key in targets:
+            call(targets[key])
+
+def close_logging():
+    if hasattr(local, "handlers"):
+        for h in local.handlers:
+            root.removeHandler(h)
+    local.handlers = []
+
+
+class AsyncLogging(object):
+
+    def __init__(self, atmost, stream, block):
+        self.lines = []
+        self.epoll = select.epoll()
+        self.atmost = atmost
+        self.fd = None
+        calling_thread = threading.current_thread().ident
+        extra = {"_thread":calling_thread}
+        extra["context"] = ""
+        self.stream = logging.LoggerAdapter(stream, extra)
+        self.block = logging.LoggerAdapter(block, extra)
+
+    def __call__(self, r, w):
+        os.close(w)
+        self.epoll.register(r, select.EPOLLIN | select.EPOLLHUP)
+        self.fd = r
+        try:
+            self.run()
+        finally:
+            os.close(r)
+
+    def run(self):
+        alive = True
+        rest = ""
+        while alive:
+            events = self.epoll.poll()
+            for fd, event in events:
+                if event & select.EPOLLIN:
+                    rest = self.read(rest)
+                if event & select.EPOLLHUP:
+                    alive = False
+
+        # Reading rest after pipe hang up
+        while True:
+            rest = self.read(rest)
+            if not rest:
+                break
+
+        if self.lines:
+            self.lines[-1] += rest
+            self.block.info("\n".join(self.lines))
+
+    def read(self, rest):
+        buff = rest + os.read(self.fd, self.atmost)
+        j = 0
+        count = 0
+        for i in xrange(len(buff)):
+            if buff[i] == '\n':
+                self.lines.append(buff[j:i])
+                count += 1
+                j = i + 1
+        if count:
+            self.stream.info("\n".join(self.lines[-count:]))
+        return buff[j:]
+
+
+def async_logging(r, w, stream, block, atmost=80):
+    t = threading.Thread(target=AsyncLogging(atmost, stream, block),
+                         args=(r, w))
+    t.daemon = True
+    t.start()
--
2.11.0