[PATCH v2 1/3] commands: test: Extended testing

Olivier Dion dion at linutronix.de
Mon Jun 22 19:15:30 CEST 2020

* Test level

  Every test can be categorized by its level.  The current levels are,
  from lowest to highest:

    - BASE
    - EXTEND
    - INITVM
    - FULL

  the higher the level, the more time it will take to complete the
  test run.

  Test cases can use the 'unittest.skipIf' decorator to skip tests
  that don't meet a criterion based on 'ElbeTestCase.level'.

* Parameterization of tests

  Test cases can parameterize their tests by setting the class
  attribute 'params' to an iterable.  By doing so, every test in a
  test case that has defined the 'params' class attribute will be
  cloned as many times as there are parameters in the iterable.  Every
  clone is assigned a different value that can be retrieved with
  'self.params'.

  It's recommended that a class that parameterizes its tests inherits
  from 'ElbeTestCase' instead of 'unittest.TestCase'.  The former
  includes the parameter of a test in its string representation, so
  the parameter is shown when the test is printed and can be matched
  when filtering.

  For example:
  class MyTests(ElbeTestCase):

      params = [1, 2, 3]

      def test_foo(self):
          print("foo %d" % self.params)

      def test_bar(self):
          print("bar %d" % self.params)

  will result in 6 tests (3 parameters x 2 tests).  The output might
  be something like:
  foo 1
  foo 3
  bar 2
  bar 1
  foo 2
  bar 3

* Test discovery

  Tests are discovered the same way as before.  The only difference
  here is that the loader's suite class is set to 'ElbeTestSuite'.
  This allows us to capture all tests.  From there, we can clone tests
  that have the 'params' attribute set.

* Test filtering

  Tests can be filtered by matching their name or by using the
  parallel option.

  To filter tests by their name, the command line option '-f' or
  '--filter' can be used to filter tests based on a case-insensitive
  regular expression.  The filtering can be inverted using the '-i' or
  '--invert' flag.  The default regular expression for filtering is
  '.*', which matches anything.

  To filter tests with parallel, use the command line option '-p' or
  '--parallel'.  The parallel expression should be the node ID
  followed by a comma followed by the modulus to use.  For example:
  ./elbe test -p 7,10
  will run only the tests that match the predicate: (TEST_ID % 10 == 7).
  This is useful for launching parallel tests from outside Elbe and is
  intended to be used for automatic testing, not manual testing.

  Because tests don't have IDs, and because test discovery doesn't
  guarantee order, the list of tests is sorted by the tests' names.
  Thus, a test's ID is equal to its index in the list of tests
  after sorting.

* Dry run

  If one needs to test their filtering rules before running tests, the
  '-d' or '--dry-run' flag can be used to only print tests that
  would've run and exit.

Signed-off-by: Olivier Dion <dion at linutronix.de>
 elbepack/commands/test.py | 182 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 179 insertions(+), 3 deletions(-)

diff --git a/elbepack/commands/test.py b/elbepack/commands/test.py
index dca3be72..761d457e 100644
--- a/elbepack/commands/test.py
+++ b/elbepack/commands/test.py
@@ -5,12 +5,188 @@
 # elbepack/commands/test.py - Elbe unit test wrapper
+import copy
+import enum
+import optparse
 import os
+import re
+import unittest
-from elbepack.shellhelper import system
+import junit_xml as junit
+class ElbeTestLevel(enum.IntEnum):
+    BASE   = enum.auto()
+    EXTEND = enum.auto()
+    INITVM = enum.auto()
+    FULL   = enum.auto()
+class ElbeTestCase(unittest.TestCase):
+    level = ElbeTestLevel.BASE
+    def __str__(self):
+        name = super(ElbeTestCase, self).__str__()
+        if hasattr(self, "params"):
+            return "%s : params=%s" % (name, getattr(self, "params"))
+        return name
+# TODO:py3 - Remove useless object inheritance
+# pylint: disable=useless-object-inheritance
+class ElbeTestSuite(object):
+    # This must be a list not a set!!!
+    tests  = []
+    def __init__(self, tests):
+        for test in tests:
+            if isinstance(test, ElbeTestSuite):
+                continue
+            if not hasattr(test, "params"):
+                self.tests.append(test)
+                continue
+            for param in test.params:
+                clone        = copy.deepcopy(test)
+                clone.params = param
+                self.tests.append(clone)
+    def __iter__(self):
+        for test in self.tests:
+            yield test
+    def filter_test(self, parallel, regex, invert):
+        node_id, N = parallel.split(',')
+        node_id = int(node_id)
+        N       = int(N)
+        elected = []
+        rc = re.compile(regex, re.IGNORECASE)
+        self.tests.sort(key=lambda x: str(x))
+        # Tests filtered here are skipped quietly
+        i = 0
+        for test in self.tests:
+            skip = False
+            if i % N != node_id:
+                skip = True
+            if not skip and ((rc.search(str(test)) is None) ^ invert):
+                skip = True
+            if not skip:
+                elected.append(test)
+            i += 1
+        self.tests = elected
+    def ls(self):
+        for test in self:
+            print(test)
 def run_command(argv):
+    # pylint: disable=too-many-locals
     this_dir = os.path.dirname(os.path.realpath(__file__))
     top_dir  = os.path.join(this_dir, "..", "..")
-    system("python3 -m unittest discover --start-directory '%s' %s" %
-           (top_dir, " ".join(argv)), allow_fail=True)
+    oparser = optparse.OptionParser(usage="usage: %prog [options]")
+    oparser.add_option("-f", "--filter", dest="filter",
+                       metavar="REGEX", type="string", default=".*",
+                       help="Run specific test according to a filter rule")
+    oparser.add_option("-l", "--level", dest="level",
+                       type="string", default="BASE",
+                       help="Set test level threshold")
+    oparser.add_option("-i", "--invert", dest="invert_re",
+                      action="store_true", default=False,
+                      help="Invert the matching of --filter")
+    oparser.add_option("-d", "--dry-run", dest="dry_run",
+                       action="store_true", default=False,
+                       help="List tests that would have been executed and exit")
+    oparser.add_option("-p", "--parallel", dest="parallel",
+                       type="string", default="0,1",
+                       help="Run every thest where test_ID % N == node_ID")
+    oparser.add_option("-o", "--output", dest="output",
+                       type="string", default=None,
+                       help="Write XML output to file")
+    (opt, _) = oparser.parse_args(argv)
+    # Set test level threshold
+    if opt.level not in ElbeTestLevel.__members__:
+        print("Invalid level value '%s'. Valid values are: %s" %
+              (opt.level, ", ".join(key for key in ElbeTestLevel.__members__)))
+        os.sys.exit(20)
+    ElbeTestCase.level = ElbeTestLevel[opt.level]
+    # Find all tests
+    loader            = unittest.defaultTestLoader
+    loader.suiteClass = ElbeTestSuite
+    suite             = loader.discover(top_dir)
+    # then filter them
+    suite.filter_test(opt.parallel, opt.filter, opt.invert_re)
+    # print them
+    suite.ls()
+    # Dry run? Just exit gently
+    if opt.dry_run:
+        print("This was a dry run. No tests were executed")
+        os.sys.exit(0)
+    cases = []
+    err_cnt  = 0
+    fail_cnt = 0
+    for test in suite:
+        result = unittest.TestResult()
+        test.run(result)
+        case = junit.TestCase(name=str(test))
+        for error in result.errors:
+            case.add_error_info(message=error[1])
+            err_cnt += 1
+        for failure in result.failures:
+            case.add_failure_info(message=failure[1])
+            fail_cnt += 1
+        for skip in result.skipped:
+            case.add_skipped_info(message=skip[1])
+        cases.append(case)
+    ts = junit.TestSuite(name="test", test_cases=cases)
+    results = junit.to_xml_report_string([ts], encoding="utf-8")
+    if opt.output is None:
+        print(results)
+    else:
+        with open(opt.output, "w") as f:
+            f.write(results)
+    os.sys.exit(err_cnt | fail_cnt)

