[elbe-devel] [PATCH] Revert "elbepack: asyncworker: use multiprocessing as execution backend"

Thomas Weißschuh thomas.weissschuh at linutronix.de
Fri Mar 14 17:24:21 CET 2025


The different multiprocessing managers used within ELBE seem to interact badly
in some cases and may lead to errors like the following:

```
  File "/usr/lib/python3.11/logging/__init__.py", line 978, in handle
    self.emit(record)
  File "/var/cache/elbe/devel/elbepack/log.py", line 79, in emit
    self.Q.append(self.format(record))
  File "<string>", line 2, in append
  File "/usr/lib/python3.11/multiprocessing/managers.py", line 821, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.11/multiprocessing/connection.py", line 205, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.11/multiprocessing/connection.py", line 410, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.11/multiprocessing/connection.py", line 367, in _send
    n = write(self._handle, buf)
        ^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 9] Bad file descriptor
```

Depending on the specific ELBE usage pattern, these exceptions may trigger very
infrequently or every time.
The usage of multiprocessing in the asyncworker framework was a preparation for
parallel execution of async workers.
As that follow-up work is not finished yet, fall back to the old threading-based
implementation and figure out the root cause of the issue afterwards.

This reverts commit 8850f7ec5482 ("elbepack: asyncworker: use multiprocessing as execution backend")

Signed-off-by: Thomas Weißschuh <thomas.weissschuh at linutronix.de>
---
 elbepack/asyncworker.py |  7 ++++---
 elbepack/log.py         | 22 ++--------------------
 2 files changed, 6 insertions(+), 23 deletions(-)

diff --git a/elbepack/asyncworker.py b/elbepack/asyncworker.py
index f07debe023639d8b8fe67f29fe7799dfed31becc..112b844c141d12c9e3bca114118e952875eea1f4 100644
--- a/elbepack/asyncworker.py
+++ b/elbepack/asyncworker.py
@@ -3,8 +3,9 @@
 # SPDX-FileCopyrightText: 2014-2018 Linutronix GmbH
 
 import logging
-import multiprocessing
 import os
+from queue import Queue
+from threading import Thread
 
 from elbepack.elbeproject import AptCacheCommitError, AptCacheUpdateError
 from elbepack.log import elbe_logging, read_maxlevel, reset_level
@@ -277,11 +278,11 @@ class UpdatePbuilderJob(AsyncWorkerJob):
             db.reset_busy(self.project.builddir, success)
 
 
-class AsyncWorker(multiprocessing.Process):
+class AsyncWorker(Thread):
     def __init__(self, db):
         super().__init__(name='AsyncWorker')
         self.db = db
-        self.queue = multiprocessing.JoinableQueue()
+        self.queue = Queue()
         self.start()
 
     def stop(self):
diff --git a/elbepack/log.py b/elbepack/log.py
index ef7eb5a5d9fb4cee3570a4ad46d7d64b7e40d231..136ec521fbd25a6a73fd079c85f59ad2ecf18ef4 100644
--- a/elbepack/log.py
+++ b/elbepack/log.py
@@ -6,11 +6,8 @@
 import collections
 import functools
 import logging
-import multiprocessing
-import multiprocessing.managers
 import os
 import re
-import sys
 import threading
 from contextlib import contextmanager
 
@@ -34,12 +31,6 @@ def _swallow_kwargs(func, *names):
     return _wrapper
 
 
-if sys.version_info < (3, 9, 7) and hasattr(multiprocessing.managers, 'AutoProxy'):
-    # See https://bugs.python.org/issue30256 and linked issues
-    multiprocessing.managers.AutoProxy = _swallow_kwargs(
-            multiprocessing.managers.AutoProxy, 'manager_owned')
-
-
 class LoggingQueue(collections.deque):
     def __init__(self):
         super().__init__(maxlen=1024)
@@ -56,23 +47,14 @@ class LoggingQueue(collections.deque):
         return self._max_level
 
 
-class _LoggingManager(multiprocessing.managers.BaseManager):
-    pass
-
-
-_LoggingManager.register('dict', dict, multiprocessing.managers.DictProxy)
-_LoggingManager.register('LoggingQueue', LoggingQueue)
-
-_manager = _LoggingManager()
-_manager.start()
-_queues = _manager.dict()  # type: ignore
+_queues = {}
 
 
 class QHandler(logging.Handler):
     def __init__(self, target, *args, **kwargs):
         super().__init__(*args, **kwargs)
         if target not in _queues:
-            _queues[target] = _manager.LoggingQueue()
+            _queues[target] = LoggingQueue()
         self.Q = _queues[target]
 
     def emit(self, record):

---
base-commit: 18572c85862e22931e83c83c7b9cff3f1d1f0c8f
change-id: 20250314-asyncworker-multiprocessing-bug-8f83029601a8

Best regards,
-- 
Thomas Weißschuh <thomas.weissschuh at linutronix.de>



More information about the elbe-devel mailing list