diff options
author | Jan200101 <sentrycraft123@gmail.com> | 2024-06-12 23:03:17 +0200 |
---|---|---|
committer | Jan200101 <sentrycraft123@gmail.com> | 2024-06-12 23:04:02 +0200 |
commit | 4ccc9dfd8e38348d527d3704b87a680ba43756cd (patch) | |
tree | 2d72166c60df258cebe6160b5943624dceb8855d /SOURCES/filtermods.py | |
parent | d39b424f868921cd22fbfac392912a911c72bcf2 (diff) | |
download | kernel-fsync-4ccc9dfd8e38348d527d3704b87a680ba43756cd.tar.gz kernel-fsync-4ccc9dfd8e38348d527d3704b87a680ba43756cd.zip |
kernel 6.9.4
Diffstat (limited to 'SOURCES/filtermods.py')
-rwxr-xr-x | SOURCES/filtermods.py | 1099 |
1 files changed, 1099 insertions, 0 deletions
diff --git a/SOURCES/filtermods.py b/SOURCES/filtermods.py new file mode 100755 index 0000000..9d7e33d --- /dev/null +++ b/SOURCES/filtermods.py @@ -0,0 +1,1099 @@ +#!/usr/bin/env python3 +""" +filter kmods into groups for packaging, see filtermods.adoc +""" + +import argparse +import os +import re +import subprocess +import sys +import yaml +import unittest + +from logging import getLogger, DEBUG, INFO, WARN, ERROR, CRITICAL, NOTSET, FileHandler, StreamHandler, Formatter, Logger +from typing import Optional + +log = getLogger('filtermods') + + +def get_td(filename): + script_dir = os.path.dirname(os.path.realpath(__file__)) + return os.path.join(script_dir, 'filtermods-testdata', filename) + + +def run_command(cmd, cwddir=None): + p = subprocess.Popen(cmd, cwd=cwddir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + out, err = p.communicate() + out_str = out.decode('utf-8') + err_str = err.decode('utf-8') + return p.returncode, out_str, err_str + + +def safe_run_command(cmd, cwddir=None): + log.info('%s', cmd) + retcode, out, err = run_command(cmd, cwddir) + if retcode != 0: + log.warning('Command failed: %s, ret_code: %d', cmd, retcode) + log.warning(out) + log.warning(err) + raise Exception(err) + log.info(' ^^[OK]') + return retcode, out, err + + +def setup_logging(log_filename, stdout_log_level): + log_format = '%(asctime)s %(levelname)7.7s %(funcName)20.20s:%(lineno)4s %(message)s' + log = getLogger('filtermods') + log.setLevel(DEBUG) + + handler = StreamHandler(sys.stdout) + formatter = Formatter(log_format, '%H:%M:%S') + handler.setFormatter(formatter) + handler.setLevel(stdout_log_level) + log.addHandler(handler) + log.debug('stdout logging on') + + if log_filename: + file_handler = FileHandler(log_filename, 'w') + file_handler.setFormatter(formatter) + file_handler.setLevel(DEBUG) + log.addHandler(file_handler) + log.info('file logging on: %s', log_filename) + + return log + + +def canon_modname(kmod_pathname: str) -> str: + name = os.path.basename(kmod_pathname) + if name.endswith('.xz'): + name = name[:-3] + return name + + +class HierarchyObject: + def __init__(self): + self.depends_on = set() + + +def get_topo_order(obj_list: list[HierarchyObject], func_get_linked_objs=lambda x: x.depends_on) -> list[HierarchyObject]: + topo_order = [] + objs_to_sort = set(obj_list) + objs_sorted = set() + + while len(objs_to_sort) > 0: + no_deps = set() + for obj in objs_to_sort: + linked = func_get_linked_objs(obj) + if not linked: + no_deps.add(obj) + else: + all_deps_sorted = True + for dep in linked: + if dep not in objs_sorted: + all_deps_sorted = False + break + if all_deps_sorted: + no_deps.add(obj) + + for obj in no_deps: + topo_order.append(obj) + objs_sorted.add(obj) + objs_to_sort.remove(obj) + + return topo_order + + +class KMod(HierarchyObject): + def __init__(self, kmod_pathname: str) -> None: + super(KMod, self).__init__() + self.name: str = canon_modname(kmod_pathname) + self.kmod_pathname: str = kmod_pathname + self.is_dependency_for: set[KMod] = set() + self.assigned_to_pkg: Optional[KModPackage] = None + self.preferred_pkg: Optional[KModPackage] = None + self.rule_specifity: int = 0 + self.allowed_list: Optional[set[KModPackage]] = None + self.err = 0 + + def __str__(self): + depends_on = '' + for kmod in self.depends_on: + depends_on = depends_on + ' ' + kmod.name + return '%s {%s}' % (self.name, depends_on) + + +class KModList(): + def __init__(self) -> None: + self.name_to_kmod_map: dict[str, KMod] = {} + self.topo_order: Optional[list[KMod]] = None + + def get(self, kmod_pathname, create_if_missing=False): + kmod_name = canon_modname(kmod_pathname) + if kmod_name in self.name_to_kmod_map: + return self.name_to_kmod_map[kmod_name] + if not create_if_missing: + return None + + kmod = KMod(kmod_pathname) + # log.debug('Adding kmod %s (%s) to list', kmod.name, kmod.kmod_pathname) + if kmod.kmod_pathname != kmod_pathname: + raise Exception('Already have %s, but path changed? %s', kmod_name, kmod_pathname) + if not kmod.name: + raise Exception('Each kmod needs a name') + self.name_to_kmod_map[kmod_name] = kmod + return kmod + + def process_depmod_line(self, line): + tmp = line.split(':') + if len(tmp) != 2: + raise Exception('Depmod line has unexpected format: %s', line) + kmod_pathname = tmp[0].strip() + dependencies_pathnames = tmp[1].strip() + kmod = self.get(kmod_pathname, create_if_missing=True) + + if dependencies_pathnames: + for dep_pathname in dependencies_pathnames.split(' '): + dep_kmod = self.get(dep_pathname, create_if_missing=True) + kmod.depends_on.add(dep_kmod) + dep_kmod.is_dependency_for.add(kmod) + + def load_depmod_file(self, filepath): + with open(filepath) as f: + lines = f.readlines() + for line in lines: + if not line or line.startswith('#'): + continue + self.process_depmod_line(line) + log.info('depmod %s loaded, number of kmods: %s', filepath, len(self.name_to_kmod_map)) + + def dump(self): + for kmod in self.name_to_kmod_map.values(): + print(kmod) + + def get_topo_order(self): + if self.topo_order is None: + self.topo_order = get_topo_order(self.name_to_kmod_map.values()) + # TODO: what if we add something after? + return self.topo_order + + def get_alphabetical_order(self): + kmods = list(self.name_to_kmod_map.values()) + kmods.sort(key=lambda k: k.kmod_pathname) + return kmods + + def load_kmods_from_dir(self, topdir): + ret = [] + for root, dirs, files in os.walk(topdir): + for filename in files: + if filename.endswith('.xz'): + filename = filename[:-3] + if filename.endswith('.ko'): + kmod_pathname = os.path.join(root, filename) + ret.append(kmod_pathname) + + return ret + + def check_depmod_has_all_kmods(self, dirpath): + ret = self.load_kmods_from_dir(dirpath) + for kmod_pathname in ret: + kmod = self.get(kmod_pathname) + if not kmod: + raise Exception('Could not find kmod %s in depmod', kmod_pathname) + log.debug('OK: all (%s) kmods from %s are known', len(ret), dirpath) + + +class KModPackage(HierarchyObject): + def _get_depends_on(pkg): + return pkg.depends_on + + def _get_deps_for(pkg): + return pkg.is_dependency_for + + def __init__(self, name: str, depends_on=[]) -> None: + self.name: str = name + self.depends_on: set[KModPackage] = set(depends_on) + self.is_dependency_for: set[KModPackage] = set() + + for pkg in self.depends_on: + pkg.is_dependency_for.add(self) + self.all_depends_on_list: list[KModPackage] = self._get_all_linked(KModPackage._get_depends_on) + self.all_depends_on: set[KModPackage] = set(self.all_depends_on_list) + self.all_deps_for: Optional[set[KModPackage]] = None + self.default = False + log.debug('KModPackage created %s, depends_on: %s', name, [pkg.name for pkg in depends_on]) + + def __repr__(self): + return self.name + + def get_all_deps_for(self): + if self.all_deps_for is None: + self.all_deps_for = set(self._get_all_linked(KModPackage._get_deps_for)) + return self.all_deps_for + + def _get_all_linked(self, func_get_links): + ret = [] + explore = func_get_links(self) + + while len(explore) > 0: + new_explore = set() + for pkg in explore: + if pkg not in ret: + ret.append(pkg) + for dep in func_get_links(pkg): + new_explore.add(dep) + explore = new_explore + return ret + + +class KModPackageList(HierarchyObject): + def __init__(self) -> None: + self.name_to_obj: dict[str, KModPackage] = {} + self.kmod_pkg_list: list[KModPackage] = [] + self.rules: list[tuple[str, str, str]] = [] + + def get(self, pkgname): + if pkgname in self.name_to_obj: + return self.name_to_obj[pkgname] + return None + + def add_kmod_pkg(self, pkg): + self.name_to_obj[pkg.name] = pkg + self.kmod_pkg_list.append(pkg) + + def __iter__(self): + return iter(self.kmod_pkg_list) + + +def get_kmods_matching_re(kmod_list: KModList, param_re: str) -> list[KMod]: + ret = [] + # first subdir can be anything - this is because during build everything + # goes to kernel, but subpackages can move it (e.g. to extra) + param_re = '[^/]+/' + param_re + pattern = re.compile(param_re) + + for kmod in kmod_list.get_topo_order(): + m = pattern.match(kmod.kmod_pathname) + if m: + ret.append(kmod) + return ret + + +def walk_kmod_chain(kmod, myfunc): + visited = set() + + def visit_kmod(kmod, parent_kmod, func_to_call): + func_to_call(kmod, parent_kmod) + visited.add(kmod) + for dep in kmod.depends_on: + if dep not in visited: + visit_kmod(dep, kmod, func_to_call) + + visit_kmod(kmod, None, myfunc) + return visited + + +# is pkg a parent to any pkg from "alist" +def is_pkg_parent_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool: + if pkg in alist: + return True + + for some_pkg in alist: + if some_pkg in pkg.all_depends_on: + return True + return False + + +# is pkg a child to any pkg from "alist" +def is_pkg_child_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool: + if pkg in alist: + return True + + for some_pkg in alist: + if pkg in some_pkg.all_depends_on: + return True + return False + + +def update_allowed(kmod: KMod, visited: set[KMod], update_linked: bool = False) -> int: + num_updated = 0 + init = False + to_remove = set() + + if kmod in visited: + return num_updated + visited.add(kmod) + + # if we have nothing, try to initialise based on parents and children + if kmod.allowed_list is None: + init_allowed_list: set[KModPackage] = set() + + # init from children + for kmod_dep in kmod.depends_on: + if kmod_dep.allowed_list: + init_allowed_list.update(kmod_dep.allowed_list) + init = True + + if init: + # also add any pkgs that pkgs from list could depend on + deps_for = set() + for pkg in init_allowed_list: + deps_for.update(pkg.get_all_deps_for()) + init_allowed_list.update(deps_for) + + # init from parents + if not init: + for kmod_par in kmod.is_dependency_for: + if kmod_par.allowed_list: + init_allowed_list.update(kmod_par.allowed_list) + # also add any pkgs that depend on pkgs from list + for pkg in kmod_par.allowed_list: + init_allowed_list.update(pkg.all_depends_on) + init = True + + if init: + kmod.allowed_list = init_allowed_list + log.debug('%s: init to %s', kmod.name, [x.name for x in kmod.allowed_list]) + + kmod_allowed_list = kmod.allowed_list or set() + # log.debug('%s: update to %s', kmod.name, [x.name for x in kmod_allowed_list]) + + # each allowed is parent to at least one child allowed [for _all_ children] + for pkg in kmod_allowed_list: + for kmod_dep in kmod.depends_on: + if kmod_dep.allowed_list is None or kmod_dep.err: + continue + if not is_pkg_parent_to_any(pkg, kmod_dep.allowed_list): + to_remove.add(pkg) + log.debug('%s: remove %s from allowed, child: %s [%s]', + kmod.name, [pkg.name], kmod_dep.name, [x.name for x in kmod_dep.allowed_list]) + + # each allowed is child to at least one parent allowed [for _all_ parents] + for pkg in kmod_allowed_list: + for kmod_par in kmod.is_dependency_for: + if kmod_par.allowed_list is None or kmod_par.err: + continue + + if not is_pkg_child_to_any(pkg, kmod_par.allowed_list): + to_remove.add(pkg) + log.debug('%s: remove %s from allowed, parent: %s %s', + kmod.name, [pkg.name], kmod_par.name, [x.name for x in kmod_par.allowed_list]) + + for pkg in to_remove: + kmod_allowed_list.remove(pkg) + num_updated = num_updated + 1 + if len(kmod_allowed_list) == 0: + log.error('%s: cleared entire allow list', kmod.name) + kmod.err = 1 + + if init or to_remove or update_linked: + if to_remove: + log.debug('%s: updated to %s', kmod.name, [x.name for x in kmod_allowed_list]) + + for kmod_dep in kmod.depends_on: + num_updated = num_updated + update_allowed(kmod_dep, visited) + + for kmod_dep in kmod.is_dependency_for: + num_updated = num_updated + update_allowed(kmod_dep, visited) + + return num_updated + + +def apply_initial_labels(pkg_list: KModPackageList, kmod_list: KModList, treat_default_as_wants=False): + log.debug('') + for cur_rule in ['needs', 'wants', 'default']: + for package_name, rule_type, rule in pkg_list.rules: + pkg_obj = pkg_list.get(package_name) + + if not pkg_obj: + log.error('no package with name %s', package_name) + + if cur_rule != rule_type: + continue + + if rule_type == 'default' and treat_default_as_wants: + rule_type = 'wants' + + if 'needs' == rule_type: + # kmod_matching is already in topo_order + kmod_matching = get_kmods_matching_re(kmod_list, rule) + for kmod in kmod_matching: + if kmod.assigned_to_pkg and kmod.assigned_to_pkg != pkg_obj: + log.error('%s: can not be required by 2 pkgs %s %s', kmod.name, kmod.assigned_to_pkg, pkg_obj.name) + else: + kmod.assigned_to_pkg = pkg_obj + kmod.allowed_list = set([pkg_obj]) + kmod.rule_specifity = len(kmod_matching) + log.debug('%s: needed by %s', kmod.name, [pkg_obj.name]) + + if 'wants' == rule_type: + # kmod_matching is already in topo_order + kmod_matching = get_kmods_matching_re(kmod_list, rule) + for kmod in kmod_matching: + if kmod.allowed_list is None: + kmod.allowed_list = set(pkg_obj.all_depends_on) + kmod.allowed_list.add(pkg_obj) + kmod.preferred_pkg = pkg_obj + kmod.rule_specifity = len(kmod_matching) + log.debug('%s: wanted by %s, init allowed to %s', kmod.name, [pkg_obj.name], [pkg.name for pkg in kmod.allowed_list]) + else: + if kmod.assigned_to_pkg: + log.debug('%s: ignoring wants by %s, already assigned to %s', kmod.name, pkg_obj.name, kmod.assigned_to_pkg.name) + else: + # rule specifity may not be good idea, so just log it + # e.g. .*test.* may not be more specific than arch/x86/.* + log.debug('already have wants for %s %s, new rule: %s', kmod.name, kmod.preferred_pkg, rule) + + if 'default' == rule_type: + pkg_obj.default = True + + +def settle(kmod_list: KModList) -> None: + kmod_topo_order = list(kmod_list.get_topo_order()) + + for i in range(0, 25): + log.debug('settle start %s', i) + + ret = 0 + for kmod in kmod_topo_order: + visited: set[KMod] = set() + ret = ret + update_allowed(kmod, visited) + log.debug('settle %s updated nodes: %s', i, ret) + + if ret == 0: + break + + kmod_topo_order.reverse() + + +# phase 1 - propagate initial labels +def propagate_labels_1(pkg_list: KModPackageList, kmod_list: KModList): + log.info('') + settle(kmod_list) + + +def pick_closest_to_preffered(preferred_pkg: KModPackage, allowed_set: set[KModPackage]): + for child in preferred_pkg.all_depends_on_list: + if child in allowed_set: + return child + return None + + +# phase 2 - if some kmods allow more than one pkg, pick wanted package +def propagate_labels_2(pkg_list: KModPackageList, kmod_list: KModList): + log.info('') + ret = 0 + for kmod in kmod_list.get_topo_order(): + update_linked = False + + if kmod.allowed_list is None and kmod.preferred_pkg: + log.error('%s: has no allowed list but has preferred_pkg %s', kmod.name, kmod.preferred_pkg.name) + kmod.err = 1 + + if kmod.allowed_list and kmod.preferred_pkg: + chosen_pkg = None + if kmod.preferred_pkg in kmod.allowed_list: + chosen_pkg = kmod.preferred_pkg + else: + chosen_pkg = pick_closest_to_preffered(kmod.preferred_pkg, kmod.allowed_list) + + if chosen_pkg is not None: + kmod.allowed_list = set([chosen_pkg]) + log.debug('%s: making to prefer %s (preffered is %s), allowed: %s', kmod.name, chosen_pkg.name, + kmod.preferred_pkg.name, [pkg.name for pkg in kmod.allowed_list]) + update_linked = True + + visited: set[KMod] = set() + ret = ret + update_allowed(kmod, visited, update_linked) + + log.debug('updated nodes: %s', ret) + settle(kmod_list) + + +# Is this the best pick? ¯\_(ツ)_/¯ +def pick_topmost_allowed(allowed_set: set[KModPackage]) -> KModPackage: + topmost = next(iter(allowed_set)) + for pkg in allowed_set: + if len(pkg.all_depends_on) > len(topmost.all_depends_on): + topmost = pkg + + return topmost + + +# phase 3 - assign everything else that remained +def propagate_labels_3(pkg_list: KModPackageList, kmod_list: KModList): + log.info('') + ret = 0 + kmod_topo_order = list(kmod_list.get_topo_order()) + # do reverse topo order to cover children faster + kmod_topo_order.reverse() + + default_pkg = None + default_name = '' + for pkg_obj in pkg_list: + if pkg_obj.default: + if default_pkg: + log.error('Already have default pkg: %s / %s', default_pkg.name, pkg_obj.name) + else: + default_pkg = pkg_obj + default_name = default_pkg.name + + for kmod in kmod_topo_order: + update_linked = False + chosen_pkg = None + + if kmod.allowed_list is None: + if default_pkg: + chosen_pkg = default_pkg + else: + log.error('%s not assigned and there is no default', kmod.name) + elif len(kmod.allowed_list) > 1: + if default_pkg: + if default_pkg in kmod.allowed_list: + chosen_pkg = default_pkg + else: + chosen_pkg = pick_closest_to_preffered(default_pkg, kmod.allowed_list) + if chosen_pkg: + log.debug('closest is %s', chosen_pkg.name) + if not chosen_pkg: + # multiple pkgs are allowed, but none is preferred or default + chosen_pkg = pick_topmost_allowed(kmod.allowed_list) + log.debug('topmost is %s', chosen_pkg.name) + + if chosen_pkg: + kmod.allowed_list = set([chosen_pkg]) + log.debug('%s: making to prefer %s (default: %s)', kmod.name, [chosen_pkg.name], default_name) + update_linked = True + + visited: set[KMod] = set() + ret = ret + update_allowed(kmod, visited, update_linked) + + log.debug('updated nodes: %s', ret) + settle(kmod_list) + + +def load_config(config_pathname: str, kmod_list: KModList, variants=[]): + kmod_pkg_list = KModPackageList() + + with open(config_pathname, 'r') as file: + yobj = yaml.safe_load(file) + + for pkg_dict in yobj['packages']: + pkg_name = pkg_dict['name'] + depends_on = pkg_dict.get('depends-on', []) + if_variant_in = pkg_dict.get('if_variant_in') + + if if_variant_in is not None: + if not (set(variants) & set(if_variant_in)): + log.debug('Skipping %s for variants %s', pkg_name, variants) + continue + + pkg_dep_list = [] + for pkg_dep_name in depends_on: + pkg_dep = kmod_pkg_list.get(pkg_dep_name) + pkg_dep_list.append(pkg_dep) + + pkg_obj = kmod_pkg_list.get(pkg_name) + if not pkg_obj: + pkg_obj = KModPackage(pkg_name, pkg_dep_list) + kmod_pkg_list.add_kmod_pkg(pkg_obj) + else: + log.error('package %s already exists?', pkg_name) + + rules_list = yobj.get('rules', []) + for rule_dict in rules_list: + if_variant_in = rule_dict.get('if_variant_in') + exact_pkg = rule_dict.get('exact_pkg') + + for key, value in rule_dict.items(): + if key in ['if_variant_in', 'exact_pkg']: + continue + + if if_variant_in is not None: + if not (set(variants) & set(if_variant_in)): + continue + + rule = key + package_name = value + + if not kmod_pkg_list.get(package_name): + raise Exception('Unknown package ' + package_name) + + rule_type = 'wants' + if exact_pkg is True: + rule_type = 'needs' + elif key == 'default': + rule_type = 'default' + rule = '.*' + + log.debug('found rule: %s', (package_name, rule_type, rule)) + kmod_pkg_list.rules.append((package_name, rule_type, rule)) + + log.info('loaded config, rules: %s', len(kmod_pkg_list.rules)) + return kmod_pkg_list + + +def make_pictures(pkg_list: KModPackageList, kmod_list: KModList, filename: str, print_allowed=True): + f = open(filename + '.dot', 'w') + + f.write('digraph {\n') + f.write('node [style=filled fillcolor="#f8f8f8"]\n') + f.write(' subgraph kmods {\n') + f.write(' "Legend" [shape=note label="kmod name\\n{desired package}\\nresulting package(s)"]\n') + + for kmod in kmod_list.get_topo_order(): + pkg_name = '' + attr = '' + if kmod.assigned_to_pkg: + attr = 'fillcolor="#eddad5" color="#b22800"' + pkg_name = kmod.assigned_to_pkg.name + "!" + if kmod.preferred_pkg: + attr = 'fillcolor="#ddddf5" color="#b268fe"' + pkg_name = kmod.preferred_pkg.name + "?" + allowed = '' + if kmod.allowed_list and print_allowed: + allowed = '=' + ' '.join([pkg.name for pkg in kmod.allowed_list]) + f.write(' "%s" [label="%s\\n%s\\n%s" shape=box %s] \n' % (kmod.name, kmod.name, pkg_name, allowed, attr)) + + for kmod in kmod_list.get_topo_order(): + for kmod_dep in kmod.depends_on: + f.write(' "%s" -> "%s";\n' % (kmod.name, kmod_dep.name)) + f.write(' }\n') + + f.write(' subgraph packages {\n') + for pkg in pkg_list: + desc = '' + if pkg.default: + desc = '/default' + f.write(' "%s" [label="%s\\n%s"] \n' % (pkg.name, pkg.name, desc)) + for pkg_dep in pkg.depends_on: + f.write(' "%s" -> "%s";\n' % (pkg.name, pkg_dep.name)) + f.write(' }\n') + f.write('}\n') + + f.close() + + # safe_run_command('dot -Tpng -Gdpi=150 %s.dot > %s.png' % (filename, filename)) + safe_run_command('dot -Tsvg %s.dot > %s.svg' % (filename, filename)) + + +def sort_kmods(depmod_pathname: str, config_str: str, variants=[], do_pictures=''): + log.info('%s %s', depmod_pathname, config_str) + kmod_list = KModList() + kmod_list.load_depmod_file(depmod_pathname) + + pkg_list = load_config(config_str, kmod_list, variants) + + basename = os.path.splitext(config_str)[0] + + apply_initial_labels(pkg_list, kmod_list) + if '0' in do_pictures: + make_pictures(pkg_list, kmod_list, basename + "_0", print_allowed=False) + + try: + + propagate_labels_1(pkg_list, kmod_list) + if '1' in do_pictures: + make_pictures(pkg_list, kmod_list, basename + "_1") + propagate_labels_2(pkg_list, kmod_list) + propagate_labels_3(pkg_list, kmod_list) + finally: + if 'f' in do_pictures: + make_pictures(pkg_list, kmod_list, basename + "_f") + + return pkg_list, kmod_list + + +def abbrev_list_for_report(alist: list[KMod]) -> str: + tmp_str = [] + for kmod in alist[:2]: + if kmod.allowed_list: + tmp_str.append('%s(%s)' % (kmod.name, ' '.join([x.name for x in kmod.allowed_list]))) + ret = ', '.join(tmp_str) + other_len = len(alist[2:]) + if other_len > 0: + ret = ret + ' and %s other(s)' % (other_len) + return ret + + +def print_report(pkg_list: KModPackageList, kmod_list: KModList): + log.info('*'*26 + ' REPORT ' + '*'*26) + + kmods_err = 0 + kmods_moved = 0 + kmods_good = 0 + for kmod in kmod_list.get_topo_order(): + if not kmod.allowed_list: + log.error('%s: not assigned to any package! Please check the full log for details', kmod.name) + kmods_err = kmods_err + 1 + continue + + if len(kmod.allowed_list) > 1: + log.error('%s: assigned to more than one package! Please check the full log for details', kmod.name) + kmods_err = kmods_err + 1 + continue + + if not kmod.preferred_pkg: + # config doesn't care where it ended up + kmods_good = kmods_good + 1 + continue + + if kmod.preferred_pkg in kmod.allowed_list: + # it ended up where it needs to be + kmods_good = kmods_good + 1 + continue + + bad_parent_list = [] + for kmod_parent in kmod.is_dependency_for: + if not is_pkg_child_to_any(kmod.preferred_pkg, kmod_parent.allowed_list): + bad_parent_list.append(kmod_parent) + + bad_child_list = [] + for kmod_child in kmod.depends_on: + if not is_pkg_parent_to_any(kmod.preferred_pkg, kmod_child.allowed_list): + bad_child_list.append(kmod_parent) + + log.info('%s: wanted by %s but ended up in %s', kmod.name, [kmod.preferred_pkg.name], [pkg.name for pkg in kmod.allowed_list]) + if bad_parent_list: + log.info('\thas conflicting parent: %s', abbrev_list_for_report(bad_parent_list)) + if bad_child_list: + log.info('\thas conflicting children: %s', abbrev_list_for_report(bad_child_list)) + + kmods_moved = kmods_moved + 1 + + log.info('No. of kmod(s) assigned to preferred package: %s', kmods_good) + log.info('No. of kmod(s) moved to a related package: %s', kmods_moved) + log.info('No. of kmod(s) which could not be assigned: %s', kmods_err) + log.info('*'*60) + + return kmods_err + + +def write_modules_lists(path_prefix: str, pkg_list: KModPackageList, kmod_list: KModList): + kmod_list_alphabetical = sorted(kmod_list.get_topo_order(), key=lambda x: x.kmod_pathname) + for pkg in pkg_list: + output_path = os.path.join(path_prefix, pkg.name + '.list') + i = 0 + with open(output_path, "w") as file: + for kmod in kmod_list_alphabetical: + if kmod.allowed_list and pkg in kmod.allowed_list: + file.write(kmod.kmod_pathname) + file.write('\n') + i = i + 1 + log.info('Module list %s created with %s kmods', output_path, i) + + +class FiltermodTests(unittest.TestCase): + do_pictures = '' + + def setUp(self): + self.pkg_list = None + self.kmod_list = None + + def _is_kmod_pkg(self, kmodname, pkgnames): + self.assertIsNotNone(self.pkg_list) + self.assertIsNotNone(self.kmod_list) + + if type(pkgnames) is str: + pkgnames = [pkgnames] + + expected_pkgs = [] + for pkgname in pkgnames: + pkg = self.pkg_list.get(pkgname) + self.assertIsNotNone(pkg) + expected_pkgs.append(pkg) + + kmod = self.kmod_list.get(kmodname) + self.assertIsNotNone(kmod) + + if expected_pkgs: + self.assertTrue(len(kmod.allowed_list) == 1) + self.assertIn(next(iter(kmod.allowed_list)), expected_pkgs) + else: + self.assertEqual(kmod.allowed_list, set()) + + def test1a(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod1', 'modules-core') + self._is_kmod_pkg('kmod2', 'modules-core') + self._is_kmod_pkg('kmod3', 'modules') + self._is_kmod_pkg('kmod4', 'modules') + + def test1b(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'), + do_pictures=FiltermodTests.do_pictures, + variants=['rt']) + + self.assertIsNotNone(self.pkg_list.get('rt-kvm')) + self._is_kmod_pkg('kmod1', 'modules-core') + self._is_kmod_pkg('kmod2', 'modules-core') + self._is_kmod_pkg('kmod3', 'modules') + self._is_kmod_pkg('kmod4', 'rt-kvm') + + def test2(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test2.dep'), get_td('test2.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod1', 'modules-extra') + self._is_kmod_pkg('kmod2', 'modules') + self._is_kmod_pkg('kmod3', 'modules-core') + self._is_kmod_pkg('kmod4', 'modules-core') + self._is_kmod_pkg('kmod5', 'modules-core') + self._is_kmod_pkg('kmod6', 'modules-extra') + self._is_kmod_pkg('kmod8', 'modules') + + def test3(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test3.dep'), get_td('test3.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod2', ['modules-core', 'modules']) + self._is_kmod_pkg('kmod4', ['modules-core', 'modules-extra']) + self._is_kmod_pkg('kmod5', 'modules-core') + self._is_kmod_pkg('kmod6', 'modules-core') + + def test4(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test4.dep'), get_td('test4.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod0', 'modules') + self._is_kmod_pkg('kmod1', 'modules') + self._is_kmod_pkg('kmod2', 'modules') + self._is_kmod_pkg('kmod3', 'modules') + self._is_kmod_pkg('kmod4', 'modules') + self._is_kmod_pkg('kmod5', 'modules') + self._is_kmod_pkg('kmod6', 'modules') + self._is_kmod_pkg('kmod7', 'modules-partner2') + self._is_kmod_pkg('kmod8', 'modules-partner') + self._is_kmod_pkg('kmod9', 'modules-partner') + + def _check_preffered_pkg(self, kmodname, pkgname): + kmod = self.kmod_list.get(kmodname) + self.assertIsNotNone(kmod) + self.assertEqual(kmod.preferred_pkg.name, pkgname) + + def test5(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test5.dep'), get_td('test5.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._check_preffered_pkg('kmod2', 'modules') + self._check_preffered_pkg('kmod3', 'modules-partner') + self._check_preffered_pkg('kmod4', 'modules-partner') + + def test6(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test6.dep'), get_td('test6.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod2', 'modules-core') + self._is_kmod_pkg('kmod3', 'modules') + self._is_kmod_pkg('kmod4', 'modules') + self._is_kmod_pkg('kmod1', []) + + def test7(self): + self.pkg_list, self.kmod_list = sort_kmods(get_td('test7.dep'), get_td('test7.yaml'), + do_pictures=FiltermodTests.do_pictures) + + self._is_kmod_pkg('kmod1', 'modules-core') + self._is_kmod_pkg('kmod2', 'modules-core') + self._is_kmod_pkg('kmod3', 'modules-other') + self._is_kmod_pkg('kmod4', 'modules') + + +def do_rpm_mapping_test(config_pathname, kmod_rpms): + kmod_dict = {} + + def get_kmods_matching_re(pkgname, param_re): + matched = [] + param_re = '^kernel/' + param_re + pattern = re.compile(param_re) + + for kmod_pathname, kmod_rec in kmod_dict.items(): + m = pattern.match(kmod_pathname) + if m: + matched.append(kmod_pathname) + + return matched + + for kmod_rpm in kmod_rpms.split(): + filename = os.path.basename(kmod_rpm) + + m = re.match(r'.*-modules-([^-]+)', filename) + if not m: + raise Exception('Unrecognized rpm ' + kmod_rpm + ', expected a kernel-modules* rpm') + pkgname = 'modules-' + m.group(1) + m = re.match(r'modules-([0-9.]+)', pkgname) + if m: + pkgname = 'modules' + + tmpdir = os.path.join('tmp.filtermods', filename, pkgname) + if not os.path.exists(tmpdir): + log.info('creating tmp dir %s', tmpdir) + os.makedirs(tmpdir) + safe_run_command('rpm2cpio %s | cpio -id' % (os.path.abspath(kmod_rpm)), cwddir=tmpdir) + else: + log.info('using cached content of tmp dir: %s', tmpdir) + + for path, subdirs, files in os.walk(tmpdir): + for name in files: + ret = re.match(r'.*/'+pkgname+'/lib/modules/[^/]+/[^/]+/(.*)', os.path.join(path, name)) + if not ret: + continue + + kmod_pathname = 'kernel/' + ret.group(1) + if not kmod_pathname.endswith('.xz') and not kmod_pathname.endswith('.ko'): + continue + if kmod_pathname in kmod_dict: + if pkgname not in kmod_dict[kmod_pathname]['target_pkgs']: + kmod_dict[kmod_pathname]['target_pkgs'].append(pkgname) + else: + kmod_dict[kmod_pathname] = {} + kmod_dict[kmod_pathname]['target_pkgs'] = [pkgname] + kmod_dict[kmod_pathname]['pkg'] = None + kmod_dict[kmod_pathname]['matched'] = False + + kmod_pkg_list = load_config(config_pathname, None) + + for package_name, rule_type, rule in kmod_pkg_list.rules: + kmod_names = get_kmods_matching_re(package_name, rule) + + for kmod_pathname in kmod_names: + kmod_rec = kmod_dict[kmod_pathname] + + if not kmod_rec['matched']: + kmod_rec['matched'] = True + kmod_rec['pkg'] = package_name + for kmod_pathname, kmod_rec in kmod_dict.items(): + if kmod_rec['pkg'] not in kmod_rec['target_pkgs']: + log.warning('kmod %s wanted by config in %s, in tree it is: %s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs']) + elif len(kmod_rec['target_pkgs']) > 1: + # if set(kmod_rec['target_pkgs']) != set(['modules', 'modules-core']): + log.warning('kmod %s multiple matches in tree: %s/%s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs']) + + +def cmd_sort(options): + do_pictures = '' + if options.graphviz: + do_pictures = '0f' + + pkg_list, kmod_list = sort_kmods(options.depmod, options.config, + options.variants, do_pictures) + ret = print_report(pkg_list, kmod_list) + if options.output: + write_modules_lists(options.output, pkg_list, kmod_list) + + return ret + + +def cmd_print_rule_map(options): + kmod_list = KModList() + kmod_list.load_depmod_file(options.depmod) + pkg_list = load_config(options.config, kmod_list, options.variants) + apply_initial_labels(pkg_list, kmod_list, treat_default_as_wants=True) + + for kmod in kmod_list.get_alphabetical_order(): + print('%-20s %s' % (kmod.preferred_pkg, kmod.kmod_pathname)) + + +def cmd_selftest(options): + if options.graphviz: + FiltermodTests.do_pictures = '0f' + + for arg in ['selftest', '-g', '--graphviz']: + if arg in sys.argv: + sys.argv.remove(arg) + + unittest.main() + sys.exit(0) + + +def cmd_cmp2rpm(options): + do_rpm_mapping_test(options.config, options.kmod_rpms) + + +def main(): + global log + + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbose', dest='verbose', + help='be more verbose', action='count', default=4) + parser.add_argument('-q', '--quiet', dest='quiet', + help='be more quiet', action='count', default=0) + parser.add_argument('-l', '--log-filename', dest='log_filename', + help='log filename', default='filtermods.log') + + subparsers = parser.add_subparsers(dest='cmd') + + def add_graphviz_arg(p): + p.add_argument('-g', '--graphviz', dest='graphviz', + help='generate graphviz visualizations', + action='store_true', default=False) + + def add_config_arg(p): + p.add_argument('-c', '--config', dest='config', required=True, + help='path to yaml config with rules') + + def add_depmod_arg(p): + p.add_argument('-d', '--depmod', dest='depmod', required=True, + help='path to modules.dep file') + + def add_output_arg(p): + p.add_argument('-o', '--output', dest='output', default=None, + help='output $module_name.list files to directory specified by this parameter') + + def add_variants_arg(p): + p.add_argument('-r', '--variants', dest='variants', action='append', default=[], + help='variants to enable in config') + + def add_kmod_rpms_arg(p): + p.add_argument('-k', '--kmod-rpms', dest='kmod_rpms', required=True, + help='compare content of specified rpm(s) against yaml config rules') + + parser_sort = subparsers.add_parser('sort', help='assign kmods specified by modules.dep using rules from yaml config') + add_config_arg(parser_sort) + add_depmod_arg(parser_sort) + add_output_arg(parser_sort) + add_variants_arg(parser_sort) + add_graphviz_arg(parser_sort) + + parser_rule_map = subparsers.add_parser('rulemap', help='print how yaml config maps to kmods') + add_config_arg(parser_rule_map) + add_depmod_arg(parser_rule_map) + add_variants_arg(parser_rule_map) + + parser_test = subparsers.add_parser('selftest', help='runs a self-test') + add_graphviz_arg(parser_test) + + parser_cmp2rpm = subparsers.add_parser('cmp2rpm', help='compare ruleset against RPM(s)') + add_config_arg(parser_cmp2rpm) + add_kmod_rpms_arg(parser_cmp2rpm) + + options = parser.parse_args() + + if options.cmd == "selftest": + options.verbose = options.verbose - 2 + options.verbose = max(options.verbose - options.quiet, 0) + levels = [NOTSET, CRITICAL, ERROR, WARN, INFO, DEBUG] + stdout_log_level = levels[min(options.verbose, len(levels) - 1)] + + log = setup_logging(options.log_filename, stdout_log_level) + + ret = 0 + if options.cmd == "sort": + ret = cmd_sort(options) + elif options.cmd == "rulemap": + cmd_print_rule_map(options) + elif options.cmd == "selftest": + cmd_selftest(options) + elif options.cmd == "cmp2rpm": + cmd_cmp2rpm(options) + else: + parser.print_help() + + return ret + + +if __name__ == '__main__': + # import profile + # profile.run('main()', sort=1) + sys.exit(main()) |