[elbe-devel] [PATCH 6/7] elbepack: asyncworker: use multiprocessing as execution backend
Thomas Weißschuh
thomas.weissschuh at linutronix.de
Mon Aug 19 16:50:18 CEST 2024
Using a thread has impacts on the general state of the elbe process.
For example the working directory, as can be seen by the calls to
os.chdir().
Instead use a multiprocessing.Process which has better isolation,
especially if at some point multiple async jobs are executed concurrently.
Signed-off-by: Thomas Weißschuh <thomas.weissschuh at linutronix.de>
---
elbepack/asyncworker.py | 7 +++----
elbepack/log.py | 15 +++++++++++++--
2 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/elbepack/asyncworker.py b/elbepack/asyncworker.py
index b4ead5afed51..1db4ab819d34 100644
--- a/elbepack/asyncworker.py
+++ b/elbepack/asyncworker.py
@@ -3,10 +3,9 @@
# SPDX-FileCopyrightText: 2014-2018 Linutronix GmbH
import logging
+import multiprocessing
from contextlib import contextmanager
from os import chdir, getcwd
-from queue import Queue
-from threading import Thread
from elbepack.elbeproject import AptCacheCommitError, AptCacheUpdateError
from elbepack.log import elbe_logging, read_maxlevel, reset_level
@@ -288,11 +287,11 @@ def savecwd():
chdir(oldcwd)
-class AsyncWorker(Thread):
+class AsyncWorker(multiprocessing.Process):
def __init__(self, db):
super().__init__(name='AsyncWorker')
self.db = db
- self.queue = Queue()
+ self.queue = multiprocessing.JoinableQueue()
self.start()
def stop(self):
diff --git a/elbepack/log.py b/elbepack/log.py
index 0ca828dfb999..c5f803170205 100644
--- a/elbepack/log.py
+++ b/elbepack/log.py
@@ -5,6 +5,8 @@
import collections
import logging
+import multiprocessing
+import multiprocessing.managers
import os
import re
import threading
@@ -37,14 +39,23 @@ class LoggingQueue(collections.deque):
return self._max_level
-_queues = {}
+class _LoggingManager(multiprocessing.managers.BaseManager):
+ pass
+
+
+_LoggingManager.register('dict', dict, multiprocessing.managers.DictProxy)
+_LoggingManager.register('LoggingQueue', LoggingQueue)
+
+_manager = _LoggingManager()
+_manager.start()
+_queues = _manager.dict() # type: ignore
class QHandler(logging.Handler):
def __init__(self, target, *args, **kwargs):
super().__init__(*args, **kwargs)
if target not in _queues:
- _queues[target] = LoggingQueue()
+ _queues[target] = _manager.LoggingQueue()
self.Q = _queues[target]
def emit(self, record):
--
2.46.0
More information about the elbe-devel
mailing list