Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 6
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,21 @@
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.BlobResult;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.MissingStrategy;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtilV2.OverwriteStrategy;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.Nullable;

public class GcsUtil {
Expand Down Expand Up @@ -433,12 +438,65 @@ public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
delegate.copy(srcFilenames, destFilenames);
}

/** experimental api. */
public void copyV2(Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths) throws IOException {
copy(srcPaths, dstPaths, OverwriteStrategy.SAFE_OVERWRITE);
}

/** experimental api. */
public void copy(
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
throws IOException {
if (delegateV2 != null) {
delegateV2.copy(srcPaths, dstPaths, strategy);
} else {
throw new IOException("GcsUtil V2 not initialized.");
}
}

public void rename(
Iterable<String> srcFilenames, Iterable<String> destFilenames, MoveOptions... moveOptions)
throws IOException {
delegate.rename(srcFilenames, destFilenames, moveOptions);
}

/** experimental api. */
public void renameV2(
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, MoveOptions... moveOptions)
throws IOException {
Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
final MissingStrategy srcMissing;
final OverwriteStrategy dstOverwrite;

if (moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES)) {
srcMissing = MissingStrategy.SKIP_IF_MISSING;
} else {
srcMissing = MissingStrategy.FAIL_IF_MISSING;
}

if (moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS)) {
dstOverwrite = OverwriteStrategy.SKIP_IF_EXISTS;
} else {
dstOverwrite = OverwriteStrategy.SAFE_OVERWRITE;
}

rename(srcPaths, dstPaths, srcMissing, dstOverwrite);
}

/** experimental api. */
public void rename(
Iterable<GcsPath> srcPaths,
Iterable<GcsPath> dstPaths,
MissingStrategy srcMissing,
OverwriteStrategy dstOverwrite)
throws IOException {
if (delegateV2 != null) {
delegateV2.move(srcPaths, dstPaths, srcMissing, dstOverwrite);
} else {
throw new IOException("GcsUtil V2 not initialized.");
}
}

@VisibleForTesting
@SuppressWarnings("JdkObsolete") // for LinkedList
java.util.LinkedList<GcsUtilV1.RewriteOp> makeRewriteOps(
Expand Down Expand Up @@ -469,6 +527,20 @@ public void remove(Collection<String> filenames) throws IOException {
delegate.remove(filenames);
}

/** experimental api. */
public void removeV2(Iterable<GcsPath> paths) throws IOException {
remove(paths, MissingStrategy.SKIP_IF_MISSING);
}

/** experimental api. */
public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
if (delegateV2 != null) {
delegateV2.remove(paths, strategy);
} else {
throw new IOException("GcsUtil V2 not initialized.");
}
}

@SuppressFBWarnings("NM_CLASS_NOT_EXCEPTION")
public static class StorageObjectOrIOException {
final GcsUtilV1.StorageObjectOrIOException delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
package org.apache.beam.sdk.extensions.gcp.util;

import static org.apache.beam.sdk.io.FileSystemUtils.wildcardToRegexp;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.paging.Page;
import com.google.auto.value.AutoValue;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.CopyWriter;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobField;
import com.google.cloud.storage.Storage.BlobGetOption;
import com.google.cloud.storage.Storage.BlobListOption;
import com.google.cloud.storage.Storage.BucketField;
import com.google.cloud.storage.Storage.BucketGetOption;
import com.google.cloud.storage.Storage.CopyRequest;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageBatchResult;
import com.google.cloud.storage.StorageException;
Expand Down Expand Up @@ -71,18 +76,27 @@ public GcsUtilV2 create(PipelineOptions options) {
/** Maximum number of requests permitted in a GCS batch request. */
private static final int MAX_REQUESTS_PER_BATCH = 100;

/**
* Limit the number of bytes Cloud Storage will attempt to copy before responding to an individual
* request. If you see Read Timeout errors, try reducing this value.
*/
private static final long MEGABYTES_COPIED_PER_CHUNK = 2048L;

GcsUtilV2(PipelineOptions options) {
String projectId = options.as(GcpOptions.class).getProject();
storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
}

@SuppressWarnings({
"nullness" // For Creating AccessDeniedException and FileAlreadyExistsException with null.
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
// FileAlreadyExistsException with null.
})
private IOException translateStorageException(GcsPath gcsPath, StorageException e) {
switch (e.getCode()) {
case 403:
return new AccessDeniedException(gcsPath.toString(), null, e.getMessage());
case 404:
return new FileNotFoundException(e.getMessage());
case 409:
return new FileAlreadyExistsException(gcsPath.toString(), null, e.getMessage());
default:
Expand Down Expand Up @@ -259,6 +273,151 @@ public List<GcsPath> expand(GcsPath gcsPattern) throws IOException {
return results;
}

public enum MissingStrategy {
FAIL_IF_MISSING,
SKIP_IF_MISSING,
}

public void remove(Iterable<GcsPath> paths, MissingStrategy strategy) throws IOException {
for (List<GcsPath> pathPartition :
Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) {

// Create a new empty batch every time
StorageBatch batch = storage.batch();
List<StorageBatchResult<Boolean>> batchResultFutures = new ArrayList<>();

for (GcsPath path : pathPartition) {
batchResultFutures.add(batch.delete(path.getBucket(), path.getObject()));
}
batch.submit();

for (int i = 0; i < batchResultFutures.size(); i++) {
StorageBatchResult<Boolean> future = batchResultFutures.get(i);
try {
Boolean deleted = future.get();
if (!deleted) {
if (strategy == MissingStrategy.FAIL_IF_MISSING) {
throw new FileNotFoundException(
String.format(
"The specified file does not exist: %s", pathPartition.get(i).toString()));
} else {
LOG.warn("Ignoring failed deletion on file {}.", pathPartition.get(i).toString());
}
}
} catch (StorageException e) {
throw translateStorageException(pathPartition.get(i), e);
}
}
}
}

public enum OverwriteStrategy {
FAIL_IF_EXISTS, // Fail if target exists
SKIP_IF_EXISTS, // Skip if target exists
SAFE_OVERWRITE, // Overwrite only if the generation matches (atomic)
ALWAYS_OVERWRITE // Overwrite regardless of state
}

private void rewriteHelper(
Iterable<GcsPath> srcPaths,
Iterable<GcsPath> dstPaths,
boolean deleteSrc,
MissingStrategy srcMissing,
OverwriteStrategy dstOverwrite)
throws IOException {
List<GcsPath> srcList = Lists.newArrayList(srcPaths);
List<GcsPath> dstList = Lists.newArrayList(dstPaths);
checkArgument(
srcList.size() == dstList.size(),
"Number of source files %s must equal number of destination files %s",
srcList.size(),
dstList.size());

for (int i = 0; i < srcList.size(); i++) {
GcsPath srcPath = srcList.get(i);
GcsPath dstPath = dstList.get(i);
BlobId srcId = BlobId.of(srcPath.getBucket(), srcPath.getObject());
BlobId dstId = BlobId.of(dstPath.getBucket(), dstPath.getObject());

CopyRequest.Builder copyRequestBuilder =
CopyRequest.newBuilder()
.setSource(srcId)
.setMegabytesCopiedPerChunk(MEGABYTES_COPIED_PER_CHUNK);

if (dstOverwrite == OverwriteStrategy.ALWAYS_OVERWRITE) {
copyRequestBuilder.setTarget(dstId);
} else {
// FAIL_IF_EXISTS, SKIP_IF_EXISTS and SAFE_OVERWRITE require checking the target blob
BlobInfo existingTarget;
try {
existingTarget = storage.get(dstId);
} catch (StorageException e) {
throw translateStorageException(dstPath, e);
}

if (existingTarget == null) {
copyRequestBuilder.setTarget(dstId, Storage.BlobTargetOption.doesNotExist());
} else {
switch (dstOverwrite) {
case SKIP_IF_EXISTS:
LOG.warn("Ignoring rewriting from {} to {} because target exists.", srcPath, dstPath);
continue; // Skip to next file in for-loop

case SAFE_OVERWRITE:
copyRequestBuilder.setTarget(
dstId, Storage.BlobTargetOption.generationMatch(existingTarget.getGeneration()));
break;

case FAIL_IF_EXISTS:
throw new FileAlreadyExistsException(
srcPath.toString(),
dstPath.toString(),
"Target object already exists and strategy is FAIL_IF_EXISTS");
default:
throw new IllegalStateException("Unknown OverwriteStrategy: " + dstOverwrite);
}
}
}

try {
CopyWriter copyWriter = storage.copy(copyRequestBuilder.build());
copyWriter.getResult();

if (deleteSrc) {
if (!storage.delete(srcId)) {
// This may happen if the source file is deleted by another process after copy.
LOG.warn(
"Source file {} could not be deleted after move to {}. It may not have existed.",
srcPath,
dstPath);
}
}
} catch (StorageException e) {
if (e.getCode() == 404 && srcMissing == MissingStrategy.SKIP_IF_MISSING) {
LOG.warn(
"Ignoring rewriting from {} to {} because source does not exist.", srcPath, dstPath);
continue;
}
throw translateStorageException(srcPath, e);
}
}
}

public void copy(
Iterable<GcsPath> srcPaths, Iterable<GcsPath> dstPaths, OverwriteStrategy strategy)
throws IOException {
rewriteHelper(srcPaths, dstPaths, false, MissingStrategy.FAIL_IF_MISSING, strategy);
}

public void move(
Iterable<GcsPath> srcPaths,
Iterable<GcsPath> dstPaths,
MissingStrategy srcMissing,
OverwriteStrategy dstOverwrite)
throws IOException {
rewriteHelper(srcPaths, dstPaths, true, srcMissing, dstOverwrite);
}

/** Get the {@link Bucket} from Cloud Storage path or propagates an exception. */
public Bucket getBucket(GcsPath path, BucketGetOption... options) throws IOException {
String bucketName = path.getBucket();
Expand Down
Loading
Loading