From fd47b533c00d7e3a343d3ebfc67255561a2634b9 Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Mon, 9 Mar 2026 13:29:52 +0000 Subject: [PATCH 1/6] feat(download): add --filename and --download-all options --- CHANGELOG.md | 10 +- README.md | 11 +- cloudsmith_cli/cli/commands/download.py | 332 +++++++++++++++++- .../cli/tests/commands/test_download.py | 167 +++++++++ cloudsmith_cli/core/download.py | 188 ++++++++-- cloudsmith_cli/core/tests/test_download.py | 187 ++++++++++ 6 files changed, 842 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 316577e9..c8ec1c9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,9 +5,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). -## [Unreleased] + +## [1.14.0] - 2026-03-09 + +### Added + - Added `--tag` option to `download` command for filtering packages by tags - Added download command documentation to README with comprehensive usage examples +- Added `--filename` option to `download` command for filtering by package filename, with support for glob patterns (e.g., `--filename '*.snupkg'`) +- Added `--download-all` flag to `download` command to download all matching packages instead of erroring on multiple matches +- Multiple packages table now includes a Filename column for easier disambiguation + ## [1.13.0] - 2026-02-16 diff --git a/README.md b/README.md index ef340cc5..baf51b2f 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,6 @@ The CLI currently supports the following commands (and sub-commands): - `packages`: List packages for a repository. (Aliases `repos list`) - `repos`: List repositories for a namespace (owner). - `login`|`token`: Retrieve your API authentication token/key via login. -- `logout`: Clear stored authentication credentials and SSO tokens (Keyring, API key from credential file and emit warning when `$CLOUDSMITH_API_KEY` is still set). - `metrics`: Metrics and statistics for a repository. - `tokens`: Retrieve bandwidth usage for entitlement tokens. - `packages`: Retrieve package usage for repository. @@ -276,6 +275,16 @@ cloudsmith download your-account/your-repo package-name --tag latest # Combine tag with metadata filters cloudsmith download your-account/your-repo package-name --tag stable --format deb --arch arm64 +# Filter by filename (exact or glob pattern) +cloudsmith download your-account/your-repo package-name --filename '*.nupkg' +cloudsmith download your-account/your-repo package-name --filename 'mypackage-1.0.0.snupkg' + +# Download all matching packages (when multiple packages share the same name/version) +cloudsmith download your-account/your-repo package-name --download-all + +# Combine --download-all with --filename to download a subset +cloudsmith download your-account/your-repo package-name --download-all --filename '*.snupkg' + # Download all associated files (POM, sources, javadoc, etc.) cloudsmith download your-account/your-repo package-name --all-files diff --git a/cloudsmith_cli/cli/commands/download.py b/cloudsmith_cli/cli/commands/download.py index 4ee25131..0d8d31e6 100644 --- a/cloudsmith_cli/cli/commands/download.py +++ b/cloudsmith_cli/cli/commands/download.py @@ -8,6 +8,7 @@ get_download_url, get_package_detail, get_package_files, + resolve_all_packages, resolve_auth, resolve_package, stream_download, @@ -47,6 +48,16 @@ "tag_filter", help="Filter by package tag (e.g., 'latest', 'stable'). Use --format, --arch, --os for metadata filters.", ) +@click.option( + "--filename", + "filename_filter", + help="Filter by package filename (e.g., 'mypackage.nupkg'). Supports glob patterns (e.g., '*.snupkg').", +) +@click.option( + "--download-all", + is_flag=True, + help="Download all matching packages instead of erroring on multiple matches.", +) @click.option( "--outfile", type=click.Path(), @@ -84,6 +95,8 @@ def download( # noqa: C901 os_filter, arch_filter, tag_filter, + filename_filter, + download_all, outfile, overwrite, all_files, @@ -94,7 +107,8 @@ def download( # noqa: C901 Download a package from a Cloudsmith repository. This command downloads a package binary from a Cloudsmith repository. You can - filter packages by version, format, operating system, architecture, and tags. + filter packages by version, format, operating system, architecture, tags, and + filename. Examples: @@ -114,6 +128,15 @@ def download( # noqa: C901 # Download a package with a specific tag cloudsmith download myorg/myrepo mypackage --tag latest + \b + # Download by filename (exact or glob pattern) + cloudsmith download myorg/myrepo TestSymbolPkg --filename '*.nupkg' + cloudsmith download myorg/myrepo TestSymbolPkg --filename 'TestSymbolPkg.1.0.24406.nupkg' + + \b + # Download all matching packages (e.g., .nupkg and .snupkg with same name/version) + cloudsmith download myorg/myrepo TestSymbolPkg --version 1.0.24406 --download-all + \b # Download all associated files (POM, sources, javadoc, etc.) for a Maven/NuGet package cloudsmith download myorg/myrepo mypackage --all-files @@ -125,7 +148,8 @@ def download( # noqa: C901 For private repositories, set: export CLOUDSMITH_API_KEY=your_api_key If multiple packages match your criteria, you'll see a selection table unless - you use --yes to automatically select the best match (highest version, then newest). + you use --yes to automatically select the best match (highest version, then newest), + or --download-all to download all matches. When using --all-files, all associated files (such as POM files, sources, javadoc, SBOM, etc.) will be downloaded into a folder named {package-name}-{version} unless @@ -148,21 +172,50 @@ def download( # noqa: C901 if opts.debug: click.echo(f"Using authentication: {auth_source}", err=True) - # Find the package + # Common filter kwargs shared by resolve_package and resolve_all_packages + filter_kwargs = dict( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) + + # --download-all: resolve all matching packages and download each one + if download_all: + context_msg = "Failed to find packages!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + packages = resolve_all_packages(**filter_kwargs) + + if not use_stderr: + click.secho("OK", fg="green") + + _download_all_packages( + ctx=ctx, + opts=opts, + packages=packages, + owner=owner, + repo=repo, + session=session, + auth_headers=auth_headers, + outfile=outfile, + overwrite=overwrite, + all_files=all_files, + dry_run=dry_run, + use_stderr=use_stderr, + ) + return + + # Single-package mode (default) context_msg = "Failed to find package!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): - package = resolve_package( - owner=owner, - repo=repo, - name=name, - version=version, - format_filter=format_filter, - os_filter=os_filter, - arch_filter=arch_filter, - tag_filter=tag_filter, - yes=yes, - ) + package = resolve_package(**filter_kwargs, yes=yes) if not use_stderr: click.secho("OK", fg="green") @@ -421,6 +474,257 @@ def download( # noqa: C901 click.secho("Download completed successfully!", fg="green") +def _download_all_packages( # noqa: C901 + *, + ctx, + opts, + packages, + owner, + repo, + session, + auth_headers, + outfile, + overwrite, + all_files, + dry_run, + use_stderr, +): + """Download all matching packages into a directory.""" + # Determine output directory + if outfile: + output_dir = os.path.abspath(outfile) + else: + # Use current directory + output_dir = os.path.abspath(".") + + if dry_run: + click.echo() + click.echo(f"Dry run - would download {len(packages)} package(s):") + click.echo(f" To directory: {output_dir}") + click.echo() + for i, pkg in enumerate(packages, 1): + filename = pkg.get("filename", "unknown") + size = _format_package_size(pkg) + click.echo( + f" {i}. {pkg.get('name')} v{pkg.get('version')} " + f"({pkg.get('format')}) - {filename} [{size}]" + ) + if all_files: + # Show sub-files if --all-files + context_msg = "Failed to get package details!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + detail = get_package_detail( + owner=owner, repo=repo, identifier=pkg["slug"] + ) + sub_files = get_package_files(detail) + for f in sub_files: + primary = " (primary)" if f.get("is_primary") else "" + click.echo( + f" [{f.get('tag', 'file')}] {f['filename']}{primary}" + ) + return + + # Create output directory if needed + if not os.path.exists(output_dir): + os.makedirs(output_dir) + elif not os.path.isdir(output_dir): + raise click.ClickException( + f"Output path '{output_dir}' exists but is not a directory." + ) + + if not use_stderr: + click.echo(f"\nDownloading {len(packages)} package(s) to: {output_dir}") + click.echo() + + all_downloaded_files = [] + total_success = 0 + total_failed = 0 + + for pkg_idx, pkg in enumerate(packages, 1): + pkg_name = pkg.get("name", "unknown") + pkg_version = pkg.get("version", "unknown") + pkg_filename = pkg.get("filename", "") + + if not use_stderr: + click.echo( + f"[{pkg_idx}/{len(packages)}] {pkg_name} v{pkg_version} " + f"({pkg_filename})" + ) + + if all_files: + # Download all sub-files for this package + context_msg = f"Failed to get details for {pkg_name}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + detail = get_package_detail( + owner=owner, repo=repo, identifier=pkg["slug"] + ) + sub_files = get_package_files(detail) + + for file_info in sub_files: + filename = file_info["filename"] + file_url = file_info["cdn_url"] + file_path = os.path.join(output_dir, filename) + tag = file_info.get("tag", "file") + + if not use_stderr: + click.echo(f" [{tag}] {filename} ...", nl=False) + + try: + context_msg = f"Failed to download {filename}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + stream_download( + url=file_url, + outfile=file_path, + session=session, + headers=auth_headers, + overwrite=overwrite, + quiet=True, + ) + if not use_stderr: + click.secho(" OK", fg="green") + total_success += 1 + all_downloaded_files.append( + { + "filename": filename, + "path": file_path, + "package": pkg_name, + "version": pkg_version, + "tag": tag, + "status": "OK", + } + ) + except Exception as e: # pylint: disable=broad-except + if not use_stderr: + click.secho(" FAILED", fg="red") + total_failed += 1 + all_downloaded_files.append( + { + "filename": filename, + "path": file_path, + "package": pkg_name, + "version": pkg_version, + "tag": tag, + "status": "FAILED", + "error": str(e), + } + ) + else: + # Download the primary package file + download_url = pkg.get("cdn_url") or pkg.get("download_url") + filename = pkg_filename or f"{pkg_name}-{pkg_version}" + file_path = os.path.join(output_dir, filename) + + if not download_url: + # Fall back to detailed package info + context_msg = f"Failed to get details for {pkg_name}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + detail = get_package_detail( + owner=owner, repo=repo, identifier=pkg["slug"] + ) + download_url = ( + detail.get("cdn_url") + or detail.get("download_url") + or detail.get("file_url") + ) + + if not download_url: + if not use_stderr: + click.secho(" No download URL available - SKIPPED", fg="yellow") + total_failed += 1 + all_downloaded_files.append( + { + "filename": filename, + "path": file_path, + "package": pkg_name, + "version": pkg_version, + "status": "FAILED", + "error": "No download URL", + } + ) + continue + + if not use_stderr: + click.echo(f" Downloading {filename} ...", nl=False) + + try: + context_msg = f"Failed to download {filename}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + stream_download( + url=download_url, + outfile=file_path, + session=session, + headers=auth_headers, + overwrite=overwrite, + quiet=True, + ) + if not use_stderr: + click.secho(" OK", fg="green") + total_success += 1 + all_downloaded_files.append( + { + "filename": filename, + "path": file_path, + "package": pkg_name, + "version": pkg_version, + "status": "OK", + } + ) + except Exception as e: # pylint: disable=broad-except + if not use_stderr: + click.secho(" FAILED", fg="red") + total_failed += 1 + all_downloaded_files.append( + { + "filename": filename, + "path": file_path, + "package": pkg_name, + "version": pkg_version, + "status": "FAILED", + "error": str(e), + } + ) + + # Build JSON output + json_output = { + "packages": [ + { + "name": p.get("name"), + "version": p.get("version"), + "format": p.get("format"), + "filename": p.get("filename"), + "slug": p.get("slug"), + } + for p in packages + ], + "output_directory": output_dir, + "files": all_downloaded_files, + "summary": { + "total_packages": len(packages), + "total_files": total_success + total_failed, + "success": total_success, + "failed": total_failed, + }, + } + + if utils.maybe_print_as_json(opts, json_output): + return + + click.echo() + total = total_success + total_failed + if total_failed == 0: + click.secho( + f"All {total_success} file(s) from {len(packages)} package(s) " + f"downloaded successfully!", + fg="green", + ) + else: + click.secho( + f"Downloaded {total_success}/{total} file(s) from " + f"{len(packages)} package(s).", + fg="yellow", + ) + + def _get_extension_for_format(pkg_format: str) -> str: """Get appropriate file extension for package format.""" format_extensions = { diff --git a/cloudsmith_cli/cli/tests/commands/test_download.py b/cloudsmith_cli/cli/tests/commands/test_download.py index 12552030..1dce948f 100644 --- a/cloudsmith_cli/cli/tests/commands/test_download.py +++ b/cloudsmith_cli/cli/tests/commands/test_download.py @@ -383,6 +383,173 @@ def test_format_package_size(self): self.assertEqual(_format_package_size({"size": 1048576}), "1.0 MB") self.assertEqual(_format_package_size({}), "Unknown") + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_with_filename_filter(self, mock_resolve_auth, mock_list_packages): + """Integration test: --filename filter disambiguates packages.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.nupkg", + "cdn_url": "https://example.com/TestSymbolPkg.nupkg", + "size": 13000, + }, + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.snupkg", + "cdn_url": "https://example.com/TestSymbolPkg.snupkg", + "size": 3200, + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "aflac-sandbox/test-repo", + "TestSymbolPkg", + "--filename", + "*.snupkg", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("TestSymbolPkg.1.0.24406.snupkg", result.output) + self.assertNotIn("TestSymbolPkg.1.0.24406.nupkg", result.output) + + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_all_matching_packages( + self, mock_resolve_auth, mock_list_packages + ): + """Integration test: --download-all downloads all matching packages.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.nupkg", + "cdn_url": "https://example.com/TestSymbolPkg.nupkg", + "size": 13000, + "slug": "test-slug-1", + }, + { + "name": "TestSymbolPkg", + "version": "1.0.24406", + "format": "nuget", + "filename": "TestSymbolPkg.1.0.24406.snupkg", + "cdn_url": "https://example.com/TestSymbolPkg.snupkg", + "size": 3200, + "slug": "test-slug-2", + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "aflac-sandbox/test-repo", + "TestSymbolPkg", + "--download-all", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("2 package(s)", result.output) + self.assertIn("TestSymbolPkg.1.0.24406.nupkg", result.output) + self.assertIn("TestSymbolPkg.1.0.24406.snupkg", result.output) + + @patch("cloudsmith_cli.core.download.list_packages") + @patch("cloudsmith_cli.cli.commands.download.resolve_auth") + def test_download_all_with_filename_filter( + self, mock_resolve_auth, mock_list_packages + ): + """Integration test: --download-all + --filename filter combined.""" + mock_session = Mock() + mock_resolve_auth.return_value = (mock_session, {}, "none") + + mock_packages = [ + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.1.0.0.nupkg", + "cdn_url": "https://example.com/TestPkg.nupkg", + "size": 13000, + "slug": "slug-1", + }, + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.1.0.0.snupkg", + "cdn_url": "https://example.com/TestPkg.snupkg", + "size": 3200, + "slug": "slug-2", + }, + { + "name": "TestPkg", + "version": "2.0.0", + "format": "nuget", + "filename": "TestPkg.2.0.0.nupkg", + "cdn_url": "https://example.com/TestPkg2.nupkg", + "size": 15000, + "slug": "slug-3", + }, + ] + mock_page_info = Mock() + mock_page_info.is_valid = True + mock_page_info.page = 1 + mock_page_info.page_total = 1 + mock_list_packages.return_value = (mock_packages, mock_page_info) + + runner = CliRunner() + result = runner.invoke( + download, + [ + "--config-file", + "/dev/null", + "org/repo", + "TestPkg", + "--download-all", + "--filename", + "*.nupkg", + "--dry-run", + ], + ) + + self.assertEqual(result.exit_code, 0) + self.assertIn("2 package(s)", result.output) + self.assertIn("TestPkg.1.0.0.nupkg", result.output) + self.assertIn("TestPkg.2.0.0.nupkg", result.output) + self.assertNotIn("snupkg", result.output) + if __name__ == "__main__": unittest.main() diff --git a/cloudsmith_cli/core/download.py b/cloudsmith_cli/core/download.py index 14af4b04..6243521d 100644 --- a/cloudsmith_cli/core/download.py +++ b/cloudsmith_cli/core/download.py @@ -1,5 +1,6 @@ """Core download functionality for Cloudsmith packages.""" +import fnmatch import hashlib import os from typing import Dict, List, Optional, Tuple @@ -76,7 +77,7 @@ def _matches_tag_filter(pkg: Dict, tag_filter: str) -> bool: return False -def resolve_package( +def _search_packages( owner: str, repo: str, name: str, @@ -86,10 +87,13 @@ def resolve_package( os_filter: Optional[str] = None, arch_filter: Optional[str] = None, tag_filter: Optional[str] = None, - yes: bool = False, -) -> Dict: + filename_filter: Optional[str] = None, +) -> List[Dict]: """ - Find a single package matching the criteria, handling multiple matches. + Search for packages matching criteria, returning all matches. + + Uses server-side filtering where possible, then applies client-side + filters for fields not supported by the API query language. Args: owner: Repository owner @@ -100,13 +104,10 @@ def resolve_package( os_filter: Optional OS filter arch_filter: Optional architecture filter tag_filter: Optional tag filter - yes: If True, automatically select best match when multiple found + filename_filter: Optional filename filter (supports glob patterns) Returns: - The package dict - - Raises: - click.ClickException: If 0 packages found (exit code 2) or >1 found without --yes (exit code 3) + List of matching package dicts """ # Build search query - use server-side filtering where possible query_parts = [f"name:{name}"] @@ -114,6 +115,9 @@ def resolve_package( query_parts.append(f"version:{version}") if format_filter: query_parts.append(f"format:{format_filter}") + # Use server-side filename filtering for exact matches (no wildcards) + if filename_filter and not any(c in filename_filter for c in "*?["): + query_parts.append(f"filename:{filename_filter}") query = " AND ".join(query_parts) @@ -138,10 +142,9 @@ def resolve_package( page += 1 # Apply client-side filters for fields not supported server-side - # First, filter for exact name match (API does partial matching) filtered_packages = [] for pkg in packages: - # Exact name match (case-insensitive) + # Exact name match (case-insensitive, API does partial matching) if pkg.get("name", "").lower() != name.lower(): continue # Apply OS filter @@ -153,8 +156,112 @@ def resolve_package( # Apply tag filter if tag_filter and not _matches_tag_filter(pkg, tag_filter): continue + # Apply filename filter (glob patterns are client-side only) + if filename_filter and any(c in filename_filter for c in "*?["): + if not fnmatch.fnmatch(pkg.get("filename", ""), filename_filter): + continue filtered_packages.append(pkg) - packages = filtered_packages + + return filtered_packages + + +def resolve_all_packages( + owner: str, + repo: str, + name: str, + *, + version: Optional[str] = None, + format_filter: Optional[str] = None, + os_filter: Optional[str] = None, + arch_filter: Optional[str] = None, + tag_filter: Optional[str] = None, + filename_filter: Optional[str] = None, +) -> List[Dict]: + """ + Find all packages matching the criteria. + + Args: + owner: Repository owner + repo: Repository name + name: Package name to search for + version: Optional version filter + format_filter: Optional format filter + os_filter: Optional OS filter + arch_filter: Optional architecture filter + tag_filter: Optional tag filter + filename_filter: Optional filename filter (supports glob patterns) + + Returns: + List of matching package dicts + + Raises: + click.ClickException: If no packages found (exit code 2) + """ + packages = _search_packages( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) + + if not packages: + exc = click.ClickException("No packages found matching the specified criteria.") + exc.exit_code = 2 + raise exc + + return packages + + +def resolve_package( + owner: str, + repo: str, + name: str, + *, + version: Optional[str] = None, + format_filter: Optional[str] = None, + os_filter: Optional[str] = None, + arch_filter: Optional[str] = None, + tag_filter: Optional[str] = None, + filename_filter: Optional[str] = None, + yes: bool = False, +) -> Dict: + """ + Find a single package matching the criteria, handling multiple matches. + + Args: + owner: Repository owner + repo: Repository name + name: Package name to search for + version: Optional version filter + format_filter: Optional format filter + os_filter: Optional OS filter + arch_filter: Optional architecture filter + tag_filter: Optional tag filter + filename_filter: Optional filename filter (supports glob patterns) + yes: If True, automatically select best match when multiple found + + Returns: + The package dict + + Raises: + click.ClickException: If 0 packages found (exit code 2) or >1 found without --yes (exit code 3) + """ + packages = _search_packages( + owner=owner, + repo=repo, + name=name, + version=version, + format_filter=format_filter, + os_filter=os_filter, + arch_filter=arch_filter, + tag_filter=tag_filter, + filename_filter=filename_filter, + ) # Handle results if not packages: @@ -167,32 +274,11 @@ def resolve_package( # Multiple packages found if not yes: - click.echo("Multiple packages found:") - click.echo() - - # Display table of matches - headers = ["#", "Name", "Version", "Format", "Size", "Created"] - rows = [] - - for i, pkg in enumerate(packages, 1): - rows.append( - [ - str(i), - click.style(pkg.get("name", ""), fg="cyan"), - click.style(pkg.get("version", ""), fg="yellow"), - click.style(pkg.get("format", ""), fg="blue"), - click.style(_format_size(pkg.get("size", 0)), fg="green"), - click.style(_format_date(pkg.get("uploaded_at", "")), fg="white"), - ] - ) - - # Import here to avoid circular imports - from ..cli.utils import pretty_print_table - - pretty_print_table(headers, rows) - click.echo() + _display_multiple_packages(packages) exc = click.ClickException( - "Multiple packages found. Use --yes to auto-select the best match, or add more specific filters." + "Multiple packages found. Use --yes to auto-select the best match, " + "--download-all to download all matches, or add more specific filters " + "(e.g., --filename '*.nupkg')." ) exc.exit_code = 3 raise exc @@ -207,6 +293,34 @@ def resolve_package( return best_package +def _display_multiple_packages(packages: List[Dict]) -> None: + """Display a table of multiple matching packages.""" + click.echo("Multiple packages found:") + click.echo() + + headers = ["#", "Name", "Version", "Format", "Filename", "Size", "Created"] + rows = [] + + for i, pkg in enumerate(packages, 1): + rows.append( + [ + str(i), + click.style(pkg.get("name", ""), fg="cyan"), + click.style(pkg.get("version", ""), fg="yellow"), + click.style(pkg.get("format", ""), fg="blue"), + click.style(pkg.get("filename", ""), fg="magenta"), + click.style(_format_size(pkg.get("size", 0)), fg="green"), + click.style(_format_date(pkg.get("uploaded_at", "")), fg="white"), + ] + ) + + # Import here to avoid circular imports + from ..cli.utils import pretty_print_table + + pretty_print_table(headers, rows) + click.echo() + + def get_download_url(package: Dict) -> str: """ Get the download URL for a package. diff --git a/cloudsmith_cli/core/tests/test_download.py b/cloudsmith_cli/core/tests/test_download.py index ccb073c6..9cf3df26 100644 --- a/cloudsmith_cli/core/tests/test_download.py +++ b/cloudsmith_cli/core/tests/test_download.py @@ -512,5 +512,192 @@ def test_get_package_files_empty_files_array(self): self.assertEqual(files[0]["filename"], "package.rpm") +class TestFilenameFilter(unittest.TestCase): + """Test filename filtering in _search_packages and resolve functions.""" + + def _make_page_info(self): + """Create a mock page info for single-page results.""" + page_info = Mock() + page_info.is_valid = True + page_info.page = 1 + page_info.page_total = 1 + return page_info + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_exact_match(self, mock_list_packages): + """Test exact filename filtering (server-side via query).""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="TestPkg.1.0.0.nupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.nupkg") + # Verify filename was sent in query (server-side) + call_query = mock_list_packages.call_args[1]["query"] + self.assertIn("filename:TestPkg.1.0.0.nupkg", call_query) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_glob_pattern(self, mock_list_packages): + """Test glob pattern filename filtering (client-side via fnmatch).""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + # Glob *.snupkg should match only the .snupkg file + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.snupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.snupkg") + # Glob patterns should NOT be sent server-side + call_query = mock_list_packages.call_args[1]["query"] + self.assertNotIn("filename:", call_query) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_glob_nupkg(self, mock_list_packages): + """Test glob pattern *.nupkg filters out .snupkg.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.nupkg" + ) + + self.assertEqual(result["filename"], "TestPkg.1.0.0.nupkg") + + @patch("cloudsmith_cli.core.download.list_packages") + def test_filename_filter_no_match(self, mock_list_packages): + """Test filename filter with no matches raises error.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_package( + "owner", "repo", "TestPkg", filename_filter="*.rpm" + ) + + self.assertEqual(cm.exception.exit_code, 2) + + +class TestResolveAllPackages(unittest.TestCase): + """Test resolve_all_packages function.""" + + def _make_page_info(self): + page_info = Mock() + page_info.is_valid = True + page_info.page = 1 + page_info.page_total = 1 + return page_info + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_returns_all_matches(self, mock_list_packages): + """Test resolve_all_packages returns all matching packages.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages("owner", "repo", "TestPkg") + + self.assertEqual(len(result), 2) + filenames = [p["filename"] for p in result] + self.assertIn("TestPkg.1.0.0.nupkg", filenames) + self.assertIn("TestPkg.1.0.0.snupkg", filenames) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_no_packages_raises(self, mock_list_packages): + """Test resolve_all_packages raises when no packages found.""" + mock_list_packages.return_value = ([], self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_all_packages("owner", "repo", "nonexistent") + + self.assertEqual(cm.exception.exit_code, 2) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_with_filename_filter(self, mock_list_packages): + """Test resolve_all_packages with filename filter.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + {"name": "TestPkg", "version": "2.0.0", "filename": "TestPkg.2.0.0.nupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages( + "owner", "repo", "TestPkg", filename_filter="*.snupkg" + ) + + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["filename"], "TestPkg.1.0.0.snupkg") + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_all_with_version_filter(self, mock_list_packages): + """Test resolve_all_packages with version filter.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.1.0.0.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + result = download.resolve_all_packages( + "owner", "repo", "TestPkg", version="1.0.0" + ) + + self.assertEqual(len(result), 2) + + @patch("cloudsmith_cli.core.download.list_packages") + def test_resolve_package_error_mentions_download_all(self, mock_list_packages): + """Test multiple packages error message mentions --download-all.""" + mock_packages = [ + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.nupkg"}, + {"name": "TestPkg", "version": "1.0.0", "filename": "TestPkg.snupkg"}, + ] + mock_list_packages.return_value = (mock_packages, self._make_page_info()) + + with self.assertRaises(click.ClickException) as cm: + download.resolve_package("owner", "repo", "TestPkg", yes=False) + + self.assertIn("--download-all", str(cm.exception)) + self.assertIn("--filename", str(cm.exception)) + + +class TestDisplayMultiplePackages(unittest.TestCase): + """Test _display_multiple_packages function.""" + + @patch("cloudsmith_cli.cli.utils.pretty_print_table") + @patch("click.echo") + def test_display_includes_filename_column(self, mock_echo, mock_table): + """Test that the multiple packages table includes filename.""" + packages = [ + { + "name": "TestPkg", + "version": "1.0.0", + "format": "nuget", + "filename": "TestPkg.nupkg", + "size": 1024, + "uploaded_at": "2026-01-27", + }, + ] + + download._display_multiple_packages(packages) + + mock_table.assert_called_once() + headers = mock_table.call_args[0][0] + self.assertIn("Filename", headers) + + if __name__ == "__main__": unittest.main() From 085c9595ecbe074fe23d6334e3cb57b9e7b76295 Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Mon, 9 Mar 2026 17:12:06 +0000 Subject: [PATCH 2/6] fix: address PR review comments --- CHANGELOG.md | 3 +++ README.md | 1 + cloudsmith_cli/cli/commands/download.py | 31 +++++++++++++++++++++---- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8ec1c9f..4a032a88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [Unreleased] + + ## [1.14.0] - 2026-03-09 ### Added diff --git a/README.md b/README.md index baf51b2f..52cb3687 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ The CLI currently supports the following commands (and sub-commands): - `packages`: List packages for a repository. (Aliases `repos list`) - `repos`: List repositories for a namespace (owner). - `login`|`token`: Retrieve your API authentication token/key via login. +- `logout`: Clear stored authentication credentials and SSO tokens (Keyring, API key from credential file and emit warning when `$CLOUDSMITH_API_KEY` is still set). - `metrics`: Metrics and statistics for a repository. - `tokens`: Retrieve bandwidth usage for entitlement tokens. - `packages`: Retrieve package usage for repository. diff --git a/cloudsmith_cli/cli/commands/download.py b/cloudsmith_cli/cli/commands/download.py index 0d8d31e6..28b4d8d9 100644 --- a/cloudsmith_cli/cli/commands/download.py +++ b/cloudsmith_cli/cli/commands/download.py @@ -288,7 +288,7 @@ def download( # noqa: C901 for idx, file_info in enumerate(files_to_download, 1): filename = file_info["filename"] file_url = file_info["cdn_url"] - output_path = os.path.join(output_dir, filename) + output_path = _safe_join(output_dir, filename) primary_marker = " (primary)" if file_info.get("is_primary") else "" tag = file_info.get("tag", "file") @@ -408,7 +408,7 @@ def download( # noqa: C901 if not outfile: # Extract filename from URL or use package name + format if package.get("filename"): - outfile = package["filename"] + outfile = os.path.basename(package["filename"]) else: # Fallback to package name with extension based on format pkg_format = package.get("format", "bin") @@ -552,7 +552,11 @@ def _download_all_packages( # noqa: C901 ) if all_files: - # Download all sub-files for this package + # Download all sub-files for this package into a per-package subdir + pkg_subdir = os.path.join(output_dir, f"{pkg_name}-{pkg_version}") + if not os.path.exists(pkg_subdir): + os.makedirs(pkg_subdir) + context_msg = f"Failed to get details for {pkg_name}!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): detail = get_package_detail( @@ -563,7 +567,7 @@ def _download_all_packages( # noqa: C901 for file_info in sub_files: filename = file_info["filename"] file_url = file_info["cdn_url"] - file_path = os.path.join(output_dir, filename) + file_path = _safe_join(pkg_subdir, filename) tag = file_info.get("tag", "file") if not use_stderr: @@ -612,7 +616,7 @@ def _download_all_packages( # noqa: C901 # Download the primary package file download_url = pkg.get("cdn_url") or pkg.get("download_url") filename = pkg_filename or f"{pkg_name}-{pkg_version}" - file_path = os.path.join(output_dir, filename) + file_path = _safe_join(output_dir, filename) if not download_url: # Fall back to detailed package info @@ -725,6 +729,23 @@ def _download_all_packages( # noqa: C901 ) +def _safe_join(base_dir, filename): + """Safely join base_dir and filename, preventing path traversal.""" + # Strip path separators and use only the basename + safe_name = os.path.basename(filename) + if not safe_name: + raise click.ClickException( + f"Invalid filename '{filename}' — cannot be empty after sanitization." + ) + result = os.path.join(base_dir, safe_name) + # Final check: resolved path must be under base_dir + if not os.path.realpath(result).startswith(os.path.realpath(base_dir) + os.sep): + raise click.ClickException( + f"Filename '{filename}' resolves outside the target directory." + ) + return result + + def _get_extension_for_format(pkg_format: str) -> str: """Get appropriate file extension for package format.""" format_extensions = { From 4c38f2d6742090a13b56519f96521c525cd07523 Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Fri, 13 Mar 2026 11:51:05 +0000 Subject: [PATCH 3/6] refactor: restructure download command into 4-step architecture, fix SKIPPED status, and extract shared helpers --- CHANGELOG.md | 16 +- cloudsmith_cli/cli/commands/download.py | 747 +++++++----------- .../cli/tests/commands/test_download.py | 4 +- 3 files changed, 295 insertions(+), 472 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a032a88..f3b592a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] -## [1.14.0] - 2026-03-09 +## [1.15.0] - 2026-03-13 ### Added @@ -19,6 +19,20 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Added `--download-all` flag to `download` command to download all matching packages instead of erroring on multiple matches - Multiple packages table now includes a Filename column for easier disambiguation +## [1.14.0] - 2026-03-11 + +### Added + +- Added `vulnerabilities` command to retrieve security scan results for a package + - Summary View (Default): Displays a high-level count of vulnerabilities broken down by severity (Critical, High, Medium, Low, Unknown). + - Assessment View `--show-assessment` (`-A`): Provides a detailed breakdown where vulnerabilities are: + - Grouped by the specific affected upstream package / dependency. + - Sorted by severity (Critical first). + - Richly formatted tables. + - Filtering Capabilities: + - By Severity: `--severity` Show only specific levels (e.g., just Critical and High). + - By Status: `--fixable | --non-fixable` Filter to show only "Fixable" vulnerabilities (where a patch exists) or "Non-Fixable" ones. + - Supports `--output-format json | pretty_json` for programmatic usage ## [1.13.0] - 2026-02-16 diff --git a/cloudsmith_cli/cli/commands/download.py b/cloudsmith_cli/cli/commands/download.py index 28b4d8d9..f50c6ff7 100644 --- a/cloudsmith_cli/cli/commands/download.py +++ b/cloudsmith_cli/cli/commands/download.py @@ -1,5 +1,7 @@ """CLI/Commands - Download packages.""" +# Copyright 2025 Cloudsmith Ltd + import os import click @@ -85,7 +87,7 @@ help="Automatically select the best match when multiple packages are found.", ) @click.pass_context -def download( # noqa: C901 +def download( ctx, opts, owner_repo, @@ -156,8 +158,6 @@ def download( # noqa: C901 you specify a custom directory with --outfile. """ owner, repo = owner_repo - - # Use stderr for messages if output is JSON use_stderr = utils.should_use_stderr(opts) if not use_stderr: @@ -166,13 +166,12 @@ def download( # noqa: C901 f"{click.style(owner, bold=True)}/{click.style(repo, bold=True)} ...", ) - # Resolve authentication + # Step 1: Authenticate session, auth_headers, auth_source = resolve_auth(opts) - if opts.debug: click.echo(f"Using authentication: {auth_source}", err=True) - # Common filter kwargs shared by resolve_package and resolve_all_packages + # Step 2: Find package(s) filter_kwargs = dict( owner=owner, repo=repo, @@ -184,511 +183,308 @@ def download( # noqa: C901 tag_filter=tag_filter, filename_filter=filename_filter, ) + packages = _find_packages(ctx, opts, filter_kwargs, download_all, yes, use_stderr) + + # Step 3: Resolve download items (url + output path for each file) + download_items = _resolve_download_items( + ctx, opts, packages, owner, repo, all_files, outfile, use_stderr + ) + + # Step 4: Dry-run or download + if dry_run: + _display_dry_run(packages, download_items, all_files) + return + + _perform_downloads( + ctx, + opts, + packages, + download_items, + session, + auth_headers, + overwrite, + all_files, + use_stderr, + ) - # --download-all: resolve all matching packages and download each one + +# --------------------------------------------------------------------------- +# Step 2: Find packages +# --------------------------------------------------------------------------- + + +def _find_packages(ctx, opts, filter_kwargs, download_all, yes, use_stderr): + """Find matching packages using the API.""" if download_all: context_msg = "Failed to find packages!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): packages = resolve_all_packages(**filter_kwargs) - - if not use_stderr: - click.secho("OK", fg="green") - - _download_all_packages( - ctx=ctx, - opts=opts, - packages=packages, - owner=owner, - repo=repo, - session=session, - auth_headers=auth_headers, - outfile=outfile, - overwrite=overwrite, - all_files=all_files, - dry_run=dry_run, - use_stderr=use_stderr, - ) - return - - # Single-package mode (default) - context_msg = "Failed to find package!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): - package = resolve_package(**filter_kwargs, yes=yes) + else: + context_msg = "Failed to find package!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + packages = [resolve_package(**filter_kwargs, yes=yes)] if not use_stderr: click.secho("OK", fg="green") - # Get detailed package info if we need more fields for download URL or all files - package_detail = None + return packages - if all_files: - # For --all-files, we always need the detailed package info to get the files array - if not use_stderr: - click.echo("Getting package details ...", nl=False) - context_msg = "Failed to get package details!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - with maybe_spinner(opts): - package_detail = get_package_detail( - owner=owner, repo=repo, identifier=package["slug"] - ) - if not use_stderr: - click.secho("OK", fg="green") - # Get all downloadable files - files_to_download = get_package_files(package_detail) +# --------------------------------------------------------------------------- +# Step 3: Resolve download items +# --------------------------------------------------------------------------- - if not files_to_download: - raise click.ClickException("No downloadable files found for this package.") - # Create output directory for all files - if outfile: - # If user specified an outfile, use it as the directory - output_dir = os.path.abspath(outfile) - else: - # Create directory named: {package-name}-{version} - pkg_name = package_detail.get("name", name) - pkg_version = package_detail.get("version", "unknown") - output_dir = os.path.abspath(f"{pkg_name}-{pkg_version}") +def _resolve_download_items( + ctx, opts, packages, owner, repo, all_files, outfile, use_stderr +): + """ + Resolve each package into a list of download items. - # Create directory if it doesn't exist - if not os.path.exists(output_dir): - os.makedirs(output_dir) - elif not os.path.isdir(output_dir): - raise click.ClickException( - f"Output path '{output_dir}' exists but is not a directory." - ) + Returns a list of dicts, each with keys: + filename, url, output_path, tag, is_primary, size, package_name, + package_version, status + """ + items = [] - if dry_run: - click.echo() - click.echo("Dry run - would download:") - click.echo(f" Package: {package.get('name')} v{package.get('version')}") - click.echo(f" Format: {package.get('format')}") - click.echo(f" Files: {len(files_to_download)}") - click.echo(f" To directory: {output_dir}") - click.echo() - for file_info in files_to_download: - primary_marker = " (primary)" if file_info.get("is_primary") else "" - click.echo( - f" [{file_info.get('tag', 'file')}] {file_info['filename']}{primary_marker} - " - f"{_format_file_size(file_info.get('size', 0))}" + for pkg in packages: + if all_files: + items.extend( + _resolve_all_files_items( + ctx, opts, pkg, owner, repo, outfile, use_stderr + ) + ) + else: + items.append( + _resolve_single_file_item( + ctx, + opts, + pkg, + owner, + repo, + outfile, + len(packages) > 1, + use_stderr, ) - return + ) - # Download all files - if not use_stderr: - click.echo(f"\nDownloading {len(files_to_download)} files to: {output_dir}") - click.echo() + return items - success_count = 0 - failed_files = [] - downloaded_files = [] - for idx, file_info in enumerate(files_to_download, 1): - filename = file_info["filename"] - file_url = file_info["cdn_url"] - output_path = _safe_join(output_dir, filename) +def _resolve_all_files_items(ctx, opts, pkg, owner, repo, outfile, use_stderr): + """Resolve all sub-files for a single package (--all-files mode).""" + pkg_name = pkg.get("name", "unknown") + pkg_version = pkg.get("version", "unknown") - primary_marker = " (primary)" if file_info.get("is_primary") else "" - tag = file_info.get("tag", "file") + if not use_stderr: + click.echo("Getting package details ...", nl=False) - if not use_stderr: - click.echo( - f"[{idx}/{len(files_to_download)}] [{tag}] {filename}{primary_marker} ...", - nl=False, - ) - else: - click.echo( - f"[{idx}/{len(files_to_download)}] [{tag}] {filename}{primary_marker} ...", - nl=False, - err=True, - ) + context_msg = f"Failed to get details for {pkg_name}!" + with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): + with maybe_spinner(opts): + detail = get_package_detail(owner=owner, repo=repo, identifier=pkg["slug"]) - try: - context_msg = f"Failed to download {filename}!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=file_url, - outfile=output_path, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=True, # Suppress per-file progress bars for cleaner output - ) - if not use_stderr: - click.secho(" OK", fg="green") - else: - click.echo(" OK", err=True) - success_count += 1 - downloaded_files.append( - { - "filename": filename, - "path": output_path, - "tag": tag, - "is_primary": file_info.get("is_primary", False), - "size": file_info.get("size", 0), - "status": "OK", - } - ) - except Exception as e: # pylint: disable=broad-except - if not use_stderr: - click.secho(" FAILED", fg="red") - else: - click.echo(" FAILED", err=True) - failed_files.append((filename, str(e))) - downloaded_files.append( - { - "filename": filename, - "path": output_path, - "tag": tag, - "is_primary": file_info.get("is_primary", False), - "size": file_info.get("size", 0), - "status": "FAILED", - "error": str(e), - } - ) + if not use_stderr: + click.secho("OK", fg="green") - # Build JSON output for all-files download - json_output = { - "package": { - "name": package.get("name"), - "version": package.get("version"), - "format": package.get("format"), - "slug": package.get("slug"), - }, - "output_directory": output_dir, - "files": downloaded_files, - "summary": { - "total": len(files_to_download), - "success": success_count, - "failed": len(failed_files), - }, - } - - if utils.maybe_print_as_json(opts, json_output): - return + sub_files = get_package_files(detail) + if not sub_files: + raise click.ClickException("No downloadable files found for this package.") - click.echo() - if success_count == len(files_to_download): - click.secho( - f"All {success_count} files downloaded successfully!", - fg="green", - ) - else: - click.secho( - f"Downloaded {success_count}/{len(files_to_download)} files.", - fg="yellow", - ) - if failed_files: - click.echo("\nFailed files:") - for filename, error in failed_files: - click.echo(f" - {filename}: {error}") + # Determine output directory + if outfile: + output_dir = os.path.abspath(outfile) + else: + output_dir = os.path.abspath(f"{pkg_name}-{pkg_version}") - return + if not os.path.exists(output_dir): + os.makedirs(output_dir) + elif not os.path.isdir(output_dir): + raise click.ClickException( + f"Output path '{output_dir}' exists but is not a directory." + ) + + items = [] + for f in sub_files: + items.append( + { + "filename": f["filename"], + "url": f["cdn_url"], + "output_path": _safe_join(output_dir, f["filename"]), + "tag": f.get("tag", "file"), + "is_primary": f.get("is_primary", False), + "size": f.get("size", 0), + "package_name": pkg_name, + "package_version": pkg_version, + "status": None, + } + ) + return items + + +def _resolve_single_file_item( + ctx, opts, pkg, owner, repo, outfile, multi_package, use_stderr +): + """Resolve a single primary file for a package.""" + pkg_name = pkg.get("name", "unknown") + pkg_version = pkg.get("version", "unknown") - # Single file download (original behavior) - download_url = get_download_url(package) + download_url = get_download_url(pkg) if not download_url: - # Try getting detailed package info + # Fall back to detailed package info if not use_stderr: click.echo("Getting package details ...", nl=False) context_msg = "Failed to get package details!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): with maybe_spinner(opts): - package_detail = get_package_detail( - owner=owner, repo=repo, identifier=package["slug"] + detail = get_package_detail( + owner=owner, repo=repo, identifier=pkg["slug"] ) - download_url = get_download_url(package_detail or package) + download_url = get_download_url(detail or pkg) if not use_stderr: click.secho("OK", fg="green") - # Determine output filename - if not outfile: - # Extract filename from URL or use package name + format - if package.get("filename"): - outfile = os.path.basename(package["filename"]) - else: - # Fallback to package name with extension based on format - pkg_format = package.get("format", "bin") - extension = _get_extension_for_format(pkg_format) - outfile = f"{package.get('name', name)}-{package.get('version', 'latest')}.{extension}" + # Determine output path + if outfile and not multi_package: + output_path = os.path.abspath(outfile) + elif multi_package: + output_dir = os.path.abspath(outfile) if outfile else os.path.abspath(".") + if not os.path.exists(output_dir): + os.makedirs(output_dir) + filename = pkg.get("filename") or f"{pkg_name}-{pkg_version}" + output_path = _safe_join(output_dir, filename) + elif pkg.get("filename"): + output_path = os.path.abspath(os.path.basename(pkg["filename"])) + else: + pkg_format = pkg.get("format", "bin") + extension = _get_extension_for_format(pkg_format) + output_path = os.path.abspath(f"{pkg_name}-{pkg_version}.{extension}") + + return { + "filename": os.path.basename(output_path), + "url": download_url, + "output_path": output_path, + "tag": "file", + "is_primary": True, + "size": pkg.get("size", 0), + "package_name": pkg_name, + "package_version": pkg_version, + "status": None, + } - # Ensure outfile is not a directory - outfile = os.path.abspath(outfile) - if dry_run: - click.echo() - click.echo("Dry run - would download:") - click.echo(f" Package: {package.get('name')} v{package.get('version')}") - click.echo(f" Format: {package.get('format')}") - click.echo(f" Size: {_format_package_size(package)}") - click.echo(f" From: {download_url}") - click.echo(f" To: {outfile}") - click.echo(f" Overwrite: {'Yes' if overwrite else 'No'}") - return +# --------------------------------------------------------------------------- +# Step 4a: Dry-run display (shared by all paths) +# --------------------------------------------------------------------------- - # Perform the download - context_msg = "Failed to download package!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=download_url, - outfile=outfile, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=utils.should_use_stderr(opts), - ) - # Build JSON output for single-file download - json_output = { - "package": { - "name": package.get("name"), - "version": package.get("version"), - "format": package.get("format"), - "slug": package.get("slug"), - }, - "output_directory": os.path.dirname(outfile), - "files": [ - { - "filename": os.path.basename(outfile), - "path": outfile, - "tag": "file", - "is_primary": True, - "size": package.get("size", 0), - "status": "OK", - } - ], - "summary": { - "total": 1, - "success": 1, - "failed": 0, - }, - } +def _display_dry_run(packages, download_items, all_files): + """Display what would be downloaded without actually downloading.""" + click.echo() + click.echo( + f"Dry run - would download {len(download_items)} file(s) " + f"from {len(packages)} package(s):" + ) + click.echo() - if utils.maybe_print_as_json(opts, json_output): - return + for item in download_items: + primary_marker = " (primary)" if item.get("is_primary") else "" + size = _format_file_size(item.get("size", 0)) + tag = item.get("tag", "file") + click.echo(f" [{tag}] {item['filename']}{primary_marker} - {size}") + click.echo(f" Package: {item['package_name']} v{item['package_version']}") + click.echo(f" To: {item['output_path']}") - click.echo() - click.secho("Download completed successfully!", fg="green") + +# --------------------------------------------------------------------------- +# Step 4b: Perform downloads +# --------------------------------------------------------------------------- -def _download_all_packages( # noqa: C901 - *, +def _perform_downloads( ctx, opts, packages, - owner, - repo, + download_items, session, auth_headers, - outfile, overwrite, all_files, - dry_run, use_stderr, ): - """Download all matching packages into a directory.""" - # Determine output directory - if outfile: - output_dir = os.path.abspath(outfile) - else: - # Use current directory - output_dir = os.path.abspath(".") - - if dry_run: - click.echo() - click.echo(f"Dry run - would download {len(packages)} package(s):") - click.echo(f" To directory: {output_dir}") - click.echo() - for i, pkg in enumerate(packages, 1): - filename = pkg.get("filename", "unknown") - size = _format_package_size(pkg) - click.echo( - f" {i}. {pkg.get('name')} v{pkg.get('version')} " - f"({pkg.get('format')}) - {filename} [{size}]" - ) - if all_files: - # Show sub-files if --all-files - context_msg = "Failed to get package details!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - detail = get_package_detail( - owner=owner, repo=repo, identifier=pkg["slug"] - ) - sub_files = get_package_files(detail) - for f in sub_files: - primary = " (primary)" if f.get("is_primary") else "" - click.echo( - f" [{f.get('tag', 'file')}] {f['filename']}{primary}" - ) - return - - # Create output directory if needed - if not os.path.exists(output_dir): - os.makedirs(output_dir) - elif not os.path.isdir(output_dir): - raise click.ClickException( - f"Output path '{output_dir}' exists but is not a directory." - ) - + """Download all resolved items and report results.""" + total = len(download_items) if not use_stderr: - click.echo(f"\nDownloading {len(packages)} package(s) to: {output_dir}") + click.echo(f"\nDownloading {total} file(s):") click.echo() - all_downloaded_files = [] - total_success = 0 - total_failed = 0 + results = [] - for pkg_idx, pkg in enumerate(packages, 1): - pkg_name = pkg.get("name", "unknown") - pkg_version = pkg.get("version", "unknown") - pkg_filename = pkg.get("filename", "") + for idx, item in enumerate(download_items, 1): + filename = item["filename"] + url = item["url"] + output_path = item["output_path"] + tag = item.get("tag", "file") + primary_marker = " (primary)" if item.get("is_primary") else "" - if not use_stderr: - click.echo( - f"[{pkg_idx}/{len(packages)}] {pkg_name} v{pkg_version} " - f"({pkg_filename})" + # Handle missing download URL as a skip, not a failure + if not url: + _echo_progress( + use_stderr, f"[{idx}/{total}] [{tag}] {filename}{primary_marker} ..." ) + _echo_status(use_stderr, " SKIPPED", fg="yellow") + results.append({**item, "status": "SKIPPED", "error": "No download URL"}) + continue - if all_files: - # Download all sub-files for this package into a per-package subdir - pkg_subdir = os.path.join(output_dir, f"{pkg_name}-{pkg_version}") - if not os.path.exists(pkg_subdir): - os.makedirs(pkg_subdir) + _echo_progress( + use_stderr, f"[{idx}/{total}] [{tag}] {filename}{primary_marker} ..." + ) - context_msg = f"Failed to get details for {pkg_name}!" + try: + context_msg = f"Failed to download {filename}!" with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - detail = get_package_detail( - owner=owner, repo=repo, identifier=pkg["slug"] - ) - sub_files = get_package_files(detail) - - for file_info in sub_files: - filename = file_info["filename"] - file_url = file_info["cdn_url"] - file_path = _safe_join(pkg_subdir, filename) - tag = file_info.get("tag", "file") - - if not use_stderr: - click.echo(f" [{tag}] {filename} ...", nl=False) - - try: - context_msg = f"Failed to download {filename}!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=file_url, - outfile=file_path, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=True, - ) - if not use_stderr: - click.secho(" OK", fg="green") - total_success += 1 - all_downloaded_files.append( - { - "filename": filename, - "path": file_path, - "package": pkg_name, - "version": pkg_version, - "tag": tag, - "status": "OK", - } - ) - except Exception as e: # pylint: disable=broad-except - if not use_stderr: - click.secho(" FAILED", fg="red") - total_failed += 1 - all_downloaded_files.append( - { - "filename": filename, - "path": file_path, - "package": pkg_name, - "version": pkg_version, - "tag": tag, - "status": "FAILED", - "error": str(e), - } - ) - else: - # Download the primary package file - download_url = pkg.get("cdn_url") or pkg.get("download_url") - filename = pkg_filename or f"{pkg_name}-{pkg_version}" - file_path = _safe_join(output_dir, filename) - - if not download_url: - # Fall back to detailed package info - context_msg = f"Failed to get details for {pkg_name}!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - detail = get_package_detail( - owner=owner, repo=repo, identifier=pkg["slug"] - ) - download_url = ( - detail.get("cdn_url") - or detail.get("download_url") - or detail.get("file_url") - ) - - if not download_url: - if not use_stderr: - click.secho(" No download URL available - SKIPPED", fg="yellow") - total_failed += 1 - all_downloaded_files.append( - { - "filename": filename, - "path": file_path, - "package": pkg_name, - "version": pkg_version, - "status": "FAILED", - "error": "No download URL", - } - ) - continue - - if not use_stderr: - click.echo(f" Downloading {filename} ...", nl=False) - - try: - context_msg = f"Failed to download {filename}!" - with handle_api_exceptions(ctx, opts=opts, context_msg=context_msg): - stream_download( - url=download_url, - outfile=file_path, - session=session, - headers=auth_headers, - overwrite=overwrite, - quiet=True, - ) - if not use_stderr: - click.secho(" OK", fg="green") - total_success += 1 - all_downloaded_files.append( - { - "filename": filename, - "path": file_path, - "package": pkg_name, - "version": pkg_version, - "status": "OK", - } - ) - except Exception as e: # pylint: disable=broad-except - if not use_stderr: - click.secho(" FAILED", fg="red") - total_failed += 1 - all_downloaded_files.append( - { - "filename": filename, - "path": file_path, - "package": pkg_name, - "version": pkg_version, - "status": "FAILED", - "error": str(e), - } + stream_download( + url=url, + outfile=output_path, + session=session, + headers=auth_headers, + overwrite=overwrite, + quiet=True, ) + _echo_status(use_stderr, " OK", fg="green") + results.append({**item, "status": "OK"}) + except Exception as e: # pylint: disable=broad-except + _echo_status(use_stderr, " FAILED", fg="red") + results.append({**item, "status": "FAILED", "error": str(e)}) + + # Report results + _report_results(opts, packages, results, all_files) + + +def _echo_progress(use_stderr, message): + """Print progress message to stdout or stderr.""" + click.echo(message, nl=False, err=use_stderr) + + +def _echo_status(use_stderr, message, fg=None): + """Print styled status message to stdout or stderr.""" + if fg and not use_stderr: + click.secho(message, fg=fg) + elif use_stderr: + click.echo(message, err=True) + else: + click.echo(message) + + +def _report_results(opts, packages, results, all_files): + """Build JSON output and print summary.""" + success = [r for r in results if r["status"] == "OK"] + failed = [r for r in results if r["status"] == "FAILED"] + skipped = [r for r in results if r["status"] == "SKIPPED"] - # Build JSON output json_output = { "packages": [ { @@ -700,13 +496,26 @@ def _download_all_packages( # noqa: C901 } for p in packages ], - "output_directory": output_dir, - "files": all_downloaded_files, + "files": [ + { + "filename": r["filename"], + "path": r["output_path"], + "package": r["package_name"], + "version": r["package_version"], + "tag": r.get("tag", "file"), + "is_primary": r.get("is_primary", False), + "size": r.get("size", 0), + "status": r["status"], + **({"error": r["error"]} if "error" in r else {}), + } + for r in results + ], "summary": { "total_packages": len(packages), - "total_files": total_success + total_failed, - "success": total_success, - "failed": total_failed, + "total_files": len(results), + "success": len(success), + "failed": len(failed), + "skipped": len(skipped), }, } @@ -714,31 +523,33 @@ def _download_all_packages( # noqa: C901 return click.echo() - total = total_success + total_failed - if total_failed == 0: - click.secho( - f"All {total_success} file(s) from {len(packages)} package(s) " - f"downloaded successfully!", - fg="green", - ) + if not failed and not skipped: + click.secho(f"All {len(success)} file(s) downloaded successfully!", fg="green") else: - click.secho( - f"Downloaded {total_success}/{total} file(s) from " - f"{len(packages)} package(s).", - fg="yellow", - ) + click.secho(f"Downloaded {len(success)}/{len(results)} file(s).", fg="yellow") + if failed: + click.echo("\nFailed files:") + for r in failed: + click.echo(f" - {r['filename']}: {r.get('error', 'Unknown error')}") + if skipped: + click.echo("\nSkipped files (no download URL):") + for r in skipped: + click.echo(f" - {r['filename']}") + + +# --------------------------------------------------------------------------- +# Utilities +# --------------------------------------------------------------------------- def _safe_join(base_dir, filename): """Safely join base_dir and filename, preventing path traversal.""" - # Strip path separators and use only the basename safe_name = os.path.basename(filename) if not safe_name: raise click.ClickException( f"Invalid filename '{filename}' — cannot be empty after sanitization." ) result = os.path.join(base_dir, safe_name) - # Final check: resolved path must be under base_dir if not os.path.realpath(result).startswith(os.path.realpath(base_dir) + os.sep): raise click.ClickException( f"Filename '{filename}' resolves outside the target directory." @@ -746,7 +557,7 @@ def _safe_join(base_dir, filename): return result -def _get_extension_for_format(pkg_format: str) -> str: +def _get_extension_for_format(pkg_format): """Get appropriate file extension for package format.""" format_extensions = { "deb": "deb", @@ -765,20 +576,18 @@ def _get_extension_for_format(pkg_format: str) -> str: return format_extensions.get(pkg_format.lower(), "bin") -def _format_package_size(package: dict) -> str: +def _format_package_size(package): """Format package size for display.""" size = package.get("size", 0) return _format_file_size(size) -def _format_file_size(size: int) -> str: +def _format_file_size(size): """Format file size in bytes to human-readable format.""" if size == 0: return "Unknown" - for unit in ["B", "KB", "MB", "GB", "TB"]: if size < 1024.0: return f"{size:.1f} {unit}" size /= 1024.0 - return f"{size:.1f} PB" diff --git a/cloudsmith_cli/cli/tests/commands/test_download.py b/cloudsmith_cli/cli/tests/commands/test_download.py index 1dce948f..d4429e2c 100644 --- a/cloudsmith_cli/cli/tests/commands/test_download.py +++ b/cloudsmith_cli/cli/tests/commands/test_download.py @@ -112,7 +112,7 @@ def test_download_basic_success( ) self.assertEqual(result.exit_code, 0) - self.assertIn("Download completed successfully!", result.output) + self.assertIn("downloaded successfully", result.output) # Verify calls mock_resolve_pkg.assert_called_once() @@ -149,7 +149,7 @@ def test_download_dry_run(self, mock_get_url, mock_resolve_pkg, mock_resolve_aut ) self.assertEqual(result.exit_code, 0) - self.assertIn("Dry run - would download:", result.output) + self.assertIn("Dry run - would download", result.output) self.assertIn("test-package v1.0.0", result.output) @patch("cloudsmith_cli.cli.commands.download.resolve_auth") From 3154052c91f4aefd6c79779f06fee9c155b738f3 Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Wed, 18 Mar 2026 13:35:11 +0000 Subject: [PATCH 4/6] replace the the pretty_print_table call with rich.table.Table --- cloudsmith_cli/cli/commands/download.py | 77 +++++++++++++++++-------- cloudsmith_cli/core/download.py | 31 +++++----- 2 files changed, 67 insertions(+), 41 deletions(-) diff --git a/cloudsmith_cli/cli/commands/download.py b/cloudsmith_cli/cli/commands/download.py index f50c6ff7..54d0126a 100644 --- a/cloudsmith_cli/cli/commands/download.py +++ b/cloudsmith_cli/cli/commands/download.py @@ -213,7 +213,14 @@ def download( # --------------------------------------------------------------------------- -def _find_packages(ctx, opts, filter_kwargs, download_all, yes, use_stderr): +def _find_packages( + ctx: click.Context, + opts, + filter_kwargs: dict, + download_all: bool, + yes: bool, + use_stderr: bool, +) -> list: """Find matching packages using the API.""" if download_all: context_msg = "Failed to find packages!" @@ -238,8 +245,15 @@ def _find_packages(ctx, opts, filter_kwargs, download_all, yes, use_stderr): def _resolve_download_items( - ctx, opts, packages, owner, repo, all_files, outfile, use_stderr -): + ctx: click.Context, + opts, + packages: list, + owner: str, + repo: str, + all_files: bool, + outfile: str, + use_stderr: bool, +) -> list: """ Resolve each package into a list of download items. @@ -273,7 +287,15 @@ def _resolve_download_items( return items -def _resolve_all_files_items(ctx, opts, pkg, owner, repo, outfile, use_stderr): +def _resolve_all_files_items( + ctx: click.Context, + opts, + pkg: dict, + owner: str, + repo: str, + outfile: str, + use_stderr: bool, +) -> list: """Resolve all sub-files for a single package (--all-files mode).""" pkg_name = pkg.get("name", "unknown") pkg_version = pkg.get("version", "unknown") @@ -325,8 +347,15 @@ def _resolve_all_files_items(ctx, opts, pkg, owner, repo, outfile, use_stderr): def _resolve_single_file_item( - ctx, opts, pkg, owner, repo, outfile, multi_package, use_stderr -): + ctx: click.Context, + opts, + pkg: dict, + owner: str, + repo: str, + outfile: str, + multi_package: bool, + use_stderr: bool, +) -> dict: """Resolve a single primary file for a package.""" pkg_name = pkg.get("name", "unknown") pkg_version = pkg.get("version", "unknown") @@ -381,7 +410,7 @@ def _resolve_single_file_item( # --------------------------------------------------------------------------- -def _display_dry_run(packages, download_items, all_files): +def _display_dry_run(packages: list, download_items: list, all_files: bool) -> None: """Display what would be downloaded without actually downloading.""" click.echo() click.echo( @@ -405,16 +434,16 @@ def _display_dry_run(packages, download_items, all_files): def _perform_downloads( - ctx, + ctx: click.Context, opts, - packages, - download_items, + packages: list, + download_items: list, session, - auth_headers, - overwrite, - all_files, - use_stderr, -): + auth_headers: dict, + overwrite: bool, + all_files: bool, + use_stderr: bool, +) -> None: """Download all resolved items and report results.""" total = len(download_items) if not use_stderr: @@ -464,12 +493,12 @@ def _perform_downloads( _report_results(opts, packages, results, all_files) -def _echo_progress(use_stderr, message): +def _echo_progress(use_stderr: bool, message: str) -> None: """Print progress message to stdout or stderr.""" click.echo(message, nl=False, err=use_stderr) -def _echo_status(use_stderr, message, fg=None): +def _echo_status(use_stderr: bool, message: str, fg: str = None) -> None: """Print styled status message to stdout or stderr.""" if fg and not use_stderr: click.secho(message, fg=fg) @@ -479,7 +508,7 @@ def _echo_status(use_stderr, message, fg=None): click.echo(message) -def _report_results(opts, packages, results, all_files): +def _report_results(opts, packages: list, results: list, all_files: bool) -> None: """Build JSON output and print summary.""" success = [r for r in results if r["status"] == "OK"] failed = [r for r in results if r["status"] == "FAILED"] @@ -542,13 +571,11 @@ def _report_results(opts, packages, results, all_files): # --------------------------------------------------------------------------- -def _safe_join(base_dir, filename): +def _safe_join(base_dir: str, filename: str) -> str: """Safely join base_dir and filename, preventing path traversal.""" safe_name = os.path.basename(filename) if not safe_name: - raise click.ClickException( - f"Invalid filename '{filename}' — cannot be empty after sanitization." - ) + raise click.ClickException(f"Invalid filename '{filename}'.") result = os.path.join(base_dir, safe_name) if not os.path.realpath(result).startswith(os.path.realpath(base_dir) + os.sep): raise click.ClickException( @@ -557,7 +584,7 @@ def _safe_join(base_dir, filename): return result -def _get_extension_for_format(pkg_format): +def _get_extension_for_format(pkg_format: str) -> str: """Get appropriate file extension for package format.""" format_extensions = { "deb": "deb", @@ -576,13 +603,13 @@ def _get_extension_for_format(pkg_format): return format_extensions.get(pkg_format.lower(), "bin") -def _format_package_size(package): +def _format_package_size(package: dict) -> str: """Format package size for display.""" size = package.get("size", 0) return _format_file_size(size) -def _format_file_size(size): +def _format_file_size(size: int) -> str: """Format file size in bytes to human-readable format.""" if size == 0: return "Unknown" diff --git a/cloudsmith_cli/core/download.py b/cloudsmith_cli/core/download.py index 6243521d..3d361474 100644 --- a/cloudsmith_cli/core/download.py +++ b/cloudsmith_cli/core/download.py @@ -295,29 +295,28 @@ def resolve_package( def _display_multiple_packages(packages: List[Dict]) -> None: """Display a table of multiple matching packages.""" + from rich.console import Console + from rich.table import Table + click.echo("Multiple packages found:") click.echo() - headers = ["#", "Name", "Version", "Format", "Filename", "Size", "Created"] - rows = [] + table = Table(title=None, show_lines=False) + for header in ["#", "Name", "Version", "Format", "Filename", "Size", "Created"]: + table.add_column(header) for i, pkg in enumerate(packages, 1): - rows.append( - [ - str(i), - click.style(pkg.get("name", ""), fg="cyan"), - click.style(pkg.get("version", ""), fg="yellow"), - click.style(pkg.get("format", ""), fg="blue"), - click.style(pkg.get("filename", ""), fg="magenta"), - click.style(_format_size(pkg.get("size", 0)), fg="green"), - click.style(_format_date(pkg.get("uploaded_at", "")), fg="white"), - ] + table.add_row( + str(i), + pkg.get("name", ""), + pkg.get("version", ""), + pkg.get("format", ""), + pkg.get("filename", ""), + _format_size(pkg.get("size", 0)), + _format_date(pkg.get("uploaded_at", "")), ) - # Import here to avoid circular imports - from ..cli.utils import pretty_print_table - - pretty_print_table(headers, rows) + Console().print(table) click.echo() From 2603bc149ee18786f36e768560ca3295fe88df24 Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Wed, 18 Mar 2026 13:53:37 +0000 Subject: [PATCH 5/6] replace the the pretty_print_table call with rich.table.Table in testcases --- cloudsmith_cli/core/download.py | 5 ++--- cloudsmith_cli/core/tests/test_download.py | 21 +++++++++++++-------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/cloudsmith_cli/core/download.py b/cloudsmith_cli/core/download.py index 3d361474..3cf58c55 100644 --- a/cloudsmith_cli/core/download.py +++ b/cloudsmith_cli/core/download.py @@ -8,6 +8,8 @@ import click import cloudsmith_api import requests +from rich.console import Console +from rich.table import Table from . import keyring, ratelimits, utils from .api.exceptions import catch_raise_api_exception @@ -295,9 +297,6 @@ def resolve_package( def _display_multiple_packages(packages: List[Dict]) -> None: """Display a table of multiple matching packages.""" - from rich.console import Console - from rich.table import Table - click.echo("Multiple packages found:") click.echo() diff --git a/cloudsmith_cli/core/tests/test_download.py b/cloudsmith_cli/core/tests/test_download.py index 9cf3df26..a76c345e 100644 --- a/cloudsmith_cli/core/tests/test_download.py +++ b/cloudsmith_cli/core/tests/test_download.py @@ -107,10 +107,10 @@ def test_resolve_package_multiple_matches_yes( mock_select_best.assert_called_once_with(mock_packages) @patch("cloudsmith_cli.core.download.list_packages") - @patch("cloudsmith_cli.cli.utils.pretty_print_table") + @patch("cloudsmith_cli.core.download.Console") @patch("click.echo") def test_resolve_package_multiple_matches_no_yes( - self, mock_echo, mock_pretty_print, mock_list_packages + self, mock_echo, mock_console_cls, mock_list_packages ): """Test package resolution with multiple matches without --yes.""" mock_packages = [ @@ -127,7 +127,7 @@ def test_resolve_package_multiple_matches_no_yes( download.resolve_package("owner", "repo", "test-package", yes=False) self.assertEqual(cm.exception.exit_code, 3) - mock_pretty_print.assert_called_once() + mock_console_cls().print.assert_called_once() @patch("cloudsmith_cli.core.download.list_packages") def test_resolve_package_with_filters(self, mock_list_packages): @@ -677,9 +677,12 @@ def test_resolve_package_error_mentions_download_all(self, mock_list_packages): class TestDisplayMultiplePackages(unittest.TestCase): """Test _display_multiple_packages function.""" - @patch("cloudsmith_cli.cli.utils.pretty_print_table") + @patch("cloudsmith_cli.core.download.Console") + @patch("cloudsmith_cli.core.download.Table") @patch("click.echo") - def test_display_includes_filename_column(self, mock_echo, mock_table): + def test_display_includes_filename_column( + self, mock_echo, mock_table_cls, mock_console_cls + ): """Test that the multiple packages table includes filename.""" packages = [ { @@ -694,9 +697,11 @@ def test_display_includes_filename_column(self, mock_echo, mock_table): download._display_multiple_packages(packages) - mock_table.assert_called_once() - headers = mock_table.call_args[0][0] - self.assertIn("Filename", headers) + # Verify that add_column was called with "Filename" + column_names = [ + call.args[0] for call in mock_table_cls().add_column.call_args_list + ] + self.assertIn("Filename", column_names) if __name__ == "__main__": From 9107ee781246540d76462e0044a6de004e2c578d Mon Sep 17 00:00:00 2001 From: Ranjan Singh Date: Wed, 18 Mar 2026 16:12:21 +0000 Subject: [PATCH 6/6] change release date --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7969836a..24b433a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] -## [1.15.0] - 2026-03-13 +## [1.15.0] - 2026-03-18 ### Added