[elbe-devel] [PATCH 1/3] commands test: Asynchronous testing

Olivier Dion dion at linutronix.de
Tue Jun 9 05:18:51 CEST 2020


Tests now run in their own process.  Their outputs are captured into a
temporary file.  When a test is done, its result is send back to the
main process, which will print the outputs of the former if and only
if it has failed or there was an error.  When all tests are done, the
main process will print a summary and exit with none zero if any test
failed.

* Test level

  Every test can be categorize by its level.  Current level are, from
  lowest to highest:

    - BASE
    - EXTEND
    - INITVM
    - FULL

  the higher the level, the more time it will take to complete the
  test run.

  Classes that derived from 'unittest.TestCase' can set the attribute
  'level'.  This attribute is inherited by all tests in a class.  If
  no level is defined in a test case, then the default level, 'BASE',
  is set.

* Parameterization of tests

  Test cases can parameterize their tests by defining the class
  attribute 'params' to an iterable.  By doing so, every tests in a
  test case that has defined the 'params' class attribute will be
  cloned has many time has there's parameters in the iterable.  Every
  clone is assigned a different value that can be retrieve with
  'self.params'.

  It's recommend to make a class that parameterizes its tests to
  inherit from 'ElbeTestCase' instead of 'unittest.TestCase'.  The
  former will be able to print the parameter of a test when string
  formatted or matching when filtering.

  For example:
  --------------------------------------------------------------------
  class MyTests(ElbeTestCase):

      params = [1, 2, 3]

      def test_foo(self):
          print("foo %d" % self.params)

      def test_bar(self):
          print("bar %d" % self.params)
  --------------------------------------------------------------------

  will result in 6 tests (3 parameters x 2 tests).  The output might
  be something like:
  --------------------------------------------------------------------
  foo 1
  foo 3
  bar 2
  bar 1
  foo 2
  bar 3
  --------------------------------------------------------------------

* Parallel testing

  As seen in the previous example, all tests are run in parallel and
  thus one can not assumes the order of execution.  Anyway, all tests
  are run in their own address space, and their outputs are redirect
  to a temporary file.

  When a test is finished, it puts into the results queue, which is
  shared between _all_ processes, its results and the name of its
  temporary file.  The main process then proceed to collect the
  results and will print the outputs of a test only if the latter has
  failed in some way.

* Test discovery

  Tests are discovered the same way as before.  The only difference
  here is that the loader's suite class is set to 'ElbeTestSuite'.
  This allows us to capture all tests.  From there, we can set the
  'level' attribute if there's none and we can clone tests that have
  the 'params' attribute set.

* Test filtering

  Tests can be filtered using their level or by matching their name.

  To filter tests by their level, the command line option '-l' or
  '--level' can be used to set the level threshold.  All tests with a
  level less or equal to that threshold will run.  The default
  threshold is 'BASE'.

  To filter tests by their name, the command line option '-f' or
  '--filter' can be used to filter tests based on an insensitive case
  regular expression.  The filtering can be inverse using the '-i' or
  '--invert' flag.  The default regular expression for filtering is
  '.*', which match anything.

* Dry run

  If one needs to test their filtering rules before running tests, the
  '-d' or '--dry-run' flag can be used to only print tests that
  would've run and exit.

Signed-off-by: Olivier Dion <dion at linutronix.de>
---
 elbepack/commands/test.py | 215 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 212 insertions(+), 3 deletions(-)

diff --git a/elbepack/commands/test.py b/elbepack/commands/test.py
index c306b8f6..4caa32f5 100644
--- a/elbepack/commands/test.py
+++ b/elbepack/commands/test.py
@@ -5,12 +5,221 @@
 
 # elbepack/commands/test.py - Elbe unit test wrapper
 
+import copy
+import enum
+import multiprocessing
+import optparse
 import os
+import re
+import tempfile
+import unittest
 
-from elbepack.shellhelper import system
+TMP_PREFIX = "elbe-test-"
+
+class ElbeTestLevel(enum.IntEnum):
+    BASE   = enum.auto()
+    EXTEND = enum.auto()
+    INITVM = enum.auto()
+    FULL   = enum.auto()
+
+# TODO:py3 - Remove useless object inheritance
+# pylint: disable=useless-object-inheritance
+class ElbeTestResult(object):
+
+    """Simple wrapper around unittest.TestReport
+
+    Some object of the latter can not be serialized.  This wrapper
+    fake a TestReport that can be used by the parent process and it
+    can be passed with a pipe by its children.
+    """
+
+    def __init__(self, unittest_result):
+
+        # These should all be of length of 0 or 1
+        self.errors   = [(str(x), y) for x, y in unittest_result.errors]
+        self.failures = [(str(x), y) for x, y in unittest_result.failures]
+        assert len(self.errors) + len(self.failures) < 2
+
+        # This should always be 1
+        self.testsRun = unittest_result.testsRun
+        assert self.testsRun == 1
+
+class ElbeTestCase(unittest.TestCase):
+
+    def __str__(self):
+        name = super(ElbeTestCase, self).__str__()
+        if hasattr(self, "params"):
+            return "%s : params=%s" % (name, getattr(self, "params"))
+        return name
+
+# TODO:py3 - Remove useless object inheritance
+# pylint: disable=useless-object-inheritance
+class ElbeTestSuite(object):
+
+    # This must be a list not a set!!!
+    tests  = []
+
+    def __init__(self, tests):
+
+        for test in tests:
+
+            if isinstance(test, ElbeTestSuite):
+                continue
+
+            # Compatibility with doctests
+            if not hasattr(test, "level"):
+                setattr(test, "level", ElbeTestLevel.BASE)
+
+            if not hasattr(test, "params"):
+                self.tests.append(test)
+                continue
+
+            for param in test.params:
+                clone        = copy.deepcopy(test)
+                clone.params = param
+                self.tests.append(clone)
+
+    def __iter__(self):
+        for test in self.tests:
+            yield test
+
+    def filter_test(self, level, regex, invert):
+
+        rc = re.compile(regex, re.IGNORECASE)
+
+        elected  = []
+
+        for test in self.tests:
+
+            if test.level > level:
+                continue
+
+            if (rc.search(str(test)) is None) ^ invert:
+                continue
+
+            elected.append(test)
+
+        self.tests = elected
+
+    def ls(self):
+        for test in self:
+            print("Running %s" % test)
+
+# This function is execute in the children's context
+def do_test(test, Q):
+
+    # Capture everything
+    out = tempfile.NamedTemporaryFile(delete=False, prefix=TMP_PREFIX)
+    os.dup2(out.fileno(), os.sys.stdout.fileno())
+    os.dup2(out.fileno(), os.sys.stderr.fileno())
+
+    result        = unittest.TestResult()
+    result.buffer = False
+
+    try:
+        test.run(result)
+    finally:
+        Q.put((ElbeTestResult(result), out.name))
 
 def run_command(argv):
+
+    # pylint: disable=too-many-locals
+
     this_dir = os.path.dirname(os.path.realpath(__file__))
     top_dir  = os.path.join(this_dir, "..", "..")
-    system("python -m unittest discover --start-directory '%s' %s" %
-           (top_dir, " ".join(argv)), allow_fail=True)
+
+    oparser = optparse.OptionParser(usage="usage: %prog [options]")
+
+    oparser.add_option("-f", "--filter", dest="filter",
+                       metavar="REGEX", type="string", default=".*",
+                       help="Run specific test according to a filter rule")
+
+    oparser.add_option("-l", "--level", dest="level",
+                       type="string", default="BASE",
+                       help="Set test level threshold")
+
+    oparser.add_option("-i", "--invert", dest="invert",
+                      action="store_true", default=False,
+                      help="Invert the matching of --filter")
+
+    oparser.add_option("-d", "--dry-run", dest="dry_run",
+                       action="store_true", default=False,
+                       help="List tests that would have been executed and exit")
+
+    (opt, _) = oparser.parse_args(argv)
+
+    # Set test threshold
+    if opt.level not in ElbeTestLevel.__members__:
+        print("Invalid level value '%s'. Valid values are: %s" %
+              (opt.level, ", ".join(key for key in ElbeTestLevel.__members__)))
+        os.sys.exit(20)
+
+    # Find all tests
+    loader            = unittest.defaultTestLoader
+    loader.suiteClass = ElbeTestSuite
+    suite             = loader.discover(top_dir)
+
+    # then filter them
+    suite.filter_test(ElbeTestLevel[opt.level], opt.filter, opt.invert)
+
+    # print them
+    suite.ls()
+
+    # Dry run? Just exit gently
+    if opt.dry_run:
+        print("SUMMARY: This was a dry run. No tests were executed")
+        os.sys.exit(0)
+
+    # Result queue shared between the parent and its children
+    results = multiprocessing.Queue()
+
+    # Number of tests to run
+    cnt = 0
+
+    # Start all tests in their seperate process
+    for test in suite:
+        p = multiprocessing.Process(target=do_test,
+                                    args=(test, results))
+        p.start()
+        cnt += 1
+
+    def print_result(out, result, ctx):
+
+        with open(out, "r") as f:
+
+            header = "%s %s" % (ctx, result[0])
+            body   = result[1]
+
+            print("\n" + "=" * len(header))
+            print(header)
+            print("-" * len(header))
+            print(f.read(), end="")
+            print(body, end="")
+            print("-" * len(header))
+
+    run_cnt  = 0
+    err_cnt  = 0
+    fail_cnt = 0
+
+    # Wait for tests to complete
+    while cnt:
+
+        result, out = results.get()
+
+        run_cnt  += result.testsRun
+        err_cnt  += len(result.errors)
+        fail_cnt += len(result.failures)
+
+        if result.errors:
+            print_result(out, result.errors[0], "ERROR")
+        if result.failures:
+            print_result(out, result.failures[0], "FAIL")
+
+        os.remove(out)
+
+        cnt -= 1
+
+    print("SUMMARRY: Ran %d tests, %d errors and %d failures" %
+          (run_cnt, err_cnt, fail_cnt))
+
+    os.sys.exit(err_cnt | fail_cnt)
-- 
2.27.0




More information about the elbe-devel mailing list