|
import ast |
|
import os.path as osp |
|
import re |
|
import sys |
|
import warnings |
|
from collections import defaultdict |
|
from importlib.util import find_spec |
|
from typing import List, Optional, Tuple, Union |
|
from importlib import import_module as real_import_module |
|
import json |
|
import pickle |
|
from pathlib import Path |
|
import itertools |
|
|
|
import yaml |
|
from omegaconf import OmegaConf |
|
|
|
from pkg_resources.extern import packaging |
|
__import__('pkg_resources.extern.packaging.version') |
|
__import__('pkg_resources.extern.packaging.specifiers') |
|
__import__('pkg_resources.extern.packaging.requirements') |
|
__import__('pkg_resources.extern.packaging.markers') |
|
|
|
|
|
# Root prefix of the running Python installation (e.g. ``/usr`` or a venv
# prefix): two dirnames up from ``.../bin/python``.
PYTHON_ROOT_DIR = osp.dirname(osp.dirname(sys.executable))

# Prefix under which Linux distributions install the system stdlib.
SYSTEM_PYTHON_PREFIX = '/usr/lib/python'

# Maps the importable module name of an OpenMMLab project to its package
# name as published on PyPI. Most entries are identical; a few projects
# are distributed under a different name (e.g. ``mmseg`` ships as
# ``mmsegmentation``, ``mmaction`` as ``mmaction2``).
MODULE2PACKAGE = {
    'mmcls': 'mmcls',
    'mmdet': 'mmdet',
    'mmdet3d': 'mmdet3d',
    'mmseg': 'mmsegmentation',
    'mmaction': 'mmaction2',
    'mmtrack': 'mmtrack',
    'mmpose': 'mmpose',
    'mmedit': 'mmedit',
    'mmocr': 'mmocr',
    'mmgen': 'mmgen',
    'mmfewshot': 'mmfewshot',
    'mmrazor': 'mmrazor',
    'mmflow': 'mmflow',
    'mmhuman3d': 'mmhuman3d',
    'mmrotate': 'mmrotate',
    'mmselfsup': 'mmselfsup',
    'mmyolo': 'mmyolo',
    'mmpretrain': 'mmpretrain',
    'mmagic': 'mmagic',
}

# Backward-compatible alias kept for callers that still use the old name.
PKG2PROJECT = MODULE2PACKAGE
|
|
|
|
|
class ConfigParsingError(RuntimeError):
    """Raised when a pure Python style config file cannot be parsed."""
|
|
|
|
|
def _get_cfg_metainfo(package_path: str, cfg_path: str) -> dict:
    """Look up the meta information of one experiment config.

    Walks every metafile listed in the external package's
    ``model-index.yml`` and indexes the described models by config name.

    Args:
        package_path (str): Path of external package.
        cfg_path (str): Name of experiment config.

    Returns:
        dict: Meta information of target experiment.

    Raises:
        ValueError: If ``cfg_path`` is not listed in any metafile.
    """
    index_file = osp.join(package_path, '.mim', 'model-index.yml')
    meta_index = OmegaConf.to_container(OmegaConf.load(index_file), resolve=True)
    cfg_dict = dict()
    for rel_meta_path in meta_index['Import']:
        metafile = osp.join(package_path, '.mim', rel_meta_path)
        cfg_meta = OmegaConf.to_container(OmegaConf.load(metafile), resolve=True)
        for model_cfg in cfg_meta['Models']:
            if 'Config' not in model_cfg:
                warnings.warn(f'There is not `Config` define in {model_cfg}')
                continue
            # Drop the leading directory component of the config path;
            # the first occurrence of a config name wins (a config may be
            # listed once per checkpoint).
            cfg_name = model_cfg['Config'].partition('/')[-1]
            cfg_dict.setdefault(cfg_name, model_cfg)
    if cfg_path not in cfg_dict:
        raise ValueError(f'Expected configs: {cfg_dict.keys()}, but got '
                         f'{cfg_path}')
    return cfg_dict[cfg_path]
|
|
|
|
|
def _get_external_cfg_path(package_path: str, cfg_file: str) -> str:
    """Get config path of external package.

    Args:
        package_path (str): Path of external package.
        cfg_file (str): Name of experiment config.

    Returns:
        str: Absolute config path from external package.
    """
    # NOTE(review): splitting on the *first* dot truncates names that
    # contain extra dots — presumably config stems never do; verify
    # against real metafiles before changing.
    stem = cfg_file.split('.')[0]
    model_cfg = _get_cfg_metainfo(package_path, stem)
    cfg_path = osp.join(package_path, model_cfg['Config'])
    check_file_exist(cfg_path)
    return cfg_path
|
|
|
|
|
def _get_external_cfg_base_path(package_path: str, cfg_name: str) -> str:
    """Get base config path of external package.

    Base configs are shipped inside the package's ``.mim/configs``
    directory, so the result is simply that directory joined with
    ``cfg_name``.

    Args:
        package_path (str): Path of external package.
        cfg_name (str): External relative config path with 'package::'.

    Returns:
        str: Absolute config path from external package.

    Raises:
        FileNotFoundError: If the resolved config file does not exist
            (via :func:`check_file_exist`).
    """
    cfg_path = osp.join(package_path, '.mim', 'configs', cfg_name)
    check_file_exist(cfg_path)
    return cfg_path
|
|
|
|
|
def _get_package_and_cfg_path(cfg_path: str) -> Tuple[str, str]:
    """Split a ``'<module>::<relative config path>'`` string.

    Args:
        cfg_path (str): External relative config path with 'package::'.

    Returns:
        Tuple[str, str]: Package name and config path.

    Raises:
        ValueError: If ``cfg_path`` is not of the ``pkg::path`` form or
            contains more than one ``::``.
    """
    # The path must look like 'pkg::dir/file' before we attempt to split.
    if re.match(r'\w*::\w*/\w*', cfg_path) is None:
        raise ValueError(
            '`_get_package_and_cfg_path` is used for get external package, '
            'please specify the package name and relative config path, just '
            'like `mmdet::faster_rcnn/faster-rcnn_r50_fpn_1x_coco.py`')
    parts = cfg_path.split('::')
    if len(parts) > 2:
        raise ValueError('`::` should only be used to separate package and '
                         'config name, but found multiple `::` in '
                         f'{cfg_path}')
    module, rel_cfg_path = parts
    assert module in MODULE2PACKAGE, (
        f'mmengine does not support to load {module} config.')
    return MODULE2PACKAGE[module], rel_cfg_path
|
|
|
|
|
class RemoveAssignFromAST(ast.NodeTransformer):
    """Strip ``Assign`` nodes whose first target is a ``Name`` matching
    the given key.

    Args:
        key (str): The target name of the Assign node.
    """

    def __init__(self, key):
        self.key = key

    def visit_Assign(self, node):
        # Returning None tells NodeTransformer to delete the node.
        target = node.targets[0]
        if isinstance(target, ast.Name) and target.id == self.key:
            return None
        return node
|
|
|
|
|
def _is_builtin_module(module_name: str) -> bool:
    """Check if a module is a built-in (standard library) module.

    Arg:
        module_name: name of module.
    """
    # Relative imports can never refer to the standard library.
    if module_name.startswith('.'):
        return False
    # mmengine.config itself is treated like a builtin so its imports are
    # kept as-is. NOTE(review): this prefix test also matches names such
    # as 'mmengine.configs_x' — presumably none exist; verify.
    if module_name.startswith('mmengine.config'):
        return True
    if module_name in sys.builtin_module_names:
        return True
    # Classify by where the top-level package lives on disk.
    spec = find_spec(module_name.split('.')[0])
    if spec is None:
        return False
    origin = getattr(spec, 'origin', None)
    if origin is None:
        return False
    origin = osp.abspath(origin)
    if 'site-package' in origin or 'dist-package' in origin:
        # Installed third-party packages are not builtins.
        return False
    # Stdlib modules live under the interpreter prefix or the system-wide
    # Python prefix.
    return origin.startswith((PYTHON_ROOT_DIR, SYSTEM_PYTHON_PREFIX))
|
|
|
|
|
class ImportTransformer(ast.NodeTransformer):
    """Convert the import syntax to the assignment of
    :class:`mmengine.config.LazyObject` and preload the base variable before
    parsing the configuration file.

    Since you are already looking at this part of the code, I believe you must
    be interested in the mechanism of the ``lazy_import`` feature of
    :class:`Config`. In this docstring, we will dive deeper into its
    principles.

    Most of OpenMMLab users maybe bothered with that:

    * In most of popular IDEs, they cannot navigate to the source code in
      configuration file
    * In most of popular IDEs, they cannot jump to the base file in current
      configuration file, which is much painful when the inheritance
      relationship is complex.

    In order to solve this problem, we introduce the ``lazy_import`` mode.

    A very intuitive idea for solving this problem is to import the module
    corresponding to the "type" field using the ``import`` syntax. Similarly,
    we can also ``import`` base file.

    However, this approach has a significant drawback. It requires triggering
    the import logic to parse the configuration file, which can be
    time-consuming. Additionally, it implies downloading numerous dependencies
    solely for the purpose of parsing the configuration file.
    However, it's possible that only a portion of the config will actually be
    used. For instance, the package used in the ``train_pipeline`` may not
    be necessary for an evaluation task. Forcing users to download these
    unused packages is not a desirable solution.

    To avoid this problem, we introduce :class:`mmengine.config.LazyObject`
    and :class:`mmengine.config.LazyAttr`. Before we proceed with further
    explanations, you may refer to the documentation of these two modules to
    gain an understanding of their functionalities.

    Actually, one of the functions of ``ImportTransformer`` is to hack the
    ``import`` syntax. It will replace the import syntax
    (exclude import the base files) with the assignment of ``LazyObject``.

    As for the import syntax of the base file, we cannot lazy import it since
    we're eager to merge the fields of current file and base files. Therefore,
    another function of the ``ImportTransformer`` is to collaborate with
    ``Config._parse_lazy_import`` to parse the base files.

    Args:
        global_dict (dict): The global dict of the current configuration file.
            If we divide ordinary Python syntax into two parts, namely the
            import section and the non-import section (assuming a simple case
            with imports at the beginning and the rest of the code following),
            the variables generated by the import statements are stored in
            global variables for subsequent code use. In this context,
            the ``global_dict`` represents the global variables required when
            executing the non-import code. ``global_dict`` will be filled
            during visiting the parsed code.
        base_dict (dict): All variables defined in base files.
        filename (str, optional): Name of the config file being parsed;
            only used to build human-readable ``location`` hints.

    Examples:
        >>> from mmengine.config import read_base
        >>>
        >>>
        >>> with read_base():
        >>>     from .._base_.default_runtime import *
        >>>     from .._base_.datasets.coco_detection import dataset

    In this case, the base_dict will be:

    Examples:
        >>> base_dict = {
        >>>     '.._base_.default_runtime': ...
        >>>     '.._base_.datasets.coco_detection': dataset}

    and `global_dict` will be updated like this:

    Examples:
        >>> global_dict.update(base_dict['.._base_.default_runtime'])  # `import *` means update all data
        >>> global_dict.update(dataset=base_dict['.._base_.datasets.coco_detection']['dataset'])  # only update `dataset`
    """

    def __init__(self,
                 global_dict: dict,
                 base_dict: Optional[dict] = None,
                 filename: Optional[str] = None):
        self.base_dict = base_dict if base_dict is not None else {}
        self.global_dict = global_dict
        # Escape backslashes (Windows paths) so the filename survives being
        # embedded inside a generated string literal.
        if isinstance(filename, str):
            filename = filename.encode('unicode_escape').decode()
        self.filename = filename
        # Names bound by any import statement seen so far.
        self.imported_obj: set = set()
        super().__init__()

    def visit_ImportFrom(
        self, node: ast.ImportFrom
    ) -> Optional[Union[List[ast.Assign], ast.ImportFrom]]:
        """Hack the ``from ... import ...`` syntax and update the global_dict.

        Examples:
            >>> from mmdet.models import RetinaNet

        Will be parsed as:

        Examples:
            >>> RetinaNet = LazyObject('mmdet.models', 'RetinaNet')

        ``global_dict`` will also be updated by ``base_dict`` as the
        class docstring says.

        Args:
            node (ast.AST): The node of the current import statement.

        Returns:
            Optional[List[ast.Assign]]: There three cases:

                * If the node is a statement of importing base files.
                  None will be returned.
                * If the node is a statement of importing a builtin module,
                  node will be directly returned
                * Otherwise, it will return the assignment statements of
                  ``LazyObject``.
        """
        # Relative level dots plus module name, e.g. '.._base_.runtime'.
        # NOTE(review): for a bare `from . import x`, node.module is None
        # and this yields '.None' — presumably such imports never reach
        # here; verify against Config._parse_lazy_import.
        module = f'{node.level*"."}{node.module}'
        if _is_builtin_module(module):
            # Builtin imports are kept verbatim; only record the names they
            # bind so later passes can distinguish them from config fields.
            for alias in node.names:
                if alias.asname is not None:
                    self.imported_obj.add(alias.asname)
                elif alias.name == '*':
                    raise ConfigParsingError(
                        'Cannot import * from non-base config')
                else:
                    self.imported_obj.add(alias.name)
            return node

        if module in self.base_dict:
            # Importing from an already-parsed base config: copy the parsed
            # variables into global_dict and drop the import statement.
            for alias_node in node.names:
                if alias_node.name == '*':
                    self.global_dict.update(self.base_dict[module])
                    return None
                if alias_node.asname is not None:
                    base_key = alias_node.asname
                else:
                    base_key = alias_node.name
                self.global_dict[base_key] = self.base_dict[module][
                    alias_node.name]
            return None

        nodes: List[ast.Assign] = []
        for alias_node in node.names:
            # alias nodes only carry their own lineno on newer Pythons;
            # fall back to the statement's line number.
            if hasattr(alias_node, 'lineno'):
                lineno = alias_node.lineno
            else:
                lineno = node.lineno
            if alias_node.name == '*':
                raise ConfigParsingError(
                    'Illegal syntax in config! `from xxx import *` is not '
                    'allowed to appear outside the `if base:` statement')
            elif alias_node.asname is not None:
                # from a.b import c as d -> d = LazyObject("a.b", "c", ...)
                code = f'{alias_node.asname} = LazyObject("{module}", "{alias_node.name}", "{self.filename}, line {lineno}")'
                self.imported_obj.add(alias_node.asname)
            else:
                # from a.b import c -> c = LazyObject("a.b", "c", ...)
                code = f'{alias_node.name} = LazyObject("{module}", "{alias_node.name}", "{self.filename}, line {lineno}")'
                self.imported_obj.add(alias_node.name)
            try:
                nodes.append(ast.parse(code).body[0])
            except Exception as e:
                # Bug fix: report the alias *name* (formatting the ast.alias
                # object printed its repr) and separate the summary from the
                # numbered hints with a newline.
                raise ConfigParsingError(
                    f'Cannot import {alias_node.name} from {module}\n'
                    '1. Cannot import * from 3rd party lib in the config '
                    'file\n'
                    '2. Please check if the module is a base config which '
                    'should be added to `_base_`\n') from e
        return nodes

    def visit_Import(self, node) -> Union[ast.Assign, ast.Import]:
        """Work with ``_gather_abs_import_lazyobj`` to hack the ``import ...``
        syntax.

        Examples:
            >>> import mmcls.models
            >>> import mmcls.datasets
            >>> import mmcls

        Will be parsed as:

        Examples:
            >>> # import mmcls.models; import mmcls.datasets; import mmcls
            >>> mmcls = LazyObject(['mmcls', 'mmcls.datasets', 'mmcls.models'])

        Args:
            node (ast.AST): The node of the current import statement.

        Returns:
            ast.Assign: If the import statement is ``import ... as ...``,
            ast.Assign will be returned, otherwise node will be directly
            returned.
        """
        # Plain absolute imports (no `as`) are merged per top-level module by
        # `_gather_abs_import_lazyobj`; this method only rewrites the
        # `import x as y` form.
        alias_list = node.names
        assert len(alias_list) == 1, (
            'Illegal syntax in config! import multiple modules in one line is '
            'not supported')

        alias = alias_list[0]
        if alias.asname is not None:
            self.imported_obj.add(alias.asname)
            if _is_builtin_module(alias.name.split('.')[0]):
                return node
            return ast.parse(
                f'{alias.asname} = LazyObject('
                f'"{alias.name}",'
                f'location="{self.filename}, line {node.lineno}")').body[0]
        return node
|
|
|
|
|
def _gather_abs_import_lazyobj(tree: ast.Module,
                               filename: Optional[str] = None):
    """Experimental implementation of gathering absolute import information.

    Collapses all non-builtin absolute ``import x.y`` statements that share
    a top-level module into a single ``x = LazyObject([...])`` assignment
    inserted at the top of the module body.

    Args:
        tree (ast.Module): Parsed module whose body will be rewritten
            in place.
        filename (str, optional): Config file name, used only for the
            ``location`` hint embedded in the generated ``LazyObject``.

    Returns:
        Tuple[ast.Module, set]: The rewritten tree and the set of top-level
        module names that were lazily imported.
    """
    if isinstance(filename, str):
        # Escape backslashes (Windows paths) so the name survives inside a
        # generated string literal.
        filename = filename.encode('unicode_escape').decode()
    imported = defaultdict(list)
    abs_imported = set()
    new_body: List[ast.stmt] = []
    # First Import node per top-level module, kept as a lineno fallback.
    module2node: dict = dict()
    for node in tree.body:
        if isinstance(node, ast.Import):
            for alias in node.names:
                # Builtin imports are kept as-is.
                if _is_builtin_module(alias.name):
                    new_body.append(node)
                    continue
                module = alias.name.split('.')[0]
                module2node.setdefault(module, node)
                imported[module].append(alias)
            continue
        new_body.append(node)

    for key, value in imported.items():
        names = [_value.name for _value in value]
        if hasattr(value[0], 'lineno'):
            lineno = value[0].lineno
        else:
            lineno = module2node[key].lineno
        # Bug fix: the location previously hard-coded "(unknown)" and never
        # used the escaped ``filename`` computed above.
        lazy_module_assign = ast.parse(
            f'{key} = LazyObject({names}, location="{filename}, line {lineno}")'
        )
        abs_imported.add(key)
        new_body.insert(0, lazy_module_assign.body[0])
    tree.body = new_body
    return tree, abs_imported
|
|
|
|
|
def get_installed_path(package: str) -> str:
    """Get installed path of package.

    Args:
        package (str): Name of package.

    Example:
        >>> get_installed_path('mmcls')
        >>> '.../lib/python3.7/site-packages/mmcls'
    """
    import importlib.util

    from pkg_resources import DistributionNotFound, get_distribution

    try:
        pkg = get_distribution(package)
    except DistributionNotFound as e:
        # No distribution metadata: fall back to the import machinery so
        # editable/plain-module installs are still resolved.
        spec = importlib.util.find_spec(package)
        if spec is None:
            raise e
        if spec.origin is None:
            # Namespace packages have no single origin directory.
            raise RuntimeError(
                f'{package} is a namespace package, which is invalid '
                'for `get_install_path`')
        return osp.dirname(spec.origin)

    # Usually the import name matches the distribution name; otherwise map
    # it through the distribution's top_level metadata.
    candidate = osp.join(pkg.location, package)
    if osp.exists(candidate):
        return candidate
    return osp.join(pkg.location, package2module(package))
|
|
|
|
|
def import_modules_from_strings(imports, allow_failed_imports=False):
    """Import modules from the given list of strings.

    Args:
        imports (list | str | None): The given module names to be imported.
        allow_failed_imports (bool): If True, the failed imports will return
            None. Otherwise, an ImportError is raise. Defaults to False.

    Returns:
        list[module] | module | None: The imported modules.

    Raises:
        TypeError: If ``imports`` is neither a string nor a list of strings.
        ImportError: If an import fails and ``allow_failed_imports`` is
            False; the original error is kept as ``__cause__``.

    Examples:
        >>> osp, sys = import_modules_from_strings(
        ...     ['os.path', 'sys'])
        >>> import os.path as osp_
        >>> import sys as sys_
        >>> assert osp == osp_
        >>> assert sys == sys_
    """
    if not imports:
        return
    single_import = False
    if isinstance(imports, str):
        single_import = True
        imports = [imports]
    if not isinstance(imports, list):
        raise TypeError(
            f'custom_imports must be a list but got type {type(imports)}')
    imported = []
    for imp in imports:
        if not isinstance(imp, str):
            raise TypeError(
                f'{imp} is of type {type(imp)} and cannot be imported.')
        try:
            imported_tmp = import_module(imp)
        except ImportError as e:
            if allow_failed_imports:
                warnings.warn(f'{imp} failed to import and is ignored.',
                              UserWarning)
                imported_tmp = None
            else:
                # Bug fix: chain the original exception so the real cause
                # (e.g. a missing transitive dependency) is visible.
                raise ImportError(f'Failed to import {imp}') from e
        imported.append(imported_tmp)
    if single_import:
        imported = imported[0]
    return imported
|
|
|
|
|
def import_module(name, package=None):
    """Import a module, optionally supporting relative imports."""
    import importlib
    return importlib.import_module(name, package)
|
|
|
|
|
def is_installed(package: str) -> bool:
    """Check package whether installed.

    Args:
        package (str): Name of package to be checked.
    """
    import importlib.util

    import pkg_resources
    from pkg_resources import get_distribution

    # Refresh pkg_resources' cached working set so packages installed after
    # interpreter start are detected too.
    importlib.reload(pkg_resources)
    try:
        get_distribution(package)
    except pkg_resources.DistributionNotFound:
        # Not a known distribution; fall back to the import machinery and
        # require an importable module with a concrete origin.
        spec = importlib.util.find_spec(package)
        return spec is not None and spec.origin is not None
    return True
|
|
|
|
|
def dump(obj, file=None, file_format=None, **kwargs):
    """Dump data to json/yaml/pickle strings or files.

    Args:
        obj: The Python object to serialize.
        file (str | Path | None): Destination path. If None, the serialized
            string/bytes is returned instead of being written to disk.
        file_format (str | None): One of ``json``, ``yaml``/``yml``,
            ``pkl``/``pickle`` (case-insensitive). Inferred from the file
            extension when omitted.
        **kwargs: Extra keyword arguments forwarded to the underlying
            serializer (``json.dump``, ``yaml.dump`` or ``pickle.dump``).

    Returns:
        str | bytes | bool: The serialized data when ``file`` is None,
        otherwise True after a successful write.

    Raises:
        ValueError: If both ``file`` and ``file_format`` are None.
        TypeError: If ``file_format`` is unsupported.
    """
    if isinstance(file, Path):
        file = str(file)

    # Infer the format from the file extension when not given explicitly.
    if file_format is None:
        if isinstance(file, str):
            file_format = file.split('.')[-1]
        elif file is None:
            raise ValueError("file_format must be specified if file is None")

    # Bug fix: normalize case for explicitly-passed formats too (previously
    # only extension-derived formats were lowercased), then fold aliases.
    if isinstance(file_format, str):
        file_format = file_format.lower()
    if file_format not in ['json', 'yaml', 'yml', 'pkl', 'pickle']:
        raise TypeError(f"Unsupported file format: {file_format}")
    file_format = {'yml': 'yaml', 'pickle': 'pkl'}.get(file_format, file_format)

    # No destination: return the serialized string/bytes.
    if file is None:
        if file_format == 'json':
            return json.dumps(obj, indent=4, **kwargs)
        if file_format == 'yaml':
            return yaml.dump(obj, **kwargs)
        return pickle.dumps(obj, **kwargs)

    # Pickle needs a binary stream; json/yaml are written as UTF-8 text.
    if file_format == 'pkl':
        with open(file, 'wb') as f:
            pickle.dump(obj, f, **kwargs)
    else:
        with open(file, 'w', encoding='utf-8') as f:
            if file_format == 'json':
                json.dump(obj, f, indent=4, **kwargs)
            else:
                yaml.dump(obj, f, **kwargs)
    return True
|
|
|
|
|
def check_file_exist(filename, msg_tmpl='file "{}" does not exist'):
    """Raise ``FileNotFoundError`` if *filename* is not an existing file."""
    if osp.isfile(filename):
        return
    raise FileNotFoundError(msg_tmpl.format(filename))
|
|
|
|
|
def package2module(package: str):
    """Infer module name from package.

    Reads the distribution's ``top_level.txt`` metadata and returns its
    first entry.

    Args:
        package (str): Package to infer module name.

    Returns:
        str: First top-level module name shipped by the distribution.

    Raises:
        ValueError: If the distribution ships no ``top_level.txt``.
    """
    # Bug fix: use the real pkg_resources distribution lookup. The
    # module-level `get_distribution` shim in this file returns a parsed
    # Requirement, which has no `has_metadata`/`get_metadata` and would
    # raise AttributeError here.
    from pkg_resources import get_distribution as _get_distribution

    pkg = _get_distribution(package)
    if pkg.has_metadata('top_level.txt'):
        module_name = pkg.get_metadata('top_level.txt').split('\n')[0]
        return module_name
    raise ValueError(
        highlighted_error(f'can not infer the module name of {package}'))
|
|
|
|
|
def get_distribution(dist):
    """Parse *dist* into a :class:`Requirement` when given a string.

    NOTE(review): despite the name (and the original pkg_resources
    docstring), this shim does NOT return an installed distribution
    object — it has no ``location`` or metadata accessors; it only parses
    the requirement string. Callers expecting a real ``pkg_resources``
    distribution should use ``pkg_resources.get_distribution`` instead —
    verify each call site.
    """
    if isinstance(dist, str):
        dist = Requirement.parse(dist)
    return dist
|
|
|
|
|
def highlighted_error(msg: Union[str, Exception]) -> str:
    """Return *msg* styled as a bold red terminal string.

    Bug fix: the original referenced ``click`` without importing it, which
    raised ``NameError`` at call time. Import it lazily and fall back to the
    plain message when click is unavailable.
    """
    try:
        import click
    except ImportError:
        return str(msg)
    return click.style(msg, fg='red', bold=True)
|
|
|
|
|
class Requirement(packaging.requirements.Requirement):
    # Vendored pkg_resources-style Requirement: wraps the PEP 508 parser and
    # adds the legacy attributes (`project_name`, `key`, `specs`, ...) that
    # pkg_resources-era code expects.

    def __init__(self, requirement_string):
        """DO NOT CALL THIS UNDOCUMENTED METHOD; use Requirement.parse()!"""
        super(Requirement, self).__init__(requirement_string)
        # `unsafe_name` keeps the raw name; `project_name` is the
        # safe_name-normalized form and `key` its lowercase lookup key.
        self.unsafe_name = self.name
        project_name = safe_name(self.name)
        self.project_name, self.key = project_name, project_name.lower()
        # Legacy (operator, version) pairs, e.g. [('>=', '1.0')].
        self.specs = [
            (spec.operator, spec.version) for spec in self.specifier]
        self.extras = tuple(map(safe_extra, self.extras))
        # Tuple used for both equality and hashing; marker is compared by
        # its string form.
        self.hashCmp = (
            self.key,
            self.url,
            self.specifier,
            frozenset(self.extras),
            str(self.marker) if self.marker else None,
        )
        # Precomputed hash (name-mangled to _Requirement__hash).
        self.__hash = hash(self.hashCmp)

    def __eq__(self, other):
        return (
            isinstance(other, Requirement) and
            self.hashCmp == other.hashCmp
        )

    def __ne__(self, other):
        return not self == other

    def __contains__(self, item):
        # `item` is expected to be a distribution-like object exposing
        # `key` and `version`.
        if item.key != self.key:
            return False

        item = item.version

        # Allow prereleases: the caller is expected to have filtered them
        # out already if undesired (pkg_resources convention).
        return self.specifier.contains(item, prereleases=True)

    def __hash__(self):
        return self.__hash

    def __repr__(self):
        return "Requirement.parse(%r)" % str(self)

    @staticmethod
    def parse(s):
        # Exactly one requirement must be present in `s`.
        req, = parse_requirements(s)
        return req
|
|
|
|
|
def parse_requirements(strs):
    """Yield ``Requirement`` objects for each specification in `strs`

    `strs` must be a string, or a (possibly-nested) iterable thereof.
    """
    # Use an explicit iterator so continuation handling below can consume
    # the following line via next().
    lines = iter(yield_lines(strs))

    for line in lines:
        # Drop an inline comment introduced by ' #'.
        if ' #' in line:
            line = line[:line.find(' #')]
        # A trailing backslash joins this line with the next one.
        # NOTE(review): the slice removes the last TWO characters (the
        # backslash and the character before it) — this mirrors upstream
        # pkg_resources, but verify if continuations ever misbehave.
        if line.endswith('\\'):
            line = line[:-2].strip()
            try:
                line += next(lines)
            except StopIteration:
                return
        yield Requirement(line)
|
|
|
|
|
def yield_lines(iterable):
    """Yield valid lines of a string or iterable.

    A string is split into stripped lines with blanks and ``#`` comment
    lines removed; any other iterable is flattened recursively.

    Bug fix: the previous implementation had no string base case, so
    mapping over a string recursed on its single characters forever
    (RecursionError). This restores the upstream pkg_resources behavior.
    """
    if isinstance(iterable, str):
        for line in iterable.splitlines():
            line = line.strip()
            # Skip blank lines and full-line comments.
            if line and not line.startswith('#'):
                yield line
    else:
        for item in iterable:
            yield from yield_lines(item)
|
|
|
|
|
def safe_extra(extra):
    """Convert an arbitrary string to a standard 'extra' name.

    Any runs of non-alphanumeric characters are replaced with a single '_',
    and the result is always lowercased.
    """
    collapsed = re.sub('[^A-Za-z0-9.-]+', '_', extra)
    return collapsed.lower()
|
|
|
|
|
def safe_name(name):
    """Convert an arbitrary string to a standard distribution name.

    Any runs of non-alphanumeric/. characters are replaced with a single
    '-'; case is preserved.
    """
    normalized = re.sub('[^A-Za-z0-9.]+', '-', name)
    return normalized