aboutsummaryrefslogtreecommitdiff
path: root/SOURCES/filtermods.py
diff options
context:
space:
mode:
authorJan200101 <sentrycraft123@gmail.com>2024-06-12 23:03:17 +0200
committerJan200101 <sentrycraft123@gmail.com>2024-06-12 23:04:02 +0200
commit4ccc9dfd8e38348d527d3704b87a680ba43756cd (patch)
tree2d72166c60df258cebe6160b5943624dceb8855d /SOURCES/filtermods.py
parentd39b424f868921cd22fbfac392912a911c72bcf2 (diff)
downloadkernel-fsync-4ccc9dfd8e38348d527d3704b87a680ba43756cd.tar.gz
kernel-fsync-4ccc9dfd8e38348d527d3704b87a680ba43756cd.zip
kernel 6.9.4
Diffstat (limited to 'SOURCES/filtermods.py')
-rwxr-xr-xSOURCES/filtermods.py1099
1 files changed, 1099 insertions, 0 deletions
diff --git a/SOURCES/filtermods.py b/SOURCES/filtermods.py
new file mode 100755
index 0000000..9d7e33d
--- /dev/null
+++ b/SOURCES/filtermods.py
@@ -0,0 +1,1099 @@
+#!/usr/bin/env python3
+"""
+filter kmods into groups for packaging, see filtermods.adoc
+"""
+
+import argparse
+import os
+import re
+import subprocess
+import sys
+import yaml
+import unittest
+
+from logging import getLogger, DEBUG, INFO, WARN, ERROR, CRITICAL, NOTSET, FileHandler, StreamHandler, Formatter, Logger
+from typing import Optional
+
+log = getLogger('filtermods')
+
+
+def get_td(filename):
+ script_dir = os.path.dirname(os.path.realpath(__file__))
+ return os.path.join(script_dir, 'filtermods-testdata', filename)
+
+
+def run_command(cmd, cwddir=None):
+ p = subprocess.Popen(cmd, cwd=cwddir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
+ out, err = p.communicate()
+ out_str = out.decode('utf-8')
+ err_str = err.decode('utf-8')
+ return p.returncode, out_str, err_str
+
+
+def safe_run_command(cmd, cwddir=None):
+ log.info('%s', cmd)
+ retcode, out, err = run_command(cmd, cwddir)
+ if retcode != 0:
+ log.warning('Command failed: %s, ret_code: %d', cmd, retcode)
+ log.warning(out)
+ log.warning(err)
+ raise Exception(err)
+ log.info(' ^^[OK]')
+ return retcode, out, err
+
+
+def setup_logging(log_filename, stdout_log_level):
+ log_format = '%(asctime)s %(levelname)7.7s %(funcName)20.20s:%(lineno)4s %(message)s'
+ log = getLogger('filtermods')
+ log.setLevel(DEBUG)
+
+ handler = StreamHandler(sys.stdout)
+ formatter = Formatter(log_format, '%H:%M:%S')
+ handler.setFormatter(formatter)
+ handler.setLevel(stdout_log_level)
+ log.addHandler(handler)
+ log.debug('stdout logging on')
+
+ if log_filename:
+ file_handler = FileHandler(log_filename, 'w')
+ file_handler.setFormatter(formatter)
+ file_handler.setLevel(DEBUG)
+ log.addHandler(file_handler)
+ log.info('file logging on: %s', log_filename)
+
+ return log
+
+
+def canon_modname(kmod_pathname: str) -> str:
+ name = os.path.basename(kmod_pathname)
+ if name.endswith('.xz'):
+ name = name[:-3]
+ return name
+
+
+class HierarchyObject:
+ def __init__(self):
+ self.depends_on = set()
+
+
+def get_topo_order(obj_list: list[HierarchyObject], func_get_linked_objs=lambda x: x.depends_on) -> list[HierarchyObject]:
+ topo_order = []
+ objs_to_sort = set(obj_list)
+ objs_sorted = set()
+
+ while len(objs_to_sort) > 0:
+ no_deps = set()
+ for obj in objs_to_sort:
+ linked = func_get_linked_objs(obj)
+ if not linked:
+ no_deps.add(obj)
+ else:
+ all_deps_sorted = True
+ for dep in linked:
+ if dep not in objs_sorted:
+ all_deps_sorted = False
+ break
+ if all_deps_sorted:
+ no_deps.add(obj)
+
+ for obj in no_deps:
+ topo_order.append(obj)
+ objs_sorted.add(obj)
+ objs_to_sort.remove(obj)
+
+ return topo_order
+
+
+class KMod(HierarchyObject):
+ def __init__(self, kmod_pathname: str) -> None:
+ super(KMod, self).__init__()
+ self.name: str = canon_modname(kmod_pathname)
+ self.kmod_pathname: str = kmod_pathname
+ self.is_dependency_for: set[KMod] = set()
+ self.assigned_to_pkg: Optional[KModPackage] = None
+ self.preferred_pkg: Optional[KModPackage] = None
+ self.rule_specifity: int = 0
+ self.allowed_list: Optional[set[KModPackage]] = None
+ self.err = 0
+
+ def __str__(self):
+ depends_on = ''
+ for kmod in self.depends_on:
+ depends_on = depends_on + ' ' + kmod.name
+ return '%s {%s}' % (self.name, depends_on)
+
+
+class KModList():
+ def __init__(self) -> None:
+ self.name_to_kmod_map: dict[str, KMod] = {}
+ self.topo_order: Optional[list[KMod]] = None
+
+ def get(self, kmod_pathname, create_if_missing=False):
+ kmod_name = canon_modname(kmod_pathname)
+ if kmod_name in self.name_to_kmod_map:
+ return self.name_to_kmod_map[kmod_name]
+ if not create_if_missing:
+ return None
+
+ kmod = KMod(kmod_pathname)
+ # log.debug('Adding kmod %s (%s) to list', kmod.name, kmod.kmod_pathname)
+ if kmod.kmod_pathname != kmod_pathname:
+ raise Exception('Already have %s, but path changed? %s', kmod_name, kmod_pathname)
+ if not kmod.name:
+ raise Exception('Each kmod needs a name')
+ self.name_to_kmod_map[kmod_name] = kmod
+ return kmod
+
+ def process_depmod_line(self, line):
+ tmp = line.split(':')
+ if len(tmp) != 2:
+ raise Exception('Depmod line has unexpected format: %s', line)
+ kmod_pathname = tmp[0].strip()
+ dependencies_pathnames = tmp[1].strip()
+ kmod = self.get(kmod_pathname, create_if_missing=True)
+
+ if dependencies_pathnames:
+ for dep_pathname in dependencies_pathnames.split(' '):
+ dep_kmod = self.get(dep_pathname, create_if_missing=True)
+ kmod.depends_on.add(dep_kmod)
+ dep_kmod.is_dependency_for.add(kmod)
+
+ def load_depmod_file(self, filepath):
+ with open(filepath) as f:
+ lines = f.readlines()
+ for line in lines:
+ if not line or line.startswith('#'):
+ continue
+ self.process_depmod_line(line)
+ log.info('depmod %s loaded, number of kmods: %s', filepath, len(self.name_to_kmod_map))
+
+ def dump(self):
+ for kmod in self.name_to_kmod_map.values():
+ print(kmod)
+
+ def get_topo_order(self):
+ if self.topo_order is None:
+ self.topo_order = get_topo_order(self.name_to_kmod_map.values())
+ # TODO: what if we add something after?
+ return self.topo_order
+
+ def get_alphabetical_order(self):
+ kmods = list(self.name_to_kmod_map.values())
+ kmods.sort(key=lambda k: k.kmod_pathname)
+ return kmods
+
+ def load_kmods_from_dir(self, topdir):
+ ret = []
+ for root, dirs, files in os.walk(topdir):
+ for filename in files:
+ if filename.endswith('.xz'):
+ filename = filename[:-3]
+ if filename.endswith('.ko'):
+ kmod_pathname = os.path.join(root, filename)
+ ret.append(kmod_pathname)
+
+ return ret
+
+ def check_depmod_has_all_kmods(self, dirpath):
+ ret = self.load_kmods_from_dir(dirpath)
+ for kmod_pathname in ret:
+ kmod = self.get(kmod_pathname)
+ if not kmod:
+ raise Exception('Could not find kmod %s in depmod', kmod_pathname)
+ log.debug('OK: all (%s) kmods from %s are known', len(ret), dirpath)
+
+
+class KModPackage(HierarchyObject):
+ def _get_depends_on(pkg):
+ return pkg.depends_on
+
+ def _get_deps_for(pkg):
+ return pkg.is_dependency_for
+
+ def __init__(self, name: str, depends_on=[]) -> None:
+ self.name: str = name
+ self.depends_on: set[KModPackage] = set(depends_on)
+ self.is_dependency_for: set[KModPackage] = set()
+
+ for pkg in self.depends_on:
+ pkg.is_dependency_for.add(self)
+ self.all_depends_on_list: list[KModPackage] = self._get_all_linked(KModPackage._get_depends_on)
+ self.all_depends_on: set[KModPackage] = set(self.all_depends_on_list)
+ self.all_deps_for: Optional[set[KModPackage]] = None
+ self.default = False
+ log.debug('KModPackage created %s, depends_on: %s', name, [pkg.name for pkg in depends_on])
+
+ def __repr__(self):
+ return self.name
+
+ def get_all_deps_for(self):
+ if self.all_deps_for is None:
+ self.all_deps_for = set(self._get_all_linked(KModPackage._get_deps_for))
+ return self.all_deps_for
+
+ def _get_all_linked(self, func_get_links):
+ ret = []
+ explore = func_get_links(self)
+
+ while len(explore) > 0:
+ new_explore = set()
+ for pkg in explore:
+ if pkg not in ret:
+ ret.append(pkg)
+ for dep in func_get_links(pkg):
+ new_explore.add(dep)
+ explore = new_explore
+ return ret
+
+
+class KModPackageList(HierarchyObject):
+ def __init__(self) -> None:
+ self.name_to_obj: dict[str, KModPackage] = {}
+ self.kmod_pkg_list: list[KModPackage] = []
+ self.rules: list[tuple[str, str, str]] = []
+
+ def get(self, pkgname):
+ if pkgname in self.name_to_obj:
+ return self.name_to_obj[pkgname]
+ return None
+
+ def add_kmod_pkg(self, pkg):
+ self.name_to_obj[pkg.name] = pkg
+ self.kmod_pkg_list.append(pkg)
+
+ def __iter__(self):
+ return iter(self.kmod_pkg_list)
+
+
+def get_kmods_matching_re(kmod_list: KModList, param_re: str) -> list[KMod]:
+ ret = []
+ # first subdir can be anything - this is because during build everything
+ # goes to kernel, but subpackages can move it (e.g. to extra)
+ param_re = '[^/]+/' + param_re
+ pattern = re.compile(param_re)
+
+ for kmod in kmod_list.get_topo_order():
+ m = pattern.match(kmod.kmod_pathname)
+ if m:
+ ret.append(kmod)
+ return ret
+
+
+def walk_kmod_chain(kmod, myfunc):
+ visited = set()
+
+ def visit_kmod(kmod, parent_kmod, func_to_call):
+ func_to_call(kmod, parent_kmod)
+ visited.add(kmod)
+ for dep in kmod.depends_on:
+ if dep not in visited:
+ visit_kmod(dep, kmod, func_to_call)
+
+ visit_kmod(kmod, None, myfunc)
+ return visited
+
+
+# is pkg a parent to any pkg from "alist"
+def is_pkg_parent_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool:
+ if pkg in alist:
+ return True
+
+ for some_pkg in alist:
+ if some_pkg in pkg.all_depends_on:
+ return True
+ return False
+
+
+# is pkg a child to any pkg from "alist"
+def is_pkg_child_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool:
+ if pkg in alist:
+ return True
+
+ for some_pkg in alist:
+ if pkg in some_pkg.all_depends_on:
+ return True
+ return False
+
+
+def update_allowed(kmod: KMod, visited: set[KMod], update_linked: bool = False) -> int:
+ num_updated = 0
+ init = False
+ to_remove = set()
+
+ if kmod in visited:
+ return num_updated
+ visited.add(kmod)
+
+ # if we have nothing, try to initialise based on parents and children
+ if kmod.allowed_list is None:
+ init_allowed_list: set[KModPackage] = set()
+
+ # init from children
+ for kmod_dep in kmod.depends_on:
+ if kmod_dep.allowed_list:
+ init_allowed_list.update(kmod_dep.allowed_list)
+ init = True
+
+ if init:
+ # also add any pkgs that pkgs from list could depend on
+ deps_for = set()
+ for pkg in init_allowed_list:
+ deps_for.update(pkg.get_all_deps_for())
+ init_allowed_list.update(deps_for)
+
+ # init from parents
+ if not init:
+ for kmod_par in kmod.is_dependency_for:
+ if kmod_par.allowed_list:
+ init_allowed_list.update(kmod_par.allowed_list)
+ # also add any pkgs that depend on pkgs from list
+ for pkg in kmod_par.allowed_list:
+ init_allowed_list.update(pkg.all_depends_on)
+ init = True
+
+ if init:
+ kmod.allowed_list = init_allowed_list
+ log.debug('%s: init to %s', kmod.name, [x.name for x in kmod.allowed_list])
+
+ kmod_allowed_list = kmod.allowed_list or set()
+ # log.debug('%s: update to %s', kmod.name, [x.name for x in kmod_allowed_list])
+
+ # each allowed is parent to at least one child allowed [for _all_ children]
+ for pkg in kmod_allowed_list:
+ for kmod_dep in kmod.depends_on:
+ if kmod_dep.allowed_list is None or kmod_dep.err:
+ continue
+ if not is_pkg_parent_to_any(pkg, kmod_dep.allowed_list):
+ to_remove.add(pkg)
+ log.debug('%s: remove %s from allowed, child: %s [%s]',
+ kmod.name, [pkg.name], kmod_dep.name, [x.name for x in kmod_dep.allowed_list])
+
+ # each allowed is child to at least one parent allowed [for _all_ parents]
+ for pkg in kmod_allowed_list:
+ for kmod_par in kmod.is_dependency_for:
+ if kmod_par.allowed_list is None or kmod_par.err:
+ continue
+
+ if not is_pkg_child_to_any(pkg, kmod_par.allowed_list):
+ to_remove.add(pkg)
+ log.debug('%s: remove %s from allowed, parent: %s %s',
+ kmod.name, [pkg.name], kmod_par.name, [x.name for x in kmod_par.allowed_list])
+
+ for pkg in to_remove:
+ kmod_allowed_list.remove(pkg)
+ num_updated = num_updated + 1
+ if len(kmod_allowed_list) == 0:
+ log.error('%s: cleared entire allow list', kmod.name)
+ kmod.err = 1
+
+ if init or to_remove or update_linked:
+ if to_remove:
+ log.debug('%s: updated to %s', kmod.name, [x.name for x in kmod_allowed_list])
+
+ for kmod_dep in kmod.depends_on:
+ num_updated = num_updated + update_allowed(kmod_dep, visited)
+
+ for kmod_dep in kmod.is_dependency_for:
+ num_updated = num_updated + update_allowed(kmod_dep, visited)
+
+ return num_updated
+
+
+def apply_initial_labels(pkg_list: KModPackageList, kmod_list: KModList, treat_default_as_wants=False):
+ log.debug('')
+ for cur_rule in ['needs', 'wants', 'default']:
+ for package_name, rule_type, rule in pkg_list.rules:
+ pkg_obj = pkg_list.get(package_name)
+
+ if not pkg_obj:
+ log.error('no package with name %s', package_name)
+
+ if cur_rule != rule_type:
+ continue
+
+ if rule_type == 'default' and treat_default_as_wants:
+ rule_type = 'wants'
+
+ if 'needs' == rule_type:
+ # kmod_matching is already in topo_order
+ kmod_matching = get_kmods_matching_re(kmod_list, rule)
+ for kmod in kmod_matching:
+ if kmod.assigned_to_pkg and kmod.assigned_to_pkg != pkg_obj:
+ log.error('%s: can not be required by 2 pkgs %s %s', kmod.name, kmod.assigned_to_pkg, pkg_obj.name)
+ else:
+ kmod.assigned_to_pkg = pkg_obj
+ kmod.allowed_list = set([pkg_obj])
+ kmod.rule_specifity = len(kmod_matching)
+ log.debug('%s: needed by %s', kmod.name, [pkg_obj.name])
+
+ if 'wants' == rule_type:
+ # kmod_matching is already in topo_order
+ kmod_matching = get_kmods_matching_re(kmod_list, rule)
+ for kmod in kmod_matching:
+ if kmod.allowed_list is None:
+ kmod.allowed_list = set(pkg_obj.all_depends_on)
+ kmod.allowed_list.add(pkg_obj)
+ kmod.preferred_pkg = pkg_obj
+ kmod.rule_specifity = len(kmod_matching)
+ log.debug('%s: wanted by %s, init allowed to %s', kmod.name, [pkg_obj.name], [pkg.name for pkg in kmod.allowed_list])
+ else:
+ if kmod.assigned_to_pkg:
+ log.debug('%s: ignoring wants by %s, already assigned to %s', kmod.name, pkg_obj.name, kmod.assigned_to_pkg.name)
+ else:
+ # rule specifity may not be good idea, so just log it
+ # e.g. .*test.* may not be more specific than arch/x86/.*
+ log.debug('already have wants for %s %s, new rule: %s', kmod.name, kmod.preferred_pkg, rule)
+
+ if 'default' == rule_type:
+ pkg_obj.default = True
+
+
+def settle(kmod_list: KModList) -> None:
+ kmod_topo_order = list(kmod_list.get_topo_order())
+
+ for i in range(0, 25):
+ log.debug('settle start %s', i)
+
+ ret = 0
+ for kmod in kmod_topo_order:
+ visited: set[KMod] = set()
+ ret = ret + update_allowed(kmod, visited)
+ log.debug('settle %s updated nodes: %s', i, ret)
+
+ if ret == 0:
+ break
+
+ kmod_topo_order.reverse()
+
+
+# phase 1 - propagate initial labels
+def propagate_labels_1(pkg_list: KModPackageList, kmod_list: KModList):
+ log.info('')
+ settle(kmod_list)
+
+
+def pick_closest_to_preffered(preferred_pkg: KModPackage, allowed_set: set[KModPackage]):
+ for child in preferred_pkg.all_depends_on_list:
+ if child in allowed_set:
+ return child
+ return None
+
+
+# phase 2 - if some kmods allow more than one pkg, pick wanted package
+def propagate_labels_2(pkg_list: KModPackageList, kmod_list: KModList):
+ log.info('')
+ ret = 0
+ for kmod in kmod_list.get_topo_order():
+ update_linked = False
+
+ if kmod.allowed_list is None and kmod.preferred_pkg:
+ log.error('%s: has no allowed list but has preferred_pkg %s', kmod.name, kmod.preferred_pkg.name)
+ kmod.err = 1
+
+ if kmod.allowed_list and kmod.preferred_pkg:
+ chosen_pkg = None
+ if kmod.preferred_pkg in kmod.allowed_list:
+ chosen_pkg = kmod.preferred_pkg
+ else:
+ chosen_pkg = pick_closest_to_preffered(kmod.preferred_pkg, kmod.allowed_list)
+
+ if chosen_pkg is not None:
+ kmod.allowed_list = set([chosen_pkg])
+ log.debug('%s: making to prefer %s (preffered is %s), allowed: %s', kmod.name, chosen_pkg.name,
+ kmod.preferred_pkg.name, [pkg.name for pkg in kmod.allowed_list])
+ update_linked = True
+
+ visited: set[KMod] = set()
+ ret = ret + update_allowed(kmod, visited, update_linked)
+
+ log.debug('updated nodes: %s', ret)
+ settle(kmod_list)
+
+
+# Is this the best pick? ¯\_(ツ)_/¯
+def pick_topmost_allowed(allowed_set: set[KModPackage]) -> KModPackage:
+ topmost = next(iter(allowed_set))
+ for pkg in allowed_set:
+ if len(pkg.all_depends_on) > len(topmost.all_depends_on):
+ topmost = pkg
+
+ return topmost
+
+
+# phase 3 - assign everything else that remained
+def propagate_labels_3(pkg_list: KModPackageList, kmod_list: KModList):
+ log.info('')
+ ret = 0
+ kmod_topo_order = list(kmod_list.get_topo_order())
+ # do reverse topo order to cover children faster
+ kmod_topo_order.reverse()
+
+ default_pkg = None
+ default_name = ''
+ for pkg_obj in pkg_list:
+ if pkg_obj.default:
+ if default_pkg:
+ log.error('Already have default pkg: %s / %s', default_pkg.name, pkg_obj.name)
+ else:
+ default_pkg = pkg_obj
+ default_name = default_pkg.name
+
+ for kmod in kmod_topo_order:
+ update_linked = False
+ chosen_pkg = None
+
+ if kmod.allowed_list is None:
+ if default_pkg:
+ chosen_pkg = default_pkg
+ else:
+ log.error('%s not assigned and there is no default', kmod.name)
+ elif len(kmod.allowed_list) > 1:
+ if default_pkg:
+ if default_pkg in kmod.allowed_list:
+ chosen_pkg = default_pkg
+ else:
+ chosen_pkg = pick_closest_to_preffered(default_pkg, kmod.allowed_list)
+ if chosen_pkg:
+ log.debug('closest is %s', chosen_pkg.name)
+ if not chosen_pkg:
+ # multiple pkgs are allowed, but none is preferred or default
+ chosen_pkg = pick_topmost_allowed(kmod.allowed_list)
+ log.debug('topmost is %s', chosen_pkg.name)
+
+ if chosen_pkg:
+ kmod.allowed_list = set([chosen_pkg])
+ log.debug('%s: making to prefer %s (default: %s)', kmod.name, [chosen_pkg.name], default_name)
+ update_linked = True
+
+ visited: set[KMod] = set()
+ ret = ret + update_allowed(kmod, visited, update_linked)
+
+ log.debug('updated nodes: %s', ret)
+ settle(kmod_list)
+
+
+def load_config(config_pathname: str, kmod_list: KModList, variants=[]):
+ kmod_pkg_list = KModPackageList()
+
+ with open(config_pathname, 'r') as file:
+ yobj = yaml.safe_load(file)
+
+ for pkg_dict in yobj['packages']:
+ pkg_name = pkg_dict['name']
+ depends_on = pkg_dict.get('depends-on', [])
+ if_variant_in = pkg_dict.get('if_variant_in')
+
+ if if_variant_in is not None:
+ if not (set(variants) & set(if_variant_in)):
+ log.debug('Skipping %s for variants %s', pkg_name, variants)
+ continue
+
+ pkg_dep_list = []
+ for pkg_dep_name in depends_on:
+ pkg_dep = kmod_pkg_list.get(pkg_dep_name)
+ pkg_dep_list.append(pkg_dep)
+
+ pkg_obj = kmod_pkg_list.get(pkg_name)
+ if not pkg_obj:
+ pkg_obj = KModPackage(pkg_name, pkg_dep_list)
+ kmod_pkg_list.add_kmod_pkg(pkg_obj)
+ else:
+ log.error('package %s already exists?', pkg_name)
+
+ rules_list = yobj.get('rules', [])
+ for rule_dict in rules_list:
+ if_variant_in = rule_dict.get('if_variant_in')
+ exact_pkg = rule_dict.get('exact_pkg')
+
+ for key, value in rule_dict.items():
+ if key in ['if_variant_in', 'exact_pkg']:
+ continue
+
+ if if_variant_in is not None:
+ if not (set(variants) & set(if_variant_in)):
+ continue
+
+ rule = key
+ package_name = value
+
+ if not kmod_pkg_list.get(package_name):
+ raise Exception('Unknown package ' + package_name)
+
+ rule_type = 'wants'
+ if exact_pkg is True:
+ rule_type = 'needs'
+ elif key == 'default':
+ rule_type = 'default'
+ rule = '.*'
+
+ log.debug('found rule: %s', (package_name, rule_type, rule))
+ kmod_pkg_list.rules.append((package_name, rule_type, rule))
+
+ log.info('loaded config, rules: %s', len(kmod_pkg_list.rules))
+ return kmod_pkg_list
+
+
+def make_pictures(pkg_list: KModPackageList, kmod_list: KModList, filename: str, print_allowed=True):
+ f = open(filename + '.dot', 'w')
+
+ f.write('digraph {\n')
+ f.write('node [style=filled fillcolor="#f8f8f8"]\n')
+ f.write(' subgraph kmods {\n')
+ f.write(' "Legend" [shape=note label="kmod name\\n{desired package}\\nresulting package(s)"]\n')
+
+ for kmod in kmod_list.get_topo_order():
+ pkg_name = ''
+ attr = ''
+ if kmod.assigned_to_pkg:
+ attr = 'fillcolor="#eddad5" color="#b22800"'
+ pkg_name = kmod.assigned_to_pkg.name + "!"
+ if kmod.preferred_pkg:
+ attr = 'fillcolor="#ddddf5" color="#b268fe"'
+ pkg_name = kmod.preferred_pkg.name + "?"
+ allowed = ''
+ if kmod.allowed_list and print_allowed:
+ allowed = '=' + ' '.join([pkg.name for pkg in kmod.allowed_list])
+ f.write(' "%s" [label="%s\\n%s\\n%s" shape=box %s] \n' % (kmod.name, kmod.name, pkg_name, allowed, attr))
+
+ for kmod in kmod_list.get_topo_order():
+ for kmod_dep in kmod.depends_on:
+ f.write(' "%s" -> "%s";\n' % (kmod.name, kmod_dep.name))
+ f.write(' }\n')
+
+ f.write(' subgraph packages {\n')
+ for pkg in pkg_list:
+ desc = ''
+ if pkg.default:
+ desc = '/default'
+ f.write(' "%s" [label="%s\\n%s"] \n' % (pkg.name, pkg.name, desc))
+ for pkg_dep in pkg.depends_on:
+ f.write(' "%s" -> "%s";\n' % (pkg.name, pkg_dep.name))
+ f.write(' }\n')
+ f.write('}\n')
+
+ f.close()
+
+ # safe_run_command('dot -Tpng -Gdpi=150 %s.dot > %s.png' % (filename, filename))
+ safe_run_command('dot -Tsvg %s.dot > %s.svg' % (filename, filename))
+
+
+def sort_kmods(depmod_pathname: str, config_str: str, variants=[], do_pictures=''):
+ log.info('%s %s', depmod_pathname, config_str)
+ kmod_list = KModList()
+ kmod_list.load_depmod_file(depmod_pathname)
+
+ pkg_list = load_config(config_str, kmod_list, variants)
+
+ basename = os.path.splitext(config_str)[0]
+
+ apply_initial_labels(pkg_list, kmod_list)
+ if '0' in do_pictures:
+ make_pictures(pkg_list, kmod_list, basename + "_0", print_allowed=False)
+
+ try:
+
+ propagate_labels_1(pkg_list, kmod_list)
+ if '1' in do_pictures:
+ make_pictures(pkg_list, kmod_list, basename + "_1")
+ propagate_labels_2(pkg_list, kmod_list)
+ propagate_labels_3(pkg_list, kmod_list)
+ finally:
+ if 'f' in do_pictures:
+ make_pictures(pkg_list, kmod_list, basename + "_f")
+
+ return pkg_list, kmod_list
+
+
+def abbrev_list_for_report(alist: list[KMod]) -> str:
+ tmp_str = []
+ for kmod in alist[:2]:
+ if kmod.allowed_list:
+ tmp_str.append('%s(%s)' % (kmod.name, ' '.join([x.name for x in kmod.allowed_list])))
+ ret = ', '.join(tmp_str)
+ other_len = len(alist[2:])
+ if other_len > 0:
+ ret = ret + ' and %s other(s)' % (other_len)
+ return ret
+
+
+def print_report(pkg_list: KModPackageList, kmod_list: KModList):
+ log.info('*'*26 + ' REPORT ' + '*'*26)
+
+ kmods_err = 0
+ kmods_moved = 0
+ kmods_good = 0
+ for kmod in kmod_list.get_topo_order():
+ if not kmod.allowed_list:
+ log.error('%s: not assigned to any package! Please check the full log for details', kmod.name)
+ kmods_err = kmods_err + 1
+ continue
+
+ if len(kmod.allowed_list) > 1:
+ log.error('%s: assigned to more than one package! Please check the full log for details', kmod.name)
+ kmods_err = kmods_err + 1
+ continue
+
+ if not kmod.preferred_pkg:
+ # config doesn't care where it ended up
+ kmods_good = kmods_good + 1
+ continue
+
+ if kmod.preferred_pkg in kmod.allowed_list:
+ # it ended up where it needs to be
+ kmods_good = kmods_good + 1
+ continue
+
+ bad_parent_list = []
+ for kmod_parent in kmod.is_dependency_for:
+ if not is_pkg_child_to_any(kmod.preferred_pkg, kmod_parent.allowed_list):
+ bad_parent_list.append(kmod_parent)
+
+ bad_child_list = []
+ for kmod_child in kmod.depends_on:
+ if not is_pkg_parent_to_any(kmod.preferred_pkg, kmod_child.allowed_list):
+ bad_child_list.append(kmod_parent)
+
+ log.info('%s: wanted by %s but ended up in %s', kmod.name, [kmod.preferred_pkg.name], [pkg.name for pkg in kmod.allowed_list])
+ if bad_parent_list:
+ log.info('\thas conflicting parent: %s', abbrev_list_for_report(bad_parent_list))
+ if bad_child_list:
+ log.info('\thas conflicting children: %s', abbrev_list_for_report(bad_child_list))
+
+ kmods_moved = kmods_moved + 1
+
+ log.info('No. of kmod(s) assigned to preferred package: %s', kmods_good)
+ log.info('No. of kmod(s) moved to a related package: %s', kmods_moved)
+ log.info('No. of kmod(s) which could not be assigned: %s', kmods_err)
+ log.info('*'*60)
+
+ return kmods_err
+
+
+def write_modules_lists(path_prefix: str, pkg_list: KModPackageList, kmod_list: KModList):
+ kmod_list_alphabetical = sorted(kmod_list.get_topo_order(), key=lambda x: x.kmod_pathname)
+ for pkg in pkg_list:
+ output_path = os.path.join(path_prefix, pkg.name + '.list')
+ i = 0
+ with open(output_path, "w") as file:
+ for kmod in kmod_list_alphabetical:
+ if kmod.allowed_list and pkg in kmod.allowed_list:
+ file.write(kmod.kmod_pathname)
+ file.write('\n')
+ i = i + 1
+ log.info('Module list %s created with %s kmods', output_path, i)
+
+
+class FiltermodTests(unittest.TestCase):
+ do_pictures = ''
+
+ def setUp(self):
+ self.pkg_list = None
+ self.kmod_list = None
+
+ def _is_kmod_pkg(self, kmodname, pkgnames):
+ self.assertIsNotNone(self.pkg_list)
+ self.assertIsNotNone(self.kmod_list)
+
+ if type(pkgnames) is str:
+ pkgnames = [pkgnames]
+
+ expected_pkgs = []
+ for pkgname in pkgnames:
+ pkg = self.pkg_list.get(pkgname)
+ self.assertIsNotNone(pkg)
+ expected_pkgs.append(pkg)
+
+ kmod = self.kmod_list.get(kmodname)
+ self.assertIsNotNone(kmod)
+
+ if expected_pkgs:
+ self.assertTrue(len(kmod.allowed_list) == 1)
+ self.assertIn(next(iter(kmod.allowed_list)), expected_pkgs)
+ else:
+ self.assertEqual(kmod.allowed_list, set())
+
+ def test1a(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod1', 'modules-core')
+ self._is_kmod_pkg('kmod2', 'modules-core')
+ self._is_kmod_pkg('kmod3', 'modules')
+ self._is_kmod_pkg('kmod4', 'modules')
+
+ def test1b(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'),
+ do_pictures=FiltermodTests.do_pictures,
+ variants=['rt'])
+
+ self.assertIsNotNone(self.pkg_list.get('rt-kvm'))
+ self._is_kmod_pkg('kmod1', 'modules-core')
+ self._is_kmod_pkg('kmod2', 'modules-core')
+ self._is_kmod_pkg('kmod3', 'modules')
+ self._is_kmod_pkg('kmod4', 'rt-kvm')
+
+ def test2(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test2.dep'), get_td('test2.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod1', 'modules-extra')
+ self._is_kmod_pkg('kmod2', 'modules')
+ self._is_kmod_pkg('kmod3', 'modules-core')
+ self._is_kmod_pkg('kmod4', 'modules-core')
+ self._is_kmod_pkg('kmod5', 'modules-core')
+ self._is_kmod_pkg('kmod6', 'modules-extra')
+ self._is_kmod_pkg('kmod8', 'modules')
+
+ def test3(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test3.dep'), get_td('test3.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod2', ['modules-core', 'modules'])
+ self._is_kmod_pkg('kmod4', ['modules-core', 'modules-extra'])
+ self._is_kmod_pkg('kmod5', 'modules-core')
+ self._is_kmod_pkg('kmod6', 'modules-core')
+
+ def test4(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test4.dep'), get_td('test4.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod0', 'modules')
+ self._is_kmod_pkg('kmod1', 'modules')
+ self._is_kmod_pkg('kmod2', 'modules')
+ self._is_kmod_pkg('kmod3', 'modules')
+ self._is_kmod_pkg('kmod4', 'modules')
+ self._is_kmod_pkg('kmod5', 'modules')
+ self._is_kmod_pkg('kmod6', 'modules')
+ self._is_kmod_pkg('kmod7', 'modules-partner2')
+ self._is_kmod_pkg('kmod8', 'modules-partner')
+ self._is_kmod_pkg('kmod9', 'modules-partner')
+
+ def _check_preffered_pkg(self, kmodname, pkgname):
+ kmod = self.kmod_list.get(kmodname)
+ self.assertIsNotNone(kmod)
+ self.assertEqual(kmod.preferred_pkg.name, pkgname)
+
+ def test5(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test5.dep'), get_td('test5.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._check_preffered_pkg('kmod2', 'modules')
+ self._check_preffered_pkg('kmod3', 'modules-partner')
+ self._check_preffered_pkg('kmod4', 'modules-partner')
+
+ def test6(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test6.dep'), get_td('test6.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod2', 'modules-core')
+ self._is_kmod_pkg('kmod3', 'modules')
+ self._is_kmod_pkg('kmod4', 'modules')
+ self._is_kmod_pkg('kmod1', [])
+
+ def test7(self):
+ self.pkg_list, self.kmod_list = sort_kmods(get_td('test7.dep'), get_td('test7.yaml'),
+ do_pictures=FiltermodTests.do_pictures)
+
+ self._is_kmod_pkg('kmod1', 'modules-core')
+ self._is_kmod_pkg('kmod2', 'modules-core')
+ self._is_kmod_pkg('kmod3', 'modules-other')
+ self._is_kmod_pkg('kmod4', 'modules')
+
+
+def do_rpm_mapping_test(config_pathname, kmod_rpms):
+ kmod_dict = {}
+
+ def get_kmods_matching_re(pkgname, param_re):
+ matched = []
+ param_re = '^kernel/' + param_re
+ pattern = re.compile(param_re)
+
+ for kmod_pathname, kmod_rec in kmod_dict.items():
+ m = pattern.match(kmod_pathname)
+ if m:
+ matched.append(kmod_pathname)
+
+ return matched
+
+ for kmod_rpm in kmod_rpms.split():
+ filename = os.path.basename(kmod_rpm)
+
+ m = re.match(r'.*-modules-([^-]+)', filename)
+ if not m:
+ raise Exception('Unrecognized rpm ' + kmod_rpm + ', expected a kernel-modules* rpm')
+ pkgname = 'modules-' + m.group(1)
+ m = re.match(r'modules-([0-9.]+)', pkgname)
+ if m:
+ pkgname = 'modules'
+
+ tmpdir = os.path.join('tmp.filtermods', filename, pkgname)
+ if not os.path.exists(tmpdir):
+ log.info('creating tmp dir %s', tmpdir)
+ os.makedirs(tmpdir)
+ safe_run_command('rpm2cpio %s | cpio -id' % (os.path.abspath(kmod_rpm)), cwddir=tmpdir)
+ else:
+ log.info('using cached content of tmp dir: %s', tmpdir)
+
+ for path, subdirs, files in os.walk(tmpdir):
+ for name in files:
+ ret = re.match(r'.*/'+pkgname+'/lib/modules/[^/]+/[^/]+/(.*)', os.path.join(path, name))
+ if not ret:
+ continue
+
+ kmod_pathname = 'kernel/' + ret.group(1)
+ if not kmod_pathname.endswith('.xz') and not kmod_pathname.endswith('.ko'):
+ continue
+ if kmod_pathname in kmod_dict:
+ if pkgname not in kmod_dict[kmod_pathname]['target_pkgs']:
+ kmod_dict[kmod_pathname]['target_pkgs'].append(pkgname)
+ else:
+ kmod_dict[kmod_pathname] = {}
+ kmod_dict[kmod_pathname]['target_pkgs'] = [pkgname]
+ kmod_dict[kmod_pathname]['pkg'] = None
+ kmod_dict[kmod_pathname]['matched'] = False
+
+ kmod_pkg_list = load_config(config_pathname, None)
+
+ for package_name, rule_type, rule in kmod_pkg_list.rules:
+ kmod_names = get_kmods_matching_re(package_name, rule)
+
+ for kmod_pathname in kmod_names:
+ kmod_rec = kmod_dict[kmod_pathname]
+
+ if not kmod_rec['matched']:
+ kmod_rec['matched'] = True
+ kmod_rec['pkg'] = package_name
+ for kmod_pathname, kmod_rec in kmod_dict.items():
+ if kmod_rec['pkg'] not in kmod_rec['target_pkgs']:
+ log.warning('kmod %s wanted by config in %s, in tree it is: %s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs'])
+ elif len(kmod_rec['target_pkgs']) > 1:
+ # if set(kmod_rec['target_pkgs']) != set(['modules', 'modules-core']):
+ log.warning('kmod %s multiple matches in tree: %s/%s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs'])
+
+
+def cmd_sort(options):
+ do_pictures = ''
+ if options.graphviz:
+ do_pictures = '0f'
+
+ pkg_list, kmod_list = sort_kmods(options.depmod, options.config,
+ options.variants, do_pictures)
+ ret = print_report(pkg_list, kmod_list)
+ if options.output:
+ write_modules_lists(options.output, pkg_list, kmod_list)
+
+ return ret
+
+
+def cmd_print_rule_map(options):
+ kmod_list = KModList()
+ kmod_list.load_depmod_file(options.depmod)
+ pkg_list = load_config(options.config, kmod_list, options.variants)
+ apply_initial_labels(pkg_list, kmod_list, treat_default_as_wants=True)
+
+ for kmod in kmod_list.get_alphabetical_order():
+ print('%-20s %s' % (kmod.preferred_pkg, kmod.kmod_pathname))
+
+
+def cmd_selftest(options):
+ if options.graphviz:
+ FiltermodTests.do_pictures = '0f'
+
+ for arg in ['selftest', '-g', '--graphviz']:
+ if arg in sys.argv:
+ sys.argv.remove(arg)
+
+ unittest.main()
+ sys.exit(0)
+
+
+def cmd_cmp2rpm(options):
+ do_rpm_mapping_test(options.config, options.kmod_rpms)
+
+
+def main():
+ global log
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-v', '--verbose', dest='verbose',
+ help='be more verbose', action='count', default=4)
+ parser.add_argument('-q', '--quiet', dest='quiet',
+ help='be more quiet', action='count', default=0)
+ parser.add_argument('-l', '--log-filename', dest='log_filename',
+ help='log filename', default='filtermods.log')
+
+ subparsers = parser.add_subparsers(dest='cmd')
+
+ def add_graphviz_arg(p):
+ p.add_argument('-g', '--graphviz', dest='graphviz',
+ help='generate graphviz visualizations',
+ action='store_true', default=False)
+
+ def add_config_arg(p):
+ p.add_argument('-c', '--config', dest='config', required=True,
+ help='path to yaml config with rules')
+
+ def add_depmod_arg(p):
+ p.add_argument('-d', '--depmod', dest='depmod', required=True,
+ help='path to modules.dep file')
+
+ def add_output_arg(p):
+ p.add_argument('-o', '--output', dest='output', default=None,
+ help='output $module_name.list files to directory specified by this parameter')
+
+ def add_variants_arg(p):
+ p.add_argument('-r', '--variants', dest='variants', action='append', default=[],
+ help='variants to enable in config')
+
+ def add_kmod_rpms_arg(p):
+ p.add_argument('-k', '--kmod-rpms', dest='kmod_rpms', required=True,
+ help='compare content of specified rpm(s) against yaml config rules')
+
+ parser_sort = subparsers.add_parser('sort', help='assign kmods specified by modules.dep using rules from yaml config')
+ add_config_arg(parser_sort)
+ add_depmod_arg(parser_sort)
+ add_output_arg(parser_sort)
+ add_variants_arg(parser_sort)
+ add_graphviz_arg(parser_sort)
+
+ parser_rule_map = subparsers.add_parser('rulemap', help='print how yaml config maps to kmods')
+ add_config_arg(parser_rule_map)
+ add_depmod_arg(parser_rule_map)
+ add_variants_arg(parser_rule_map)
+
+ parser_test = subparsers.add_parser('selftest', help='runs a self-test')
+ add_graphviz_arg(parser_test)
+
+ parser_cmp2rpm = subparsers.add_parser('cmp2rpm', help='compare ruleset against RPM(s)')
+ add_config_arg(parser_cmp2rpm)
+ add_kmod_rpms_arg(parser_cmp2rpm)
+
+ options = parser.parse_args()
+
+ if options.cmd == "selftest":
+ options.verbose = options.verbose - 2
+ options.verbose = max(options.verbose - options.quiet, 0)
+ levels = [NOTSET, CRITICAL, ERROR, WARN, INFO, DEBUG]
+ stdout_log_level = levels[min(options.verbose, len(levels) - 1)]
+
+ log = setup_logging(options.log_filename, stdout_log_level)
+
+ ret = 0
+ if options.cmd == "sort":
+ ret = cmd_sort(options)
+ elif options.cmd == "rulemap":
+ cmd_print_rule_map(options)
+ elif options.cmd == "selftest":
+ cmd_selftest(options)
+ elif options.cmd == "cmp2rpm":
+ cmd_cmp2rpm(options)
+ else:
+ parser.print_help()
+
+ return ret
+
+
+if __name__ == '__main__':
+ # import profile
+ # profile.run('main()', sort=1)
+ sys.exit(main())