|
| 1 | +import logging |
| 2 | +import os |
| 3 | +from yaml import safe_load, MarkedYAMLError |
| 4 | +from typing import Dict, Tuple |
| 5 | +from shutil import copyfile |
| 6 | +import semver |
| 7 | +import operatorcourier.identify as identify |
| 8 | +import operatorcourier.errors as errors |
| 9 | + |
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
| 12 | + |
| 13 | +def flatten_bundles(source_dir: str, dest_dir: str): |
| 14 | + file_paths_to_copy = get_flattened_files_info(source_dir) |
| 15 | + for (src_file_path, new_file_name) in file_paths_to_copy: |
| 16 | + copyfile(src_file_path, os.path.join(dest_dir, new_file_name)) |
| 17 | + |
| 18 | + |
| 19 | +def get_flattened_files_info(source_dir: str) -> [(str, str)]: |
| 20 | + """ |
| 21 | + :param source_dir: Path of the directory containing different versions |
| 22 | + of operator bundles (CRD, CSV, package) in separate version directories |
| 23 | + :return: A list of tuples where in each tuple, the first element is |
| 24 | + the source file path to be copied to the flattened directory, and the second is |
| 25 | + the new file name to be used in the file copy. The function returns an empty list if |
| 26 | + the source_dir is already flat |
| 27 | + """ |
| 28 | + |
| 29 | + # extract package file and version folders from source_dir |
| 30 | + _, folder_names, file_names = next(os.walk(source_dir)) |
| 31 | + if not folder_names: |
| 32 | + logger.info('The source directory is already flat.') |
| 33 | + return [] |
| 34 | + |
| 35 | + file_paths_to_copy = [] # [ (SRC_FILE_PATH, NEW_FILE_NAME) ] |
| 36 | + |
| 37 | + crd_dict = {} # { CRD_NAME => (VERSION, CRD_PATH) } |
| 38 | + csv_paths = [] |
| 39 | + |
| 40 | + for version_folder_name in folder_names: |
| 41 | + parse_version_folder(source_dir, version_folder_name, csv_paths, crd_dict) |
| 42 | + |
| 43 | + # add package in source_dir |
| 44 | + package_path = get_package_path(source_dir, file_names) |
| 45 | + file_paths_to_copy.append((package_path, os.path.basename(package_path))) |
| 46 | + |
| 47 | + # add all CRDs with the latest version |
| 48 | + for _, crd_path in crd_dict.values(): |
| 49 | + file_paths_to_copy.append((crd_path, os.path.basename(crd_path))) |
| 50 | + |
| 51 | + # add all CSVs |
| 52 | + for csv_file_name, csv_entries in create_csv_dict(csv_paths).items(): |
| 53 | + for (version, csv_path) in csv_entries: |
| 54 | + basename, ext = os.path.splitext(csv_file_name) |
| 55 | + file_paths_to_copy.append((csv_path, f'{basename}-v{version}{ext}')) |
| 56 | + |
| 57 | + return file_paths_to_copy |
| 58 | + |
| 59 | + |
| 60 | +def parse_version_folder(base_dir: str, version_folder_name: str, |
| 61 | + csv_paths: list, crd_dict: Dict[str, Tuple[str, str]]): |
| 62 | + """ |
| 63 | + Parse the version folder of the bundle and collect information of CSV and CRDs |
| 64 | + in the bundle |
| 65 | +
|
| 66 | + :param base_dir: Path of the base directory where the version folder is located |
| 67 | + :param version_folder_name: The name of the version folder containing bundle files |
| 68 | + :param csv_paths: A list of CSV file paths inside version folders |
| 69 | + :param crd_dict: dict that contains CRD info collected from different version folders, |
| 70 | + where the key is the CRD name, and the value is a tuple where the first element is |
| 71 | + the version of the bundle, and the second is the path of the CRD file |
| 72 | + """ |
| 73 | + # parse each version folder and parse CRD, CSV files |
| 74 | + try: |
| 75 | + semver.parse(version_folder_name) |
| 76 | + except ValueError: |
| 77 | + logger.warning("Ignoring %s as it is not a valid semver. " |
| 78 | + "See https://semver.org for the semver specification.", |
| 79 | + version_folder_name) |
| 80 | + return |
| 81 | + |
| 82 | + logger.info('Parsing folder: %s...', version_folder_name) |
| 83 | + |
| 84 | + contains_csv = False |
| 85 | + version_folder_path = os.path.join(base_dir, version_folder_name) |
| 86 | + |
| 87 | + for item in os.listdir(os.path.join(base_dir, version_folder_name)): |
| 88 | + item_path = os.path.join(version_folder_path, item) |
| 89 | + |
| 90 | + if not os.path.isfile(item_path): |
| 91 | + logger.warning('Ignoring %s as it is not a regular file.', item) |
| 92 | + |
| 93 | + with open(item_path, 'r') as f: |
| 94 | + file_content = f.read() |
| 95 | + |
| 96 | + yaml_type = identify.get_operator_artifact_type(file_content) |
| 97 | + |
| 98 | + if yaml_type == 'ClusterServiceVersion': |
| 99 | + contains_csv = True |
| 100 | + csv_paths.append(item_path) |
| 101 | + elif yaml_type == 'CustomResourceDefinition': |
| 102 | + try: |
| 103 | + crd_name = safe_load(file_content)['metadata']['name'] |
| 104 | + except MarkedYAMLError: |
| 105 | + msg = "Courier requires valid input YAML files" |
| 106 | + logger.error(msg) |
| 107 | + raise errors.OpCourierBadYaml(msg) |
| 108 | + except KeyError: |
| 109 | + msg = f'{item} is not a valid CRD file as "metadata.name" ' \ |
| 110 | + f'field is required' |
| 111 | + logger.error(msg) |
| 112 | + raise errors.OpCourierBadBundle(msg) |
| 113 | + # create new CRD type entry if not found in dict |
| 114 | + if crd_name not in crd_dict: |
| 115 | + crd_dict[crd_name] = (version_folder_name, item_path) |
| 116 | + # update the CRD type entry with the file with the newest version |
| 117 | + elif semver.compare(version_folder_name, crd_dict[crd_name][0]) > 0: |
| 118 | + crd_dict[crd_name] = (crd_dict[crd_name][0], item_path) |
| 119 | + |
| 120 | + if not contains_csv: |
| 121 | + msg = 'This version directory does not contain any valid CSV file.' |
| 122 | + logger.error(msg) |
| 123 | + raise errors.OpCourierBadBundle(msg) |
| 124 | + |
| 125 | + |
| 126 | +def get_package_path(base_dir: str, file_names_in_base_dir: list) -> str: |
| 127 | + packages = [] |
| 128 | + |
| 129 | + # add package file to file_paths_to_copy |
| 130 | + # only 1 package yaml file is expected in file_names |
| 131 | + for file_name in file_names_in_base_dir: |
| 132 | + file_path = os.path.join(base_dir, file_name) |
| 133 | + with open(file_path, 'r') as f: |
| 134 | + file_content = f.read() |
| 135 | + if identify.get_operator_artifact_type(file_content) != 'Package': |
| 136 | + logger.warning('Ignoring %s as it is not a valid package file.', file_name) |
| 137 | + elif not packages: |
| 138 | + packages.append(file_path) |
| 139 | + else: |
| 140 | + msg = f'The input source directory expects only 1 valid package file.' |
| 141 | + logger.error(msg) |
| 142 | + raise errors.OpCourierBadBundle(msg) |
| 143 | + |
| 144 | + return packages[0] |
| 145 | + |
| 146 | + |
| 147 | +# parse all CSVs and ensure those with same names are handled |
| 148 | +def create_csv_dict(csv_paths: list) -> dict: |
| 149 | + csv_dict = {} # { CSV_FILE_NAME => [ (v1, csv_path_1), ... , (vn, csv_path_n) ] } |
| 150 | + for csv_path in csv_paths: |
| 151 | + version_folder_path, csv_file_name = os.path.split(csv_path) |
| 152 | + version = os.path.basename(version_folder_path) |
| 153 | + val = (version, csv_path) |
| 154 | + csv_dict.setdefault(csv_file_name, []).append(val) |
| 155 | + return csv_dict |
0 commit comments