import ast
import os.path as osp
import re
import sys
import warnings
from collections import defaultdict
from importlib.util import find_spec
from typing import List, Optional, Tuple, Union
from importlib import import_module as real_import_module
import json
import pickle
from pathlib import Path
import itertools

import yaml
from omegaconf import OmegaConf
from pkg_resources.extern import packaging

# Importing the submodules by name makes them reachable as attributes of
# ``packaging`` (e.g. ``packaging.requirements`` used by ``Requirement``).
__import__('pkg_resources.extern.packaging.version')
__import__('pkg_resources.extern.packaging.specifiers')
__import__('pkg_resources.extern.packaging.requirements')
__import__('pkg_resources.extern.packaging.markers')

PYTHON_ROOT_DIR = osp.dirname(osp.dirname(sys.executable))
SYSTEM_PYTHON_PREFIX = '/usr/lib/python'

MODULE2PACKAGE = {
    'mmcls': 'mmcls',
    'mmdet': 'mmdet',
    'mmdet3d': 'mmdet3d',
    'mmseg': 'mmsegmentation',
    'mmaction': 'mmaction2',
    'mmtrack': 'mmtrack',
    'mmpose': 'mmpose',
    'mmedit': 'mmedit',
    'mmocr': 'mmocr',
    'mmgen': 'mmgen',
    'mmfewshot': 'mmfewshot',
    'mmrazor': 'mmrazor',
    'mmflow': 'mmflow',
    'mmhuman3d': 'mmhuman3d',
    'mmrotate': 'mmrotate',
    'mmselfsup': 'mmselfsup',
    'mmyolo': 'mmyolo',
    'mmpretrain': 'mmpretrain',
    'mmagic': 'mmagic',
}

# PKG2PROJECT is not a proper name to represent the mapping between module name
# (module import from) and package name (used by pip install). Therefore,
# PKG2PROJECT will be deprecated and this alias will only be kept until
# MMEngine v1.0.0
PKG2PROJECT = MODULE2PACKAGE


class ConfigParsingError(RuntimeError):
    """Raise error when failed to parse pure Python style config files."""


def _get_cfg_metainfo(package_path: str, cfg_path: str) -> dict:
    """Get target meta information from all 'metafile.yml' defined in
    `model-index.yml` of external package.

    Args:
        package_path (str): Path of external package.
        cfg_path (str): Name of experiment config.

    Returns:
        dict: Meta information of target experiment.

    Raises:
        ValueError: If ``cfg_path`` is not among the collected config names.
    """
    meta_index_path = osp.join(package_path, '.mim', 'model-index.yml')
    meta_index = OmegaConf.to_container(
        OmegaConf.load(meta_index_path), resolve=True)
    cfg_dict = dict()
    for meta_path in meta_index['Import']:
        meta_path = osp.join(package_path, '.mim', meta_path)
        cfg_meta = OmegaConf.to_container(
            OmegaConf.load(meta_path), resolve=True)
        for model_cfg in cfg_meta['Models']:
            if 'Config' not in model_cfg:
                warnings.warn(f'There is not `Config` define in {model_cfg}')
                continue
            # The key is everything after the first '/' of the `Config` entry.
            cfg_name = model_cfg['Config'].partition('/')[-1]
            # Some config could have multiple weights, we only pick the
            # first one.
            if cfg_name in cfg_dict:
                continue
            cfg_dict[cfg_name] = model_cfg
    if cfg_path not in cfg_dict:
        raise ValueError(f'Expected configs: {cfg_dict.keys()}, but got '
                         f'{cfg_path}')
    return cfg_dict[cfg_path]


def _get_external_cfg_path(package_path: str, cfg_file: str) -> str:
    """Get config path of external package.

    Args:
        package_path (str): Path of external package.
        cfg_file (str): Name of experiment config.

    Returns:
        str: Absolute config path from external package.
    """
    # Only the part before the first '.' is used to look the config up.
    cfg_file = cfg_file.split('.')[0]
    model_cfg = _get_cfg_metainfo(package_path, cfg_file)
    cfg_path = osp.join(package_path, model_cfg['Config'])
    check_file_exist(cfg_path)
    return cfg_path


def _get_external_cfg_base_path(package_path: str, cfg_name: str) -> str:
    """Get base config path of external package.

    Args:
        package_path (str): Path of external package.
        cfg_name (str): External relative config path with 'package::'.

    Returns:
        str: Absolute config path from external package.
    """
    cfg_path = osp.join(package_path, '.mim', 'configs', cfg_name)
    check_file_exist(cfg_path)
    return cfg_path


def _get_package_and_cfg_path(cfg_path: str) -> Tuple[str, str]:
    """Get package name and relative config path.

    Args:
        cfg_path (str): External relative config path with 'package::'.

    Returns:
        Tuple[str, str]: Package name and config path.

    Raises:
        ValueError: If ``cfg_path`` does not look like ``package::dir/name``
            or contains more than one ``::``.
        AssertionError: If the package prefix is not a key of
            ``MODULE2PACKAGE``.
    """
    if re.match(r'\w*::\w*/\w*', cfg_path) is None:
        raise ValueError(
            '`_get_package_and_cfg_path` is used for get external package, '
            'please specify the package name and relative config path, just '
            'like `mmdet::faster_rcnn/faster-rcnn_r50_fpn_1x_coco.py`')
    package_cfg = cfg_path.split('::')
    if len(package_cfg) > 2:
        raise ValueError('`::` should only be used to separate package and '
                         'config name, but found multiple `::` in '
                         f'{cfg_path}')
    package, cfg_path = package_cfg
    assert package in MODULE2PACKAGE, (
        f'mmengine does not support to load {package} config.')
    # Translate the module name into the pip package name.
    package = MODULE2PACKAGE[package]
    return package, cfg_path


class RemoveAssignFromAST(ast.NodeTransformer):
    """Remove Assign node if the target's name match the key.

    Only the first target of each assignment is inspected.

    Args:
        key (str): The target name of the Assign node.
    """

    def __init__(self, key):
        self.key = key

    def visit_Assign(self, node):
        # Returning None from a NodeTransformer visitor drops the node.
        if (isinstance(node.targets[0], ast.Name)
                and node.targets[0].id == self.key):
            return None
        else:
            return node


def _is_builtin_module(module_name: str) -> bool:
    """Check if a module is a built-in module.

    Args:
        module_name (str): Name of module.

    Returns:
        bool: ``False`` for relative names (leading ``.``), for modules that
        cannot be found, and for modules whose origin is under a
        ``site-package``/``dist-package`` directory or outside
        ``PYTHON_ROOT_DIR`` / ``SYSTEM_PYTHON_PREFIX``. ``True`` for names
        starting with ``mmengine.config``, names listed in
        ``sys.builtin_module_names``, and the remaining modules.
    """
    if module_name.startswith('.'):
        return False
    if module_name.startswith('mmengine.config'):
        return True
    if module_name in sys.builtin_module_names:
        return True
    # Only the top-level package is looked up.
    spec = find_spec(module_name.split('.')[0])
    # Module not found
    if spec is None:
        return False
    origin_path = getattr(spec, 'origin', None)
    if origin_path is None:
        return False
    origin_path = osp.abspath(origin_path)
    if ('site-package' in origin_path or 'dist-package' in origin_path
            or not origin_path.startswith(
                (PYTHON_ROOT_DIR, SYSTEM_PYTHON_PREFIX))):
        return False
    else:
        return True


class ImportTransformer(ast.NodeTransformer):
    """Convert the import syntax to the assignment of
    :class:`mmengine.config.LazyObject` and preload the base variable before
    parsing the configuration file.

    Since you are already looking at this part of the code, I believe you must
    be interested in the mechanism of the ``lazy_import`` feature of
    :class:`Config`. In this docstring, we will dive deeper into its
    principles.

    Most of OpenMMLab users maybe bothered with that:

        * In most of popular IDEs, they cannot navigate to the source code in
          configuration file
        * In most of popular IDEs, they cannot jump to the base file in current
          configuration file, which is much painful when the inheritance
          relationship is complex.

    In order to solve this problem, we introduce the ``lazy_import`` mode.

    A very intuitive idea for solving this problem is to import the module
    corresponding to the "type" field using the ``import`` syntax. Similarly,
    we can also ``import`` base file.

    However, this approach has a significant drawback. It requires triggering
    the import logic to parse the configuration file, which can be
    time-consuming. Additionally, it implies downloading numerous dependencies
    solely for the purpose of parsing the configuration file.
    However, it's possible that only a portion of the config will actually be
    used. For instance, the package used in the ``train_pipeline`` may not
    be necessary for an evaluation task. Forcing users to download these
    unused packages is not a desirable solution.

    To avoid this problem, we introduce :class:`mmengine.config.LazyObject` and
    :class:`mmengine.config.LazyAttr`. Before we proceed with further
    explanations, you may refer to the documentation of these two modules to
    gain an understanding of their functionalities.

    Actually, one of the functions of ``ImportTransformer`` is to hack the
    ``import`` syntax. It will replace the import syntax
    (exclude import the base files) with the assignment of ``LazyObject``.

    As for the import syntax of the base file, we cannot lazy import it since
    we're eager to merge the fields of current file and base files.
    Therefore, another function of the ``ImportTransformer`` is to
    collaborate with ``Config._parse_lazy_import`` to parse the base files.

    Args:
        global_dict (dict): The global dict of the current configuration file.
            If we divide ordinary Python syntax into two parts, namely the
            import section and the non-import section (assuming a simple case
            with imports at the beginning and the rest of the code following),
            the variables generated by the import statements are stored in
            global variables for subsequent code use. In this context,
            the ``global_dict`` represents the global variables required when
            executing the non-import code. ``global_dict`` will be filled
            during visiting the parsed code.
        base_dict (dict): All variables defined in base files.

            Examples:
                >>> from mmengine.config import read_base
                >>>
                >>>
                >>> with read_base():
                >>>     from .._base_.default_runtime import *
                >>>     from .._base_.datasets.coco_detection import dataset

            In this case, the base_dict will be:

            Examples:
                >>> base_dict = {
                >>>     '.._base_.default_runtime': ...
                >>>     '.._base_.datasets.coco_detection': dataset}

            and `global_dict` will be updated like this:

            Examples:
                >>> global_dict.update(base_dict['.._base_.default_runtime'])  # `import *` means update all data
                >>> global_dict.update(dataset=base_dict['.._base_.datasets.coco_detection']['dataset'])  # only update `dataset`
        filename (str, optional): Name of the file being parsed. It is
            embedded in the generated ``LazyObject(...)`` source as part of
            the location string. Defaults to None.
    """  # noqa: E501

    def __init__(self,
                 global_dict: dict,
                 base_dict: Optional[dict] = None,
                 filename: Optional[str] = None):
        self.base_dict = base_dict if base_dict is not None else {}
        self.global_dict = global_dict
        # In Windows, the filename could be like this:
        # "C:\\Users\\runneradmin\\AppData\\Local\\"
        # Although it has been an raw string, ast.parse will firstly escape
        # it as the executed code:
        # "C:\Users\runneradmin\AppData\Local\\\"
        # As you see, the `\U` will be treated as a part of
        # the escape sequence during code parsing, leading to an
        # parsing error
        # Here we use `encode('unicode_escape').decode()` for double escaping
        if isinstance(filename, str):
            filename = filename.encode('unicode_escape').decode()
        self.filename = filename
        # Names bound by the import statements visited so far.
        self.imported_obj: set = set()
        super().__init__()

    def visit_ImportFrom(
        self, node: ast.ImportFrom
    ) -> Optional[Union[List[ast.Assign], ast.ImportFrom]]:
        """Hack the ``from ... import ...`` syntax and update the global_dict.

        Examples:
            >>> from mmdet.models import RetinaNet

        Will be parsed as:

        Examples:
            >>> RetinaNet = lazyObject('mmdet.models', 'RetinaNet')

        ``global_dict`` will also be updated by ``base_dict`` as the
        class docstring says.

        Args:
            node (ast.AST): The node of the current import statement.

        Returns:
            Optional[List[ast.Assign]]: There three cases:

                * If the node is a statement of importing base files.
                  None will be returned.
                * If the node is a statement of importing a builtin module,
                  node will be directly returned
                * Otherwise, it will return the assignment statements of
                  ``LazyObject``.

        Raises:
            ConfigParsingError: On ``from xxx import *`` from a module that is
                not in ``base_dict``, or when the generated assignment cannot
                be parsed.
        """
        # Built-in modules will not be parsed as LazyObject
        module = f'{node.level*"."}{node.module}'
        if _is_builtin_module(module):
            # Make sure builtin module will be added into `self.imported_obj`
            for alias in node.names:
                if alias.asname is not None:
                    self.imported_obj.add(alias.asname)
                elif alias.name == '*':
                    raise ConfigParsingError(
                        'Cannot import * from non-base config')
                else:
                    self.imported_obj.add(alias.name)
            return node

        if module in self.base_dict:
            for alias_node in node.names:
                if alias_node.name == '*':
                    self.global_dict.update(self.base_dict[module])
                    return None
                if alias_node.asname is not None:
                    base_key = alias_node.asname
                else:
                    base_key = alias_node.name
                self.global_dict[base_key] = self.base_dict[module][
                    alias_node.name]
            return None

        nodes: List[ast.Assign] = []
        for alias_node in node.names:
            # `ast.alias` has lineno attr after Python 3.10,
            if hasattr(alias_node, 'lineno'):
                lineno = alias_node.lineno
            else:
                lineno = node.lineno
            if alias_node.name == '*':
                # TODO: If users import * from a non-config module, it should
                # fallback to import the real module and raise a warning to
                # remind users the real module will be imported which will slow
                # down the parsing speed.
                raise ConfigParsingError(
                    'Illegal syntax in config! `from xxx import *` is not '
                    'allowed to appear outside the `if base:` statement')
            elif alias_node.asname is not None:
                # case1:
                # from mmengine.dataset import BaseDataset as Dataset ->
                # Dataset = LazyObject('mmengine.dataset', 'BaseDataset')
                code = f'{alias_node.asname} = LazyObject("{module}", "{alias_node.name}", "{self.filename}, line {lineno}")'  # noqa: E501
                self.imported_obj.add(alias_node.asname)
            else:
                # case2:
                # from mmengine.model import BaseModel
                # BaseModel = LazyObject('mmengine.model', 'BaseModel')
                code = f'{alias_node.name} = LazyObject("{module}", "{alias_node.name}", "{self.filename}, line {lineno}")'  # noqa: E501
                self.imported_obj.add(alias_node.name)
            try:
                nodes.append(ast.parse(code).body[0])  # type: ignore
            except Exception as e:
                # Report the imported name (not the ast.alias object) and
                # keep the numbered hints on their own lines.
                raise ConfigParsingError(
                    f'Cannot import {alias_node.name} from {module}\n'
                    '1. Cannot import * from 3rd party lib in the config '
                    'file\n'
                    '2. Please check if the module is a base config which '
                    'should be added to `_base_`\n') from e
        return nodes

    def visit_Import(self, node) -> Union[ast.Assign, ast.Import]:
        """Work with ``_gather_abs_import_lazyobj`` to hack the ``import ...``
        syntax.

        Examples:
            >>> import mmcls.models
            >>> import mmcls.datasets
            >>> import mmcls

        Will be parsed as:

        Examples:
            >>> # import mmcls.models; import mmcls.datasets; import mmcls
            >>> mmcls = lazyObject(['mmcls', 'mmcls.datasets', 'mmcls.models'])

        Args:
            node (ast.AST): The node of the current import statement.

        Returns:
            ast.Assign: If the import statement is ``import ... as ...``,
            ast.Assign will be returned, otherwise node will be directly
            returned.
        """
        # For absolute import like: `import mmdet.configs as configs`.
        # It will be parsed as:
        # configs = LazyObject('mmdet.configs')
        # For absolute import like:
        # `import mmdet.configs`
        # `import mmdet.configs.default_runtime`
        # This will be parsed as
        # mmdet = LazyObject(['mmdet.configs.default_runtime', 'mmdet.configs])
        # However, visit_Import cannot gather other import information, so
        # `_gather_abs_import_LazyObject` will gather all import information
        # from the same module and construct the LazyObject.
        alias_list = node.names
        assert len(alias_list) == 1, (
            'Illegal syntax in config! import multiple modules in one line is '
            'not supported')
        # TODO Support multiline import
        alias = alias_list[0]
        if alias.asname is not None:
            self.imported_obj.add(alias.asname)
            # Aliased imports of builtin modules are left as real imports.
            if _is_builtin_module(alias.name.split('.')[0]):
                return node
            return ast.parse(  # type: ignore
                f'{alias.asname} = LazyObject('
                f'"{alias.name}",'
                f'location="{self.filename}, line {node.lineno}")').body[0]
        return node


def _gather_abs_import_lazyobj(tree: ast.Module,
                               filename: Optional[str] = None):
    """Experimental implementation of gathering absolute import information.

    Top-level ``import`` statements of non-builtin modules are removed from
    ``tree.body`` and replaced by one ``<top_level> = LazyObject([...])``
    assignment per top-level package, inserted at the front of the body.

    Args:
        tree (ast.Module): Parsed module; its ``body`` is replaced in place.
        filename (str, optional): File name embedded in the generated
            location string. Defaults to None.

    Returns:
        tuple: ``(tree, abs_imported)`` where ``abs_imported`` is the set of
        top-level package names that were turned into ``LazyObject``.
    """
    if isinstance(filename, str):
        # Double escape so that backslashes survive `ast.parse` below.
        filename = filename.encode('unicode_escape').decode()
    imported = defaultdict(list)
    abs_imported = set()
    new_body: List[ast.stmt] = []
    # module2node is used to get lineno when Python < 3.10
    module2node: dict = dict()
    for node in tree.body:
        if isinstance(node, ast.Import):
            for alias in node.names:
                # Skip converting built-in module to LazyObject
                if _is_builtin_module(alias.name):
                    new_body.append(node)
                    continue
                module = alias.name.split('.')[0]
                module2node.setdefault(module, node)
                imported[module].append(alias)
            continue
        new_body.append(node)

    for key, value in imported.items():
        names = [_value.name for _value in value]
        if hasattr(value[0], 'lineno'):
            lineno = value[0].lineno
        else:
            lineno = module2node[key].lineno
        lazy_module_assign = ast.parse(
            f'{key} = LazyObject({names}, location="{filename}, line {lineno}")'  # noqa: E501
        )  # noqa: E501
        abs_imported.add(key)
        new_body.insert(0, lazy_module_assign.body[0])
    tree.body = new_body
    return tree, abs_imported


def get_installed_path(package: str) -> str:
    """Get installed path of package.

    Args:
        package (str): Name of package.

    Returns:
        str: Directory of the installed package.

    Raises:
        RuntimeError: If the distribution is not found and ``package``
            resolves to a spec without origin (namespace package).
        pkg_resources.DistributionNotFound: If the distribution is not found
            and no module spec can be found either.

    Example:
        >>> get_installed_path('mmcls')
        >>> '.../lib/python3.7/site-packages/mmcls'
    """
    import importlib.util

    from pkg_resources import DistributionNotFound, get_distribution

    # if the package name is not the same as module name, module name should be
    # inferred. For example, mmcv-full is the package name, but mmcv is module
    # name. If we want to get the installed path of mmcv-full, we should concat
    # the pkg.location and module name
    try:
        pkg = get_distribution(package)
    except DistributionNotFound as e:
        # if the package is not installed, package path set in PYTHONPATH
        # can be detected by `find_spec`
        spec = importlib.util.find_spec(package)
        if spec is not None:
            if spec.origin is not None:
                return osp.dirname(spec.origin)
            else:
                # `get_installed_path` cannot get the installed path of
                # namespace packages
                raise RuntimeError(
                    f'{package} is a namespace package, which is invalid '
                    'for `get_install_path`')
        else:
            raise e

    possible_path = osp.join(pkg.location, package)  # type: ignore
    if osp.exists(possible_path):
        return possible_path
    else:
        return osp.join(pkg.location,
                        package2module(package))  # type: ignore


def import_modules_from_strings(imports, allow_failed_imports=False):
    """Import modules from the given list of strings.

    Args:
        imports (list | str | None): The given module names to be imported.
        allow_failed_imports (bool): If True, the failed imports will return
            None. Otherwise, an ImportError is raise. Defaults to False.

    Returns:
        list[module] | module | None: The imported modules.

    Raises:
        TypeError: If ``imports`` is neither a str nor a list, or one of its
            items is not a str.
        ImportError: If a module fails to import and
            ``allow_failed_imports`` is False.

    Examples:
        >>> osp, sys = import_modules_from_strings(
        ...     ['os.path', 'sys'])
        >>> import os.path as osp_
        >>> import sys as sys_
        >>> assert osp == osp_
        >>> assert sys == sys_
    """
    if not imports:
        return
    single_import = False
    if isinstance(imports, str):
        single_import = True
        imports = [imports]
    if not isinstance(imports, list):
        raise TypeError(
            f'custom_imports must be a list but got type {type(imports)}')
    imported = []
    for imp in imports:
        if not isinstance(imp, str):
            raise TypeError(
                f'{imp} is of type {type(imp)} and cannot be imported.')
        try:
            imported_tmp = import_module(imp)
        except ImportError as e:
            if allow_failed_imports:
                warnings.warn(f'{imp} failed to import and is ignored.',
                              UserWarning)
                imported_tmp = None
            else:
                # Chain the cause so the real failure stays visible.
                raise ImportError(f'Failed to import {imp}') from e
        imported.append(imported_tmp)
    if single_import:
        imported = imported[0]
    return imported


def import_module(name, package=None):
    """Import a module, optionally supporting relative imports.

    Thin wrapper around :func:`importlib.import_module`.
    """
    return real_import_module(name, package)


def is_installed(package: str) -> bool:
    """Check package whether installed.

    Args:
        package (str): Name of package to be checked.

    Returns:
        bool: True if a distribution named ``package`` is found, or if a
        module spec with a non-None origin is found. False otherwise.
    """
    # When executing `import mmengine.runner`,
    # pkg_resources will be imported and it takes too much time.
    # Therefore, import it in function scope to save time.
    import importlib.util

    import pkg_resources
    from pkg_resources import get_distribution

    # refresh the pkg_resources
    # more details at https://github.com/pypa/setuptools/issues/373
    importlib.reload(pkg_resources)
    try:
        get_distribution(package)
        return True
    except pkg_resources.DistributionNotFound:
        spec = importlib.util.find_spec(package)
        if spec is None:
            return False
        elif spec.origin is not None:
            return True
        else:
            return False


def dump(obj, file=None, file_format=None, **kwargs):
    """Dump data to json/yaml/pickle strings or files (mmengine-like
    replacement).

    Args:
        obj: Object to serialize.
        file (str | Path | None): Path to write to. If None, the serialized
            data is returned instead of being written. Defaults to None.
        file_format (str | None): One of ``json``, ``yaml``, ``yml``,
            ``pkl``, ``pickle``. If None and ``file`` is a path, the text
            after the last ``.`` (lower-cased) is used. Defaults to None.
        **kwargs: Forwarded to the underlying ``json``/``yaml``/``pickle``
            dump call.

    Returns:
        str | bytes | bool: The serialized data when ``file`` is None
        (``bytes`` for pickle), otherwise True after writing the file.

    Raises:
        ValueError: If both ``file`` and ``file_format`` are None.
        TypeError: If the (given or guessed) format is not supported.
    """
    if isinstance(file, Path):
        file = str(file)

    # Guess file format if not explicitly given
    if file_format is None:
        if isinstance(file, str):
            file_format = file.split('.')[-1].lower()
        elif file is None:
            raise ValueError("file_format must be specified if file is None")

    if file_format not in ['json', 'yaml', 'yml', 'pkl', 'pickle']:
        raise TypeError(f"Unsupported file format: {file_format}")

    # Convert YAML extension
    if file_format == 'yml':
        file_format = 'yaml'
    if file_format == 'pickle':
        file_format = 'pkl'

    # Handle output to string
    if file is None:
        if file_format == 'json':
            return json.dumps(obj, indent=4, **kwargs)
        elif file_format == 'yaml':
            return yaml.dump(obj, **kwargs)
        elif file_format == 'pkl':
            return pickle.dumps(obj, **kwargs)

    # Handle output to file: text mode for json/yaml, binary for pickle.
    mode = 'w' if file_format in ['json', 'yaml'] else 'wb'
    with open(file, mode, encoding='utf-8' if 'b' not in mode else None) as f:
        if file_format == 'json':
            json.dump(obj, f, indent=4, **kwargs)
        elif file_format == 'yaml':
            yaml.dump(obj, f, **kwargs)
        elif file_format == 'pkl':
            pickle.dump(obj, f, **kwargs)

    return True


def check_file_exist(filename, msg_tmpl='file "{}" does not exist'):
    """Raise ``FileNotFoundError`` if ``filename`` is not an existing file.

    Args:
        filename (str): Path to check.
        msg_tmpl (str): Message template formatted with ``filename``.
    """
    if not osp.isfile(filename):
        raise FileNotFoundError(msg_tmpl.format(filename))


def package2module(package: str):
    """Infer module name from package.

    Args:
        package (str): Package to infer module name.

    Returns:
        str: The first line of the distribution's ``top_level.txt``.

    Raises:
        ValueError: If the distribution has no ``top_level.txt`` metadata.
    """
    # The module-level ``get_distribution`` in this file only parses the
    # requirement string and returns an object without ``has_metadata``;
    # the installed distribution has to come from pkg_resources itself.
    from pkg_resources import get_distribution as _pkg_get_distribution

    pkg = _pkg_get_distribution(package)
    if pkg.has_metadata('top_level.txt'):
        module_name = pkg.get_metadata('top_level.txt').split('\n')[0]
        return module_name
    else:
        raise ValueError(
            highlighted_error(f'can not infer the module name of {package}'))


def get_distribution(dist):
    """Parse ``dist`` into a :class:`Requirement` when it is a string.

    NOTE(review): despite its name, this function does not look up an
    installed distribution; a string is parsed with ``Requirement.parse`` and
    any other input is returned unchanged. Behaviour is kept as is for
    backward compatibility.
    """
    if isinstance(dist, str):
        dist = Requirement.parse(dist)
    return dist


def highlighted_error(msg: Union[str, Exception]) -> str:
    """Return ``msg`` styled as bold red text for terminal output.

    ``click`` is imported lazily because this module does not import it at
    the top level; if it is unavailable the plain text is returned.
    """
    text = str(msg)
    try:
        import click
    except ImportError:
        return text
    return click.style(text, fg='red', bold=True)  # type: ignore


class Requirement(packaging.requirements.Requirement):
    """A parsed requirement with extra attributes derived from its name,
    specifier, extras and marker.

    Equality and hashing are based on ``hashCmp``: the lower-cased safe
    project name, url, specifier, extras and marker string.
    """

    def __init__(self, requirement_string):
        """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
        super(Requirement, self).__init__(requirement_string)
        self.unsafe_name = self.name
        project_name = safe_name(self.name)
        self.project_name, self.key = project_name, project_name.lower()
        self.specs = [
            (spec.operator, spec.version) for spec in self.specifier]
        self.extras = tuple(map(safe_extra, self.extras))
        self.hashCmp = (
            self.key,
            self.url,
            self.specifier,
            frozenset(self.extras),
            str(self.marker) if self.marker else None,
        )
        # Computed once; the instance is treated as immutable afterwards.
        self.__hash = hash(self.hashCmp)

    def __eq__(self, other):
        return (
            isinstance(other, Requirement) and
            self.hashCmp == other.hashCmp
        )

    def __ne__(self, other):
        return not self == other

    def __contains__(self, item):
        # ``item`` must expose ``key`` and ``version`` attributes.
        if item.key != self.key:
            return False

        item = item.version

        # Allow prereleases always in order to match the previous behavior of
        # this method. In the future this should be smarter and follow PEP 440
        # more accurately.
        return self.specifier.contains(item, prereleases=True)

    def __hash__(self):
        return self.__hash

    def __repr__(self):
        return "Requirement.parse(%r)" % str(self)

    @staticmethod
    def parse(s):
        """Parse ``s``, which must yield exactly one requirement."""
        req, = parse_requirements(s)
        return req


def parse_requirements(strs):
    """Yield ``Requirement`` objects for each specification in `strs`

    `strs` must be a string, or a (possibly-nested) iterable thereof.
    """
    # create a steppable iterator, so we can handle \-continuations
    lines = iter(yield_lines(strs))

    for line in lines:
        # Drop comments -- a hash without a space may be in a URL.
        if ' #' in line:
            line = line[:line.find(' #')]
        # If there is a line continuation, drop it, and append the next line.
        if line.endswith('\\'):
            # Removes the backslash and the character right before it.
            line = line[:-2].strip()
            try:
                line += next(lines)
            except StopIteration:
                return
        yield Requirement(line)


def yield_lines(iterable):
    """Yield valid lines of a string or iterable

    A string is split into lines; each line is stripped, and blank lines and
    lines starting with ``#`` are skipped. Any other iterable is flattened
    recursively.
    """
    if isinstance(iterable, str):
        # Base case: without it, iterating a string yields one-character
        # strings forever and the recursion never terminates.
        stripped = map(str.strip, iterable.splitlines())
        return (line for line in stripped
                if line and not line.startswith('#'))
    return itertools.chain.from_iterable(map(yield_lines, iterable))


def safe_extra(extra):
    """Convert an arbitrary string to a standard 'extra' name

    Any runs of non-alphanumeric characters are replaced with a single '_',
    and the result is always lowercased.
    """
    return re.sub('[^A-Za-z0-9.-]+', '_', extra).lower()


def safe_name(name):
    """Convert an arbitrary string to a standard distribution name

    Any runs of non-alphanumeric/. characters are replaced with a single '-'.
    """
    return re.sub('[^A-Za-z0-9.]+', '-', name)