[elbe-devel] [PATCH 6/7] elbepack: asyncworker: use multiprocessing as execution backend

Thomas Weißschuh thomas.weissschuh at linutronix.de
Mon Aug 19 16:50:18 CEST 2024


Using a thread has impacts on the general state of the elbe process.
For example the working directory, as can be seen by the calls to
os.chdir().
Instead use a multiprocessing.Process which has better isolation,
especially if at some time multiple async jobs are executed concurrently.

Signed-off-by: Thomas Weißschuh <thomas.weissschuh at linutronix.de>
---
 elbepack/asyncworker.py |  7 +++----
 elbepack/log.py         | 15 +++++++++++++--
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/elbepack/asyncworker.py b/elbepack/asyncworker.py
index b4ead5afed51..1db4ab819d34 100644
--- a/elbepack/asyncworker.py
+++ b/elbepack/asyncworker.py
@@ -3,10 +3,9 @@
 # SPDX-FileCopyrightText: 2014-2018 Linutronix GmbH
 
 import logging
+import multiprocessing
 from contextlib import contextmanager
 from os import chdir, getcwd
-from queue import Queue
-from threading import Thread
 
 from elbepack.elbeproject import AptCacheCommitError, AptCacheUpdateError
 from elbepack.log import elbe_logging, read_maxlevel, reset_level
@@ -288,11 +287,11 @@ def savecwd():
         chdir(oldcwd)
 
 
-class AsyncWorker(Thread):
+class AsyncWorker(multiprocessing.Process):
     def __init__(self, db):
         super().__init__(name='AsyncWorker')
         self.db = db
-        self.queue = Queue()
+        self.queue = multiprocessing.JoinableQueue()
         self.start()
 
     def stop(self):
diff --git a/elbepack/log.py b/elbepack/log.py
index 0ca828dfb999..c5f803170205 100644
--- a/elbepack/log.py
+++ b/elbepack/log.py
@@ -5,6 +5,8 @@
 
 import collections
 import logging
+import multiprocessing
+import multiprocessing.managers
 import os
 import re
 import threading
@@ -37,14 +39,23 @@ class LoggingQueue(collections.deque):
         return self._max_level
 
 
-_queues = {}
+class _LoggingManager(multiprocessing.managers.BaseManager):
+    pass
+
+
+_LoggingManager.register('dict', dict, multiprocessing.managers.DictProxy)
+_LoggingManager.register('LoggingQueue', LoggingQueue)
+
+_manager = _LoggingManager()
+_manager.start()
+_queues = _manager.dict()  # type: ignore
 
 
 class QHandler(logging.Handler):
     def __init__(self, target, *args, **kwargs):
         super().__init__(*args, **kwargs)
         if target not in _queues:
-            _queues[target] = LoggingQueue()
+            _queues[target] = _manager.LoggingQueue()
         self.Q = _queues[target]
 
     def emit(self, record):

-- 
2.46.0



More information about the elbe-devel mailing list