diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 756b765e59e3..7b4c1ba67021 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4 + "modification": 6 } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index a2fdf24e9fbc..33399ef87b63 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -37,16 +37,21 @@ import java.nio.channels.WritableByteChannel; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; public class GcsUtil { @@ -433,12 +438,65 @@ public void copy(Iterable srcFilenames, Iterable destFilenames) delegate.copy(srcFilenames, destFilenames); } + /** experimental api. */ + public void copyV2(Iterable srcPaths, Iterable dstPaths) throws IOException { + copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE); + } + + /** experimental api. */ + public void copy( + Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) + throws IOException { + if (delegateV2 != null) { + delegateV2.copy(srcPaths, dstPaths, strategy); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + public void rename( Iterable srcFilenames, Iterable destFilenames, MoveOptions... moveOptions) throws IOException { delegate.rename(srcFilenames, destFilenames, moveOptions); } + /** experimental api. */ + public void renameV2( + Iterable srcPaths, Iterable dstPaths, MoveOptions... moveOptions) + throws IOException { + Set moveOptionSet = Sets.newHashSet(moveOptions); + final MissingStrategy srcMissing; + final OverwriteStrategy dstOverwrite; + + if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) { + srcMissing = MissingStrategy.SKIP_IF_MISSING; + } else { + srcMissing = MissingStrategy.FAIL_IF_MISSING; + } + + if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) { + dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS; + } else { + dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE; + } + + rename(srcPaths, dstPaths, srcMissing, dstOverwrite); + } + + /** experimental api. */ + public void rename( + Iterable srcPaths, + Iterable dstPaths, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) + throws IOException { + if (delegateV2 != null) { + delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + @VisibleForTesting @SuppressWarnings("JdkObsolete") // for LinkedList java.util.LinkedList makeRewriteOps( @@ -469,6 +527,20 @@ public void remove(Collection filenames) throws IOException { delegate.remove(filenames); } + /** experimental api. */ + public void removeV2(Iterable paths) throws IOException { + remove(paths, MissingStrategy.SKIP_IF_MISSING); + } + + /** experimental api. */ + public void remove(Iterable paths, MissingStrategy strategy) throws IOException { + if (delegateV2 != null) { + delegateV2.remove(paths, strategy); + } else { + throw new IOException("GcsUtil V2 not initialized."); + } + } + @SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION") public static class StorageObjectOrIOException { final GcsUtilV1.StorageObjectOrIOException delegate; diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java index a2df45511c95..b00b7ce0d728 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java @@ -18,19 +18,24 @@ package org.apache.beam.sdk.extensions.gcp.util; import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.gax.paging.Page; import com.google.auto.value.AutoValue; import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Bucket; import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.CopyWriter; import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobField; import com.google.cloud.storage.Storage.BlobGetOption; import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.BucketField; import com.google.cloud.storage.Storage.BucketGetOption; +import com.google.cloud.storage.Storage.CopyRequest; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageException; @@ -71,18 +76,27 @@ public GcsUtilV2 create(PipelineOptions options) { /** Maximum number of requests permitted in a GCS batch request. */ private static final int MAX_REQUESTS_PER_BATCH = 100; + /** + * Limit the number of bytes Cloud Storage will attempt to copy before responding to an individual + * request. If you see Read Timeout errors, try reducing this value. + */ + private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L; + GcsUtilV2(PipelineOptions options) { String projectId = options.as(GcpOptions.class).getProject(); storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); } @SuppressWarnings({ - "nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null. + "nullness" // For Creating AccessDeniedException FileNotFoundException, and + // FileAlreadyExistsException with null. }) private IOException translateStorageException(GcsPath gcsPath, StorageException e) { switch (e.getCode()) { case 403: return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); + case 404: + return new FileNotFoundException(e.getMessage()); case 409: return new FileAlreadyExistsException(gcsPath.toString(), null, e.getMessage()); default: @@ -259,6 +273,151 @@ public List expand(GcsPath gcsPattern) throws IOException { return results; } + public enum MissingStrategy { + FAIL_IF_MISSING, + SKIP_IF_MISSING, + } + + public void remove(Iterable paths, MissingStrategy strategy) throws IOException { + for (List pathPartition : + Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { + + // Create a new empty batch every time + StorageBatch batch = storage.batch(); + List> batchResultFutures = new ArrayList<>(); + + for (GcsPath path : pathPartition) { + batchResultFutures.add(batch.delete(path.getBucket(), path.getObject())); + } + batch.submit(); + + for (int i = 0; i < batchResultFutures.size(); i++) { + StorageBatchResult future = batchResultFutures.get(i); + try { + Boolean deleted = future.get(); + if (!deleted) { + if (strategy == MissingStrategy.FAIL_IF_MISSING) { + throw new FileNotFoundException( + String.format( + "The specified file does not exist: %s", pathPartition.get(i).toString())); + } else { + LOG.warn("Ignoring failed deletion on file {}.", pathPartition.get(i).toString()); + } + } + } catch (StorageException e) { + throw translateStorageException(pathPartition.get(i), e); + } + } + } + } + + public enum OverwriteStrategy { + FAIL_IF_EXISTS, // Fail if target exists + SKIP_IF_EXISTS, // Skip if target exists + SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic) + ALWAYS_OVERWRITE // Overwrite regardless of state + } + + private void rewriteHelper( + Iterable srcPaths, + Iterable dstPaths, + boolean deleteSrc, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) + throws IOException { + List srcList = Lists.newArrayList(srcPaths); + List dstList = Lists.newArrayList(dstPaths); + checkArgument( + srcList.size() == dstList.size(), + "Number of source files %s must equal number of destination files %s", + srcList.size(), + dstList.size()); + + for (int i = 0; i < srcList.size(); i++) { + GcsPath srcPath = srcList.get(i); + GcsPath dstPath = dstList.get(i); + BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject()); + BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject()); + + CopyRequest.Builder copyRequestBuilder = + CopyRequest.newBuilder() + .setSource(srcId) + .setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK); + + if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) { + copyRequestBuilder.setTarget(dstId); + } else { + // FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking the target blob + BlobInfo existingTarget; + try { + existingTarget = storage.get(dstId); + } catch (StorageException e) { + throw translateStorageException(dstPath, e); + } + + if (existingTarget == null) { + copyRequestBuilder.setTarget(dstId, Storage.BlobTargetOption.doesNotExist()); + } else { + switch (dstOverwrite) { + case SKIP_IF_EXISTS: + LOG.warn("Ignoring rewriting from {} to {} because target exists.", srcPath, dstPath); + continue; // Skip to next file in for-loop + + case SAFE_OVERWRITE: + copyRequestBuilder.setTarget( + dstId, Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration())); + break; + + case FAIL_IF_EXISTS: + throw new FileAlreadyExistsException( + srcPath.toString(), + dstPath.toString(), + "Target object already exists and strategy is FAIL_IF_EXISTS"); + default: + throw new IllegalStateException("Unknown OverwriteStrategy: " + dstOverwrite); + } + } + } + + try { + CopyWriter copyWriter = storage.copy(copyRequestBuilder.build()); + copyWriter.getResult(); + + if (deleteSrc) { + if (!storage.delete(srcId)) { + // This may happen if the source file is deleted by another process after copy. + LOG.warn( + "Source file {} could not be deleted after move to {}. It may not have existed.", + srcPath, + dstPath); + } + } + } catch (StorageException e) { + if (e.getCode() == 404 && srcMissing == MissingStrategy.SKIP_IF_MISSING) { + LOG.warn( + "Ignoring rewriting from {} to {} because source does not exist.", srcPath, dstPath); + continue; + } + throw translateStorageException(srcPath, e); + } + } + } + + public void copy( + Iterable srcPaths, Iterable dstPaths, OverwriteStrategy strategy) + throws IOException { + rewriteHelper(srcPaths, dstPaths, false, MissingStrategy.FAIL_IF_MISSING, strategy); + } + + public void move( + Iterable srcPaths, + Iterable dstPaths, + MissingStrategy srcMissing, + OverwriteStrategy dstOverwrite) + throws IOException { + rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite); + } + /** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */ public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException { String bucketName = path.getBucket(); diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java index db5097c95155..80ffd72924fa 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java @@ -37,7 +37,10 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestPipelineOptions; @@ -297,4 +300,291 @@ public void testCreateAndRemoveBucket() throws IOException { } } } + + private List createTestBucketHelper(String bucketName) throws IOException { + final List originPaths = + Arrays.asList( + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardii.txt"), + GcsPath.fromUri("gs://apache-beam-samples/shakespeare/kingrichardiii.txt")); + + final List testPaths = + originPaths.stream() + .map(o -> GcsPath.fromComponents(bucketName, o.getObject())) + .collect(Collectors.toList()); + + // create bucket and copy some initial files into there + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.createBucket(BucketInfo.of(bucketName)); + + gcsUtil.copyV2(originPaths, testPaths); + } else { + GcsOptions gcsOptions = options.as(GcsOptions.class); + gcsUtil.createBucket(gcsOptions.getProject(), new Bucket().setName(bucketName)); + + final List originList = + originPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List testList = + testPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + gcsUtil.copy(originList, testList); + } + + return testPaths; + } + + private void tearDownTestBucketHelper(String bucketName) { + try { + // use "**" in the pattern to match any characters including "/". + final List paths = + gcsUtil.expand(GcsPath.fromUri(String.format("gs://%s/**", bucketName))); + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.remove(paths, MissingStrategy.SKIP_IF_MISSING); + gcsUtil.removeBucket(BucketInfo.of(bucketName)); + } else { + gcsUtil.remove(paths.stream().map(GcsPath::toString).collect(Collectors.toList())); + gcsUtil.removeBucket(new Bucket().setName(bucketName)); + } + } catch (IOException e) { + System.err.println( + "Error during tear down of test bucket " + bucketName + ": " + e.getMessage()); + } + } + + @Test + public void testCopy() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; + + try { + final List srcPaths = createTestBucketHelper(existingBucket); + final List dstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) + .collect(Collectors.toList()); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + assertNotExists(dstPaths.get(0)); + assertNotExists(dstPaths.get(1)); + + if (experiment.equals("use_gcsutil_v2")) { + // (1) when the target files do not exist + gcsUtil.copyV2(srcPaths, dstPaths); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the target files exist + // (2a) no exception on SAFE_OVERWRITE, ALWAYS_OVERWRITE, SKIP_IF_EXISTS + gcsUtil.copyV2(srcPaths, dstPaths); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.ALWAYS_OVERWRITE); + gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.SKIP_IF_EXISTS); + + // (2b) raise exception on FAIL_IF_EXISTS + assertThrows( + FileAlreadyExistsException.class, + () -> gcsUtil.copy(srcPaths, dstPaths, OverwriteStrategy.FAIL_IF_EXISTS)); + + // (3) raise exception when the target bucket is nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(srcPaths, errPaths)); + + // (4) raise exception when the source files are nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copyV2(errPaths, dstPaths)); + } else { + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List dstList = + dstPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // (1) when the target files do not exist + gcsUtil.copy(srcList, dstList); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the target files exist, no exception + gcsUtil.copy(srcList, dstList); + + // (3) raise exception when the target bucket is nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(srcList, errList)); + + // (4) raise exception when the source files are nonexistent. + assertThrows(FileNotFoundException.class, () -> gcsUtil.copy(errList, dstList)); + } + } finally { + tearDownTestBucketHelper(existingBucket); + } + } + + @Test + public void testRemove() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; + + try { + final List srcPaths = createTestBucketHelper(existingBucket); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + assertExists(srcPaths.get(0)); + assertExists(srcPaths.get(1)); + + if (experiment.equals("use_gcsutil_v2")) { + // (1) when the files to remove exist + gcsUtil.removeV2(srcPaths); + assertNotExists(srcPaths.get(0)); + assertNotExists(srcPaths.get(1)); + + // (2) when the files to remove have been deleted + // (2a) no exception on SKIP_IF_MISSING + gcsUtil.removeV2(srcPaths); + gcsUtil.remove(srcPaths, MissingStrategy.SKIP_IF_MISSING); + + // (2b) raise exception on FAIL_IF_MISSING + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.remove(srcPaths, MissingStrategy.FAIL_IF_MISSING)); + + // (3) when the files are from an nonexistent bucket + // (3a) no exception on SKIP_IF_MISSING + gcsUtil.removeV2(errPaths); + gcsUtil.remove(errPaths, MissingStrategy.SKIP_IF_MISSING); + + // (3b) raise exception on FAIL_IF_MISSING + assertThrows( + FileNotFoundException.class, + () -> gcsUtil.remove(errPaths, MissingStrategy.FAIL_IF_MISSING)); + } else { + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // (1) when the files to remove exist + gcsUtil.remove(srcList); + assertNotExists(srcPaths.get(0)); + assertNotExists(srcPaths.get(1)); + + // (2) when the files to remove have been deleted, no exception + gcsUtil.remove(srcList); + + // (3) when the files are from an nonexistent bucket, no exception + gcsUtil.remove(errList); + } + } finally { + tearDownTestBucketHelper(existingBucket); + } + } + + @Test + public void testRename() throws IOException { + final String existingBucket = "apache-beam-temp-bucket-12345"; + final String nonExistentBucket = "my-random-test-bucket-12345"; + + try { + final List srcPaths = createTestBucketHelper(existingBucket); + final List tmpPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, "tmp/" + o.getObject())) + .collect(Collectors.toList()); + final List dstPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(existingBucket, o.getObject() + ".bak")) + .collect(Collectors.toList()); + final List errPaths = + srcPaths.stream() + .map(o -> GcsPath.fromComponents(nonExistentBucket, o.getObject())) + .collect(Collectors.toList()); + + assertNotExists(dstPaths.get(0)); + assertNotExists(dstPaths.get(1)); + if (experiment.equals("use_gcsutil_v2")) { + // Make a copy of sources + gcsUtil.copyV2(srcPaths, tmpPaths); + + // (1) when the source files exist and target files do not + gcsUtil.renameV2(tmpPaths, dstPaths); + assertNotExists(tmpPaths.get(0)); + assertNotExists(tmpPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the source files do not exist + // (2a) no exception if IGNORE_MISSING_FILES is set + gcsUtil.renameV2(errPaths, dstPaths, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + // (2b) raise exception if if IGNORE_MISSING_FILES is not set + assertThrows(FileNotFoundException.class, () -> gcsUtil.renameV2(errPaths, dstPaths)); + + // (3) when both source files and target files exist + gcsUtil.renameV2( + srcPaths, dstPaths, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + gcsUtil.renameV2(srcPaths, dstPaths); + } else { + final List srcList = + srcPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List tmpList = + tmpPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List dstList = + dstPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + final List errList = + errPaths.stream().map(o -> o.toString()).collect(Collectors.toList()); + + // Make a copy of sources + gcsUtil.copy(srcList, tmpList); + + // (1) when the source files exist and target files do not + gcsUtil.rename(tmpList, dstList); + assertNotExists(tmpPaths.get(0)); + assertNotExists(tmpPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // (2) when the source files do not exist + // (2a) no exception if IGNORE_MISSING_FILES is set + gcsUtil.rename(errList, dstList, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + + // (2b) raise exception if if IGNORE_MISSING_FILES is not set + assertThrows(FileNotFoundException.class, () -> gcsUtil.rename(errList, dstList)); + + // (3) when both source files and target files exist + assertExists(srcPaths.get(0)); + assertExists(srcPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + + // There is a bug in V1 where SKIP_IF_DESTINATION_EXISTS is not honored. + gcsUtil.rename( + srcList, dstList, MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + + assertNotExists(srcPaths.get(0)); // BUG! The renaming is supposed to be skipped + assertNotExists(srcPaths.get(1)); // BUG! The renaming is supposed to be skipped + // assertExists(srcPaths.get(0)); + // assertExists(srcPaths.get(1)); + assertExists(dstPaths.get(0)); + assertExists(dstPaths.get(1)); + } + } finally { + tearDownTestBucketHelper(existingBucket); + } + } + + private void assertExists(GcsPath path) throws IOException { + if (experiment.equals("use_gcsutil_v2")) { + gcsUtil.getBlob(path); + } else { + gcsUtil.getObject(path); + } + } + + private void assertNotExists(GcsPath path) throws IOException { + if (experiment.equals("use_gcsutil_v2")) { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getBlob(path)); + } else { + assertThrows(FileNotFoundException.class, () -> gcsUtil.getObject(path)); + } + } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java index f344c6c2dba7..9512fec312cf 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/gcsfs/GcsPathTest.java @@ -351,6 +351,7 @@ public void testSubPathError() { @Test public void testIsWildcard() { + assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/*"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo*"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo?"))); assertTrue(GcsPath.isWildcard(GcsPath.fromUri("gs://bucket/foo[a-z]"))); @@ -359,6 +360,7 @@ public void testIsWildcard() { @Test public void testGetNonWildcardPrefix() { + assertEquals("gs://bucket/", GcsPath.getNonWildcardPrefix("gs://bucket/*")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo*")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo?")); assertEquals("gs://bucket/foo", GcsPath.getNonWildcardPrefix("gs://bucket/foo[a-z]"));