diff --git a/CHANGELOG.md b/CHANGELOG.md index 62b4bbbd..24b433a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,34 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + ## [Unreleased] + + +## [1.15.0] - 2026-03-18 + +### Added + - Added `--tag` option to `download` command for filtering packages by tags - Added download command documentation to README with comprehensive usage examples +- Added `--filename` option to `download` command for filtering by package filename, with support for glob patterns (e.g., `--filename '*.snupkg'`) +- Added `--download-all` flag to `download` command to download all matching packages instead of erroring on multiple matches +- Multiple packages table now includes a Filename column for easier disambiguation + +## [1.14.0] - 2026-03-11 + +### Added + +- Added `vulnerabilities` command to retrieve security scan results for a package + - Summary View (Default): Displays a high-level count of vulnerabilities broken down by severity (Critical, High, Medium, Low, Unknown). + - Assessment View `--show-assessment` (`-A`): Provides a detailed breakdown where vulnerabilities are: + - Grouped by the specific affected upstream package / dependency. + - Sorted by severity (Critical first). + - Richly formatted tables. + - Filtering Capabilities: + - By Severity: `--severity` Show only specific levels (e.g., just Critical and High). + - By Status: `--fixable | --non-fixable` Filter to show only "Fixable" vulnerabilities (where a patch exists) or "Non-Fixable" ones. + - Supports `--output-format json | pretty_json` for programmatic usage ## [1.14.0] - 2026-03-13 diff --git a/README.md b/README.md index 3ace54d3..f7c42b9b 100644 --- a/README.md +++ b/README.md @@ -277,6 +277,16 @@ cloudsmith download your-account/your-repo package-name --tag latest # Combine tag with metadata filters cloudsmith download your-account/your-repo package-name --tag stable --format deb --arch arm64 +# Filter by filename (exact or glob pattern) +cloudsmith download your-account/your-repo package-name --filename '*.nupkg' +cloudsmith download your-account/your-repo package-name --filename 'mypackage-1.0.0.snupkg' + +# Download all matching packages (when multiple packages share the same name/version) +cloudsmith download your-account/your-repo package-name --download-all + +# Combine --download-all with --filename to download a subset +cloudsmith download your-account/your-repo package-name --download-all --filename '*.snupkg' + # Download all associated files (POM, sources, javadoc, etc.) cloudsmith download your-account/your-repo package-name --all-files diff --git a/cloudsmith_cli/cli/commands/download.py b/cloudsmith_cli/cli/commands/download.py index 4ee25131..54d0126a 100644 --- a/cloudsmith_cli/cli/commands/download.py +++ b/cloudsmith_cli/cli/commands/download.py @@ -1,5 +1,7 @@ """CLI/Commands - Download packages.""" +# Copyright 2025 Cloudsmith Ltd + import os import click @@ -8,6 +10,7 @@ get_download_url, get_package_detail, get_package_files, + resolve_all_packages, resolve_auth, resolve_package, stream_download, @@ -47,6 +50,16 @@ "tag_filter", help="Filter by package tag (e.g., 'latest', 'stable'). Use --format, --arch, --os for metadata filters.", ) +@click.option( + "--filename", + "filename_filter", + help="Filter by package filename (e.g., 'mypackage.nupkg'). Supports glob patterns (e.g., '*.snupkg').", +) +@click.option( + "--download-all", + is_flag=True, + help="Download all matching packages instead of erroring on multiple matches.", +) @click.option( "--outfile", type=click.Path(), @@ -74,7 +87,7 @@ help="Automatically select the best match when multiple packages are found.", ) @click.pass_context -def download( # noqa: C901 +def download( ctx, opts, owner_repo, @@ -84,6 +97,8 @@ def download( # noqa: C901 os_filter, arch_filter, tag_filter, + filename_filter, + download_all, outfile, overwrite, all_files, @@ -94,7 +109,8 @@ def download( # noqa: C901 Download a package from a Cloudsmith repository. This command downloads a package binary from a Cloudsmith repository. You can - filter packages by version, format, operating system, architecture, and tags. + filter packages by version, format, operating system, architecture, tags, and + filename. Examples: @@ -114,6 +130,15 @@ def download( # noqa: C901 # Download a package with a specific tag cloudsmith download myorg/myrepo mypackage --tag latest + \b + # Download by filename (exact or glob pattern) + cloudsmith download myorg/myrepo TestSymbolPkg --filename '*.nupkg' + cloudsmith download myorg/myrepo TestSymbolPkg --filename 'TestSymbolPkg.1.0.24406.nupkg' + + \b + # Download all matching packages (e.g., .nupkg and .snupkg with same name/version) + cloudsmith download myorg/myrepo TestSymbolPkg --version 1.0.24406 --download-all + \b # Download all associated files (POM, sources, javadoc, etc.) for a Maven/NuGet package cloudsmith download myorg/myrepo mypackage --all-files @@ -125,15 +150,14 @@ def download( # noqa: C901 For private repositories, set: export CLOUDSMITH_API_KEY=your_api_key If multiple packages match your criteria, you'll see a selection table unless - you use --yes to automatically select the best match (highest version, then newest). + you use --yes to automatically select the best match (highest version, then newest), + or --download-all to download all matches. When using --all-files, all associated files (such as POM files, sources, javadoc, SBOM, etc.) will be downloaded into a folder named {package-name}-{version} unless you specify a custom directory with --outfile. """ owner, repo = owner_repo - - # Use stderr for messages if output is JSON use_stderr = utils.should_use_stderr(opts) if not use_stderr: @@ -142,275 +166,385 @@ def download( # noqa: C901 f"{click.style(owner, bold=True)}/{click.style(repo, bold=True)} ...", ) - # Resolve authentication + # Step 1: Authenticate session, auth_headers, auth_source = resolve_auth(opts) - if opts.debug: click.echo(f"Using authentication: {auth_source}", err=True) - # Find the package - context_msg = "Failed to find package!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): - package = resolve_package( - owner=owner, - repo=repo, - name=name, - version=version, - format_filter=format_filter, - os_filter=os_filter, - arch_filter=arch_filter, - tag_filter=tag_filter, - yes=yes, - ) + # Step 2: Find package(s) + filter_kwargs = dict( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) + packages = _find_packages(ctx, opts, filter_kwargs, download_all, yes, use_stderr) + + # Step 3: Resolve download items (url + output path for each file) + download_items = _resolve_download_items( + ctx, opts, packages, owner, repo, all_files, outfile, use_stderr + ) + + # Step 4: Dry-run or download + if dry_run: + _display_dry_run(packages, download_items, all_files) + return - if not use_stderr: - click.secho("OK", fg="green") + _perform_downloads( + ctx, + opts, + packages, + download_items, + session, + auth_headers, + overwrite, + all_files, + use_stderr, + ) - # Get detailed package info if we need more fields for download URL or all files - package_detail = None - if all_files: - # For --all-files, we always need the detailed package info to get the files array - if not use_stderr: - click.echo("Getting package details ...", nl=False) - context_msg = "Failed to get package details!" +# --------------------------------------------------------------------------- +# Step 2: Find packages +# --------------------------------------------------------------------------- + + +def _find_packages( + ctx: click.Context, + opts, + filter_kwargs: dict, + download_all: bool, + yes: bool, + use_stderr: bool, +) -> list: + """Find matching packages using the API.""" + if download_all: + context_msg = "Failed to find packages!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): - package_detail = get_package_detail( - owner=owner, repo=repo, identifier=package["slug"] - ) - if not use_stderr: - click.secho("OK", fg="green") + packages = resolve_all_packages(**filter_kwargs) + else: + context_msg = "Failed to find package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + packages = [resolve_package(**filter_kwargs, yes=yes)] - # Get all downloadable files - files_to_download = get_package_files(package_detail) + if not use_stderr: + click.secho("OK", fg="green") - if not files_to_download: - raise click.ClickException("No downloadable files found for this package.") + return packages - # Create output directory for all files - if outfile: - # If user specified an outfile, use it as the directory - output_dir = os.path.abspath(outfile) - else: - # Create directory named: {package-name}-{version} - pkg_name = package_detail.get("name", name) - pkg_version = package_detail.get("version", "unknown") - output_dir = os.path.abspath(f"{pkg_name}-{pkg_version}") - # Create directory if it doesn't exist - if not os.path.exists(output_dir): - os.makedirs(output_dir) - elif not os.path.isdir(output_dir): - raise click.ClickException( - f"Output path '{output_dir}' exists but is not a directory." - ) +# --------------------------------------------------------------------------- +# Step 3: Resolve download items +# --------------------------------------------------------------------------- + + +def _resolve_download_items( + ctx: click.Context, + opts, + packages: list, + owner: str, + repo: str, + all_files: bool, + outfile: str, + use_stderr: bool, +) -> list: + """ + Resolve each package into a list of download items. - if dry_run: - click.echo() - click.echo("Dry run - would download:") - click.echo(f" Package: {package.get('name')} v{package.get('version')}") - click.echo(f" Format: {package.get('format')}") - click.echo(f" Files: {len(files_to_download)}") - click.echo(f" To directory: {output_dir}") - click.echo() - for file_info in files_to_download: - primary_marker = " (primary)" if file_info.get("is_primary") else "" - click.echo( - f" [{file_info.get('tag', 'file')}] {file_info['filename']}{primary_marker} - " - f"{_format_file_size(file_info.get('size', 0))}" + Returns a list of dicts, each with keys: + filename, url, output_path, tag, is_primary, size, package_name, + package_version, status + """ + items = [] + + for pkg in packages: + if all_files: + items.extend( + _resolve_all_files_items( + ctx, opts, pkg, owner, repo, outfile, use_stderr ) - return + ) + else: + items.append( + _resolve_single_file_item( + ctx, + opts, + pkg, + owner, + repo, + outfile, + len(packages) > 1, + use_stderr, + ) + ) - # Download all files - if not use_stderr: - click.echo(f"\nDownloading {len(files_to_download)} files to: {output_dir}") - click.echo() + return items - success_count = 0 - failed_files = [] - downloaded_files = [] - for idx, file_info in enumerate(files_to_download, 1): - filename = file_info["filename"] - file_url = file_info["cdn_url"] - output_path = os.path.join(output_dir, filename) +def _resolve_all_files_items( + ctx: click.Context, + opts, + pkg: dict, + owner: str, + repo: str, + outfile: str, + use_stderr: bool, +) -> list: + """Resolve all sub-files for a single package (--all-files mode).""" + pkg_name = pkg.get("name", "unknown") + pkg_version = pkg.get("version", "unknown") - primary_marker = " (primary)" if file_info.get("is_primary") else "" - tag = file_info.get("tag", "file") + if not use_stderr: + click.echo("Getting package details ...", nl=False) - if not use_stderr: - click.echo( - f"[{idx}/{len(files_to_download)}] [{tag}] {filename}{primary_marker} ...", - nl=False, - ) - else: - click.echo( - f"[{idx}/{len(files_to_download)}] [{tag}] {filename}{primary_marker} ...", - nl=False, - err=True, - ) + context_msg = f"Failed to get details for {pkg_name}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + detail = get_package_detail(owner=owner, repo=repo, identifier=pkg["slug"]) - try: - context_msg = f"Failed to download {filename}!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=file_url, - outfile=output_path, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=True, # Suppress per-file progress bars for cleaner output - ) - if not use_stderr: - click.secho(" OK", fg="green") - else: - click.echo(" OK", err=True) - success_count += 1 - downloaded_files.append( - { - "filename": filename, - "path": output_path, - "tag": tag, - "is_primary": file_info.get("is_primary", False), - "size": file_info.get("size", 0), - "status": "OK", - } - ) - except Exception as e: # pylint: disable=broad-except - if not use_stderr: - click.secho(" FAILED", fg="red") - else: - click.echo(" FAILED", err=True) - failed_files.append((filename, str(e))) - downloaded_files.append( - { - "filename": filename, - "path": output_path, - "tag": tag, - "is_primary": file_info.get("is_primary", False), - "size": file_info.get("size", 0), - "status": "FAILED", - "error": str(e), - } - ) + if not use_stderr: + click.secho("OK", fg="green") - # Build JSON output for all-files download - json_output = { - "package": { - "name": package.get("name"), - "version": package.get("version"), - "format": package.get("format"), - "slug": package.get("slug"), - }, - "output_directory": output_dir, - "files": downloaded_files, - "summary": { - "total": len(files_to_download), - "success": success_count, - "failed": len(failed_files), - }, - } - - if utils.maybe_print_as_json(opts, json_output): - return + sub_files = get_package_files(detail) + if not sub_files: + raise click.ClickException("No downloadable files found for this package.") + + # Determine output directory + if outfile: + output_dir = os.path.abspath(outfile) + else: + output_dir = os.path.abspath(f"{pkg_name}-{pkg_version}") + + if not os.path.exists(output_dir): + os.makedirs(output_dir) + elif not os.path.isdir(output_dir): + raise click.ClickException( + f"Output path '{output_dir}' exists but is not a directory." + ) - click.echo() - if success_count == len(files_to_download): - click.secho( - f"All {success_count} files downloaded successfully!", - fg="green", - ) - else: - click.secho( - f"Downloaded {success_count}/{len(files_to_download)} files.", - fg="yellow", - ) - if failed_files: - click.echo("\nFailed files:") - for filename, error in failed_files: - click.echo(f" - {filename}: {error}") + items = [] + for f in sub_files: + items.append( + { + "filename": f["filename"], + "url": f["cdn_url"], + "output_path": _safe_join(output_dir, f["filename"]), + "tag": f.get("tag", "file"), + "is_primary": f.get("is_primary", False), + "size": f.get("size", 0), + "package_name": pkg_name, + "package_version": pkg_version, + "status": None, + } + ) + return items - return - # Single file download (original behavior) - download_url = get_download_url(package) +def _resolve_single_file_item( + ctx: click.Context, + opts, + pkg: dict, + owner: str, + repo: str, + outfile: str, + multi_package: bool, + use_stderr: bool, +) -> dict: + """Resolve a single primary file for a package.""" + pkg_name = pkg.get("name", "unknown") + pkg_version = pkg.get("version", "unknown") + + download_url = get_download_url(pkg) if not download_url: - # Try getting detailed package info + # Fall back to detailed package info if not use_stderr: click.echo("Getting package details ...", nl=False) context_msg = "Failed to get package details!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): - package_detail = get_package_detail( - owner=owner, repo=repo, identifier=package["slug"] + detail = get_package_detail( + owner=owner, repo=repo, identifier=pkg["slug"] ) - download_url = get_download_url(package_detail or package) + download_url = get_download_url(detail or pkg) if not use_stderr: click.secho("OK", fg="green") - # Determine output filename - if not outfile: - # Extract filename from URL or use package name + format - if package.get("filename"): - outfile = package["filename"] - else: - # Fallback to package name with extension based on format - pkg_format = package.get("format", "bin") - extension = _get_extension_for_format(pkg_format) - outfile = f"{package.get('name', name)}-{package.get('version', 'latest')}.{extension}" + # Determine output path + if outfile and not multi_package: + output_path = os.path.abspath(outfile) + elif multi_package: + output_dir = os.path.abspath(outfile) if outfile else os.path.abspath(".") + if not os.path.exists(output_dir): + os.makedirs(output_dir) + filename = pkg.get("filename") or f"{pkg_name}-{pkg_version}" + output_path = _safe_join(output_dir, filename) + elif pkg.get("filename"): + output_path = os.path.abspath(os.path.basename(pkg["filename"])) + else: + pkg_format = pkg.get("format", "bin") + extension = _get_extension_for_format(pkg_format) + output_path = os.path.abspath(f"{pkg_name}-{pkg_version}.{extension}") + + return { + "filename": os.path.basename(output_path), + "url": download_url, + "output_path": output_path, + "tag": "file", + "is_primary": True, + "size": pkg.get("size", 0), + "package_name": pkg_name, + "package_version": pkg_version, + "status": None, + } - # Ensure outfile is not a directory - outfile = os.path.abspath(outfile) - if dry_run: +# --------------------------------------------------------------------------- +# Step 4a: Dry-run display (shared by all paths) +# --------------------------------------------------------------------------- + + +def _display_dry_run(packages: list, download_items: list, all_files: bool) -> None: + """Display what would be downloaded without actually downloading.""" + click.echo() + click.echo( + f"Dry run - would download {len(download_items)} file(s) " + f"from {len(packages)} package(s):" + ) + click.echo() + + for item in download_items: + primary_marker = " (primary)" if item.get("is_primary") else "" + size = _format_file_size(item.get("size", 0)) + tag = item.get("tag", "file") + click.echo(f" [{tag}] {item['filename']}{primary_marker} - {size}") + click.echo(f" Package: {item['package_name']} v{item['package_version']}") + click.echo(f" To: {item['output_path']}") + + +# --------------------------------------------------------------------------- +# Step 4b: Perform downloads +# --------------------------------------------------------------------------- + + +def _perform_downloads( + ctx: click.Context, + opts, + packages: list, + download_items: list, + session, + auth_headers: dict, + overwrite: bool, + all_files: bool, + use_stderr: bool, +) -> None: + """Download all resolved items and report results.""" + total = len(download_items) + if not use_stderr: + click.echo(f"\nDownloading {total} file(s):") click.echo() - click.echo("Dry run - would download:") - click.echo(f" Package: {package.get('name')} v{package.get('version')}") - click.echo(f" Format: {package.get('format')}") - click.echo(f" Size: {_format_package_size(package)}") - click.echo(f" From: {download_url}") - click.echo(f" To: {outfile}") - click.echo(f" Overwrite: {'Yes' if overwrite else 'No'}") - return - # Perform the download - context_msg = "Failed to download package!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=download_url, - outfile=outfile, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=utils.should_use_stderr(opts), + results = [] + + for idx, item in enumerate(download_items, 1): + filename = item["filename"] + url = item["url"] + output_path = item["output_path"] + tag = item.get("tag", "file") + primary_marker = " (primary)" if item.get("is_primary") else "" + + # Handle missing download URL as a skip, not a failure + if not url: + _echo_progress( + use_stderr, f"[{idx}/{total}] [{tag}] {filename}{primary_marker} ..." + ) + _echo_status(use_stderr, " SKIPPED", fg="yellow") + results.append({**item, "status": "SKIPPED", "error": "No download URL"}) + continue + + _echo_progress( + use_stderr, f"[{idx}/{total}] [{tag}] {filename}{primary_marker} ..." ) - # Build JSON output for single-file download + try: + context_msg = f"Failed to download {filename}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + stream_download( + url=url, + outfile=output_path, + session=session, + headers=auth_headers, + overwrite=overwrite, + quiet=True, + ) + _echo_status(use_stderr, " OK", fg="green") + results.append({**item, "status": "OK"}) + except Exception as e: # pylint: disable=broad-except + _echo_status(use_stderr, " FAILED", fg="red") + results.append({**item, "status": "FAILED", "error": str(e)}) + + # Report results + _report_results(opts, packages, results, all_files) + + +def _echo_progress(use_stderr: bool, message: str) -> None: + """Print progress message to stdout or stderr.""" + click.echo(message, nl=False, err=use_stderr) + + +def _echo_status(use_stderr: bool, message: str, fg: str = None) -> None: + """Print styled status message to stdout or stderr.""" + if fg and not use_stderr: + click.secho(message, fg=fg) + elif use_stderr: + click.echo(message, err=True) + else: + click.echo(message) + + +def _report_results(opts, packages: list, results: list, all_files: bool) -> None: + """Build JSON output and print summary.""" + success = [r for r in results if r["status"] == "OK"] + failed = [r for r in results if r["status"] == "FAILED"] + skipped = [r for r in results if r["status"] == "SKIPPED"] + json_output = { - "package": { - "name": package.get("name"), - "version": package.get("version"), - "format": package.get("format"), - "slug": package.get("slug"), - }, - "output_directory": os.path.dirname(outfile), + "packages": [ + { + "name": p.get("name"), + "version": p.get("version"), + "format": p.get("format"), + "filename": p.get("filename"), + "slug": p.get("slug"), + } + for p in packages + ], "files": [ { - "filename": os.path.basename(outfile), - "path": outfile, - "tag": "file", - "is_primary": True, - "size": package.get("size", 0), - "status": "OK", + "filename": r["filename"], + "path": r["output_path"], + "package": r["package_name"], + "version": r["package_version"], + "tag": r.get("tag", "file"), + "is_primary": r.get("is_primary", False), + "size": r.get("size", 0), + "status": r["status"], + **({"error": r["error"]} if "error" in r else {}), } + for r in results ], "summary": { - "total": 1, - "success": 1, - "failed": 0, + "total_packages": len(packages), + "total_files": len(results), + "success": len(success), + "failed": len(failed), + "skipped": len(skipped), }, } @@ -418,7 +552,36 @@ def download( # noqa: C901 return click.echo() - click.secho("Download completed successfully!", fg="green") + if not failed and not skipped: + click.secho(f"All {len(success)} file(s) downloaded successfully!", fg="green") + else: + click.secho(f"Downloaded {len(success)}/{len(results)} file(s).", fg="yellow") + if failed: + click.echo("\nFailed files:") + for r in failed: + click.echo(f" - {r['filename']}: {r.get('error', 'Unknown error')}") + if skipped: + click.echo("\nSkipped files (no download URL):") + for r in skipped: + click.echo(f" - {r['filename']}") + + +# --------------------------------------------------------------------------- +# Utilities +# --------------------------------------------------------------------------- + + +def _safe_join(base_dir: str, filename: str) -> str: + """Safely join base_dir and filename, preventing path traversal.""" + safe_name = os.path.basename(filename) + if not safe_name: + raise click.ClickException(f"Invalid filename '{filename}'.") + result = os.path.join(base_dir, safe_name) + if not os.path.realpath(result).startswith(os.path.realpath(base_dir) + os.sep): + raise click.ClickException( + f"Filename '{filename}' resolves outside the target directory." + ) + return result def _get_extension_for_format(pkg_format: str) -> str: @@ -450,10 +613,8 @@ def _format_file_size(size: int) -> str: """Format file size in bytes to human-readable format.""" if size == 0: return "Unknown" - for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 - return f"{size:.1f} PB" diff --git a/cloudsmith_cli/cli/tests/commands/test_download.py b/cloudsmith_cli/cli/tests/commands/test_download.py index 12552030..d4429e2c 100644 --- a/cloudsmith_cli/cli/tests/commands/test_download.py +++ b/cloudsmith_cli/cli/tests/commands/test_download.py @@ -112,7 +112,7 @@ def test_download_basic_success( ) self.assertEqual(result.exit_code, 0) - self.assertIn("Download completed successfully!", result.output) + self.assertIn("downloaded successfully", result.output) # Verify calls mock_resolve_pkg.assert_called_once() @@ -149,7 +149,7 @@ def test_download_dry_run(self, mock_get_url, mock_resolve_pkg, mock_resolve_aut ) self.assertEqual(result.exit_code, 0) - self.assertIn("Dry run - would download:", result.output) + self.assertIn("Dry run - would download", result.output) self.assertIn("test-package v1.0.0", result.output) @patch("cloudsmith_cli.cli.commands.download.resolve_auth") @@ -383,6 +383,173 @@ def test_format_package_size(self): self.assertEqual(_format_package_size({"size": 1048576}), "1.0 MB") self.assertEqual(_format_package_size({}), "Unknown") + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_with_filename_filter(self, mock_resolve_auth, mock_list_packages): + """Integration test: --filename filter disambiguates packages.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.nupkg", + "cdn_url": "https://example.com/TestSymbolPkg.nupkg", + "size": 13000, + }, + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.snupkg", + "cdn_url": "https://example.com/TestSymbolPkg.snupkg", + "size": 3200, + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "aflac-sandbox/test-repo", + "TestSymbolPkg", + "--filename", + "*.snupkg", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("TestSymbolPkg.1.0.24406.snupkg", result.output) + self.assertNotIn("TestSymbolPkg.1.0.24406.nupkg", result.output) + + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_all_matching_packages( + self, mock_resolve_auth, mock_list_packages + ): + """Integration test: --download-all downloads all matching packages.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.nupkg", + "cdn_url": "https://example.com/TestSymbolPkg.nupkg", + "size": 13000, + "slug": "test-slug-1", + }, + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.snupkg", + "cdn_url": "https://example.com/TestSymbolPkg.snupkg", + "size": 3200, + "slug": "test-slug-2", + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "aflac-sandbox/test-repo", + "TestSymbolPkg", + "--download-all", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("2 package(s)", result.output) + self.assertIn("TestSymbolPkg.1.0.24406.nupkg", result.output) + self.assertIn("TestSymbolPkg.1.0.24406.snupkg", result.output) + + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_all_with_filename_filter( + self, mock_resolve_auth, mock_list_packages + ): + """Integration test: --download-all + --filename filter combined.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.1.0.0.nupkg", + "cdn_url": "https://example.com/TestPkg.nupkg", + "size": 13000, + "slug": "slug-1", + }, + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.1.0.0.snupkg", + "cdn_url": "https://example.com/TestPkg.snupkg", + "size": 3200, + "slug": "slug-2", + }, + { + "name": "TestPkg", + "version": "2.0.0", + "format": "nuget", + "filename": "TestPkg.2.0.0.nupkg", + "cdn_url": "https://example.com/TestPkg2.nupkg", + "size": 15000, + "slug": "slug-3", + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "org/repo", + "TestPkg", + "--download-all", + "--filename", + "*.nupkg", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("2 package(s)", result.output) + self.assertIn("TestPkg.1.0.0.nupkg", result.output) + self.assertIn("TestPkg.2.0.0.nupkg", result.output) + self.assertNotIn("snupkg", result.output) + if __name__ == "__main__": unittest.main() diff --git a/cloudsmith_cli/core/download.py b/cloudsmith_cli/core/download.py index 14af4b04..3cf58c55 100644 --- a/cloudsmith_cli/core/download.py +++ b/cloudsmith_cli/core/download.py @@ -1,5 +1,6 @@ """Core download functionality for Cloudsmith packages.""" +import fnmatch import hashlib import os from typing import Dict, List, Optional, Tuple @@ -7,6 +8,8 @@ import click import cloudsmith_api import requests +from rich.console import Console +from rich.table import Table from . import keyring, ratelimits, utils from .api.exceptions import catch_raise_api_exception @@ -76,7 +79,7 @@ def _matches_tag_filter(pkg: Dict, tag_filter: str) -> bool: return False -def resolve_package( +def _search_packages( owner: str, repo: str, name: str, @@ -86,10 +89,13 @@ def resolve_package( os_filter: Optional[str] = None, arch_filter: Optional[str] = None, tag_filter: Optional[str] = None, - yes: bool = False, -) -> Dict: + filename_filter: Optional[str] = None, +) -> List[Dict]: """ - Find a single package matching the criteria, handling multiple matches. + Search for packages matching criteria, returning all matches. + + Uses server-side filtering where possible, then applies client-side + filters for fields not supported by the API query language. Args: owner: Repository owner @@ -100,13 +106,10 @@ def resolve_package( os_filter: Optional OS filter arch_filter: Optional architecture filter tag_filter: Optional tag filter - yes: If True, automatically select best match when multiple found + filename_filter: Optional filename filter (supports glob patterns) Returns: - The package dict - - Raises: - click.ClickException: If 0 packages found (exit code 2) or >1 found without --yes (exit code 3) + List of matching package dicts """ # Build search query - use server-side filtering where possible query_parts = [f"name:{name}"] @@ -114,6 +117,9 @@ def resolve_package( query_parts.append(f"version:{version}") if format_filter: query_parts.append(f"format:{format_filter}") + # Use server-side filename filtering for exact matches (no wildcards) + if filename_filter and not any(c in filename_filter for c in "*?["): + query_parts.append(f"filename:{filename_filter}") query = " AND ".join(query_parts) @@ -138,10 +144,9 @@ def resolve_package( page += 1 # Apply client-side filters for fields not supported server-side - # First, filter for exact name match (API does partial matching) filtered_packages = [] for pkg in packages: - # Exact name match (case-insensitive) + # Exact name match (case-insensitive, API does partial matching) if pkg.get("name", "").lower() != name.lower(): continue # Apply OS filter @@ -153,8 +158,112 @@ def resolve_package( # Apply tag filter if tag_filter and not _matches_tag_filter(pkg, tag_filter): continue + # Apply filename filter (glob patterns are client-side only) + if filename_filter and any(c in filename_filter for c in "*?["): + if not fnmatch.fnmatch(pkg.get("filename", ""), filename_filter): + continue filtered_packages.append(pkg) - packages = filtered_packages + + return filtered_packages + + +def resolve_all_packages( + owner: str, + repo: str, + name: str, + *, + version: Optional[str] = None, + format_filter: Optional[str] = None, + os_filter: Optional[str] = None, + arch_filter: Optional[str] = None, + tag_filter: Optional[str] = None, + filename_filter: Optional[str] = None, +) -> List[Dict]: + """ + Find all packages matching the criteria. + + Args: + owner: Repository owner + repo: Repository name + name: Package name to search for + version: Optional version filter + format_filter: Optional format filter + os_filter: Optional OS filter + arch_filter: Optional architecture filter + tag_filter: Optional tag filter + filename_filter: Optional filename filter (supports glob patterns) + + Returns: + List of matching package dicts + + Raises: + click.ClickException: If no packages found (exit code 2) + """ + packages = _search_packages( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) + + if not packages: + exc = click.ClickException("No packages found matching the specified criteria.") + exc.exit_code = 2 + raise exc + + return packages + + +def resolve_package( + owner: str, + repo: str, + name: str, + *, + version: Optional[str] = None, + format_filter: Optional[str] = None, + os_filter: Optional[str] = None, + arch_filter: Optional[str] = None, + tag_filter: Optional[str] = None, + filename_filter: Optional[str] = None, + yes: bool = False, +) -> Dict: + """ + Find a single package matching the criteria, handling multiple matches. + + Args: + owner: Repository owner + repo: Repository name + name: Package name to search for + version: Optional version filter + format_filter: Optional format filter + os_filter: Optional OS filter + arch_filter: Optional architecture filter + tag_filter: Optional tag filter + filename_filter: Optional filename filter (supports glob patterns) + yes: If True, automatically select best match when multiple found + + Returns: + The package dict + + Raises: + click.ClickException: If 0 packages found (exit code 2) or >1 found without --yes (exit code 3) + """ + packages = _search_packages( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) # Handle results if not packages: @@ -167,32 +276,11 @@ def resolve_package( # Multiple packages found if not yes: - click.echo("Multiple packages found:") - click.echo() - - # Display table of matches - headers = ["#", "Name", "Version", "Format", "Size", "Created"] - rows = [] - - for i, pkg in enumerate(packages, 1): - rows.append( - [ - str(i), - click.style(pkg.get("name", ""), fg="cyan"), - click.style(pkg.get("version", ""), fg="yellow"), - click.style(pkg.get("format", ""), fg="blue"), - click.style(_format_size(pkg.get("size", 0)), fg="green"), - click.style(_format_date(pkg.get("uploaded_at", "")), fg="white"), - ] - ) - - # Import here to avoid circular imports - from ..cli.utils import pretty_print_table - - pretty_print_table(headers, rows) - click.echo() + _display_multiple_packages(packages) exc = click.ClickException( - "Multiple packages found. Use --yes to auto-select the best match, or add more specific filters." + "Multiple packages found. Use --yes to auto-select the best match, " + "--download-all to download all matches, or add more specific filters " + "(e.g., --filename '*.nupkg')." ) exc.exit_code = 3 raise exc @@ -207,6 +295,30 @@ def resolve_package( return best_package +def _display_multiple_packages(packages: List[Dict]) -> None: + """Display a table of multiple matching packages.""" + click.echo("Multiple packages found:") + click.echo() + + table = Table(title=None, show_lines=False) + for header in ["#", "Name", "Version", "Format", "Filename", "Size", "Created"]: + table.add_column(header) + + for i, pkg in enumerate(packages, 1): + table.add_row( + str(i), + pkg.get("name", ""), + pkg.get("version", ""), + pkg.get("format", ""), + pkg.get("filename", ""), + _format_size(pkg.get("size", 0)), + _format_date(pkg.get("uploaded_at", "")), + ) + + Console().print(table) + click.echo() + + def get_download_url(package: Dict) -> str: """ Get the download URL for a package. diff --git a/cloudsmith_cli/core/tests/test_download.py b/cloudsmith_cli/core/tests/test_download.py index ccb073c6..a76c345e 100644 --- a/cloudsmith_cli/core/tests/test_download.py +++ b/cloudsmith_cli/core/tests/test_download.py @@ -107,10 +107,10 @@ def test_resolve_package_multiple_matches_yes( mock_select_best.assert_called_once_with(mock_packages) @patch("cloudsmith_cli.core.download.list_packages") - @patch("cloudsmith_cli.cli.utils.pretty_print_table") + @patch("cloudsmith_cli.core.download.Console") @patch("click.echo") def test_resolve_package_multiple_matches_no_yes( - self, mock_echo, mock_pretty_print, mock_list_packages + self, mock_echo, mock_console_cls, mock_list_packages ): """Test package resolution with multiple matches without --yes.""" mock_packages = [ @@ -127,7 +127,7 @@ def test_resolve_package_multiple_matches_no_yes( download.resolve_package("owner", "repo", "test-package", yes=False) self.assertEqual(cm.exception.exit_code, 3) - mock_pretty_print.assert_called_once() + mock_console_cls().print.assert_called_once() @patch("cloudsmith_cli.core.download.list_packages") def test_resolve_package_with_filters(self, mock_list_packages): @@ -512,5 +512,197 @@ def test_get_package_files_empty_files_array(self): self.assertEqual(files[0]["filename"], "package.rpm") +class TestFilenameFilter(unittest.TestCase): + """Test filename filtering in _search_packages and resolve functions.""" + + def _make_page_info(self): + """Create a mock page info for single-page results.""" + page_info = Mock() + page_info.is_valid = True + page_info.page = 1 + page_info.page_total = 1 + return page_info + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_exact_match(self, mock_list_packages): + """Test exact filename filtering (server-side via query).""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="TestPkg.1.0.0.nupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.nupkg") + # Verify filename was sent in query (server-side) + call_query = mock_list_packages.call_args[1]["query"] + self.assertIn("filename:TestPkg.1.0.0.nupkg", call_query) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_glob_pattern(self, mock_list_packages): + """Test glob pattern filename filtering (client-side via fnmatch).""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + # Glob *.snupkg should match only the .snupkg file + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.snupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.snupkg") + # Glob patterns should NOT be sent server-side + call_query = mock_list_packages.call_args[1]["query"] + self.assertNotIn("filename:", call_query) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_glob_nupkg(self, mock_list_packages): + """Test glob pattern *.nupkg filters out .snupkg.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.nupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.nupkg") + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_no_match(self, mock_list_packages): + """Test filename filter with no matches raises error.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.rpm" + ) + + self.assertEqual(cm.exception.exit_code, 2) + + +class TestResolveAllPackages(unittest.TestCase): + """Test resolve_all_packages function.""" + + def _make_page_info(self): + page_info = Mock() + page_info.is_valid = True + page_info.page = 1 + page_info.page_total = 1 + return page_info + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_returns_all_matches(self, mock_list_packages): + """Test resolve_all_packages returns all matching packages.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages("owner", "repo", "TestPkg") + + self.assertEqual(len(result), 2) + filenames = [p["filename"] for p in result] + self.assertIn("TestPkg.1.0.0.nupkg", filenames) + self.assertIn("TestPkg.1.0.0.snupkg", filenames) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_no_packages_raises(self, mock_list_packages): + """Test resolve_all_packages raises when no packages found.""" + mock_list_packages.return_value = ([], self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_all_packages("owner", "repo", "nonexistent") + + self.assertEqual(cm.exception.exit_code, 2) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_with_filename_filter(self, mock_list_packages): + """Test resolve_all_packages with filename filter.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + {"name": "TestPkg", "version": "2.0.0", "filename": "TestPkg.2.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages( + "owner", "repo", "TestPkg", filename_filter="*.snupkg" + ) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["filename"], "TestPkg.1.0.0.snupkg") + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_with_version_filter(self, mock_list_packages): + """Test resolve_all_packages with version filter.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages( + "owner", "repo", "TestPkg", version="1.0.0" + ) + + self.assertEqual(len(result), 2) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_package_error_mentions_download_all(self, mock_list_packages): + """Test multiple packages error message mentions --download-all.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_package("owner", "repo", "TestPkg", yes=False) + + self.assertIn("--download-all", str(cm.exception)) + self.assertIn("--filename", str(cm.exception)) + + +class TestDisplayMultiplePackages(unittest.TestCase): + """Test _display_multiple_packages function.""" + + @patch("cloudsmith_cli.core.download.Console") + @patch("cloudsmith_cli.core.download.Table") + @patch("click.echo") + def test_display_includes_filename_column( + self, mock_echo, mock_table_cls, mock_console_cls + ): + """Test that the multiple packages table includes filename.""" + packages = [ + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.nupkg", + "size": 1024, + "uploaded_at": "2026-01-27", + }, + ] + + download._display_multiple_packages(packages) + + # Verify that add_column was called with "Filename" + column_names = [ + call.args[0] for call in mock_table_cls().add_column.call_args_list + ] + self.assertIn("Filename", column_names) + + if __name__ == "__main__": unittest.main()