[elbe-devel] [PATCH 2/4] elbepack: log: introduce shutdown protocol for async logger

Thomas Weißschuh thomas.weissschuh at linutronix.de
Fri Apr 26 16:25:37 CEST 2024


The AsyncLogging object reads data from its pipe until all copies of
the writing end fd are closed.

The call to os.close(w) assumes that there are other copies of the write
fd around which is a very close coupling to its current callers, namely
the rpcaptcache manager and subprocess.Popen(close_fds=True, stderr=w).

These existing assumptions are non-obvious and brittle in the face of
code refactorings. Replace it with an obvious shutdown() method.

Signed-off-by: Thomas Weißschuh <thomas.weissschuh at linutronix.de>
---
 elbepack/log.py         | 28 +++++++++++++++-------------
 elbepack/rpcaptcache.py |  3 ++-
 2 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/elbepack/log.py b/elbepack/log.py
index a76ee53b0518..c47d9ea03e2a 100644
--- a/elbepack/log.py
+++ b/elbepack/log.py
@@ -221,32 +221,35 @@ def close_logging():
     local.handlers = []
 
 
-class AsyncLogging:
+class AsyncLogging(threading.Thread):
 
-    def __init__(self, atmost, stream, block):
+    def __init__(self, atmost, stream, block, r, w):
+        super().__init__(daemon=True)
         self.lines = []
         self.atmost = atmost
-        self.fd = None
+        self.read_fd = r
+        self.write_fd = w
         calling_thread = threading.current_thread().ident
         extra = {'_thread': calling_thread}
         extra['context'] = ''
         self.stream = logging.LoggerAdapter(stream, extra)
         self.block = logging.LoggerAdapter(block, extra)
 
-    def __call__(self, r, w):
-        os.close(w)
-        self.fd = r
+    def run(self):
         try:
-            self.run()
+            self.__run()
         finally:
-            os.close(r)
+            os.close(self.read_fd)
 
-    def run(self):
+    def shutdown(self):
+        os.close(self.write_fd)
+
+    def __run(self):
         rest = ''
 
         while True:
 
-            buf = os.read(self.fd, self.atmost).decode('utf-8', errors='replace')
+            buf = os.read(self.read_fd, self.atmost).decode('utf-8', errors='replace')
 
             # Pipe broke
             if not buf:
@@ -282,9 +285,7 @@ class AsyncLogging:
 
 
 def async_logging(r, w, stream, block, atmost=4096):
-    t = threading.Thread(target=AsyncLogging(atmost, stream, block),
-                         args=(r, w))
-    t.daemon = True
+    t = AsyncLogging(atmost, stream, block, r, w)
     t.start()
     return t
 
@@ -295,4 +296,5 @@ def async_logging_ctx(*args, **kwargs):
     try:
         yield
     finally:
+        t.shutdown()
         t.join()
diff --git a/elbepack/rpcaptcache.py b/elbepack/rpcaptcache.py
index b9985414091f..cb9f71804c01 100644
--- a/elbepack/rpcaptcache.py
+++ b/elbepack/rpcaptcache.py
@@ -56,8 +56,9 @@ class MyMan(BaseManager):
     def start(self):
         """Redirect outputs of the process to an async logging thread"""
         r, w = os.pipe()
+        alog = async_logging(r, w, soap, log)
+        self.log_finalizer = Finalize(self, alog.shutdown)
         super(MyMan, self).start(MyMan.redirect_outputs, [r, w])
-        async_logging(r, w, soap, log)
 
 
 class InChRootObject:

-- 
2.44.0



More information about the elbe-devel mailing list