diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixSyncTableRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixSyncTableRegionScanner.java
new file mode 100644
index 00000000000..13108a58c81
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixSyncTableRegionScanner.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.bouncycastle.crypto.digests.SHA256Digest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Server-side coprocessor that performs chunk formation and SHA-256 hashing for
+ * PhoenixSyncTableTool.
+ *
+ * Accumulates rows into chunks (based on size limits) and computes a hash of all row data (keys,
+ * column families, qualifiers, timestamps, cell types, values).
+ *
+ * Source scan (isTargetScan=false): Returns complete chunks bounded by region boundaries. Sets
+ * hasMoreRows=false when region is exhausted.
+ *
+ * Target scan (isTargetScan=true): Returns partial chunks with serialized digest state when region
+ * boundary is reached, allowing cross-region hash continuation.
+ *
+ * Returns chunk metadata cells: END_KEY, HASH (or digest state), ROW_COUNT, IS_PARTIAL_CHUNK
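+ *
+ * Illustrative metadata "row" layout (keyed by the chunk start key; qualifier names are from
+ * BaseScannerRegionObserverConstants):
+ * <pre>
+ * rowkey = chunkStartKey
+ * END_KEY -> chunkEndKey
+ * ROW_COUNT -> number of rows hashed into the chunk
+ * IS_PARTIAL_CHUNK -> TRUE/FALSE
+ * HASH -> SHA-256 hash (complete chunk) or encoded digest state (partial chunk)
+ * </pre>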
+ */
+public class PhoenixSyncTableRegionScanner extends BaseRegionScanner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableRegionScanner.class);
+ private static final byte[] CHUNK_METADATA_FAMILY = SINGLE_COLUMN_FAMILY;
+ private static final int MAX_SHA256_DIGEST_STATE_SIZE = 128;
+ private final Region region;
+ private final Scan scan;
+ private final RegionCoprocessorEnvironment env;
+ private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+ private final long chunkSizeBytes;
+ private boolean isTargetScan = false;
+ private byte[] chunkStartKey = null;
+ private byte[] chunkEndKey = null;
+ private long currentChunkSize = 0L;
+ private long currentChunkRowCount = 0L;
+ // We are not using the JDK-bundled SHA implementation, since its digest state can't be
+ // serialized/deserialized, which is needed for passing partial-chunk state around
+ private SHA256Digest digest;
+ private boolean hasMoreRows = true;
+ // If the target chunk was partial, we are continuing to
+ // update the digest before calculating the checksum
+ private boolean isUsingContinuedDigest;
+ private final byte[] timestampBuffer = new byte[8];
+
+ /**
+ * Creates a PhoenixSyncTableRegionScanner for chunk-based hashing.
+ * @param innerScanner The underlying region scanner
+ * @param region The region being scanned
+ * @param scan The scan request
+ * @param env The coprocessor environment
+ * @param ungroupedAggregateRegionObserver Parent observer for region state checks
+ * @throws IllegalStateException if digest state restoration fails
+ */
+ @VisibleForTesting
+ public PhoenixSyncTableRegionScanner(final RegionScanner innerScanner, final Region region,
+ final Scan scan, final RegionCoprocessorEnvironment env,
+ final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) {
+ super(innerScanner);
+ this.region = region;
+ this.scan = scan;
+ this.env = env;
+ this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+ byte[] chunkSizeAttr =
+ scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES);
+ if (chunkSizeAttr == null) { // Since we don't set chunk size scan attr for target cluster scan
+ this.isTargetScan = true;
+ }
+ this.chunkSizeBytes = chunkSizeAttr != null
+ ? Bytes.toLong(chunkSizeAttr)
+ : DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES;
+
+ // Check if we should continue from a previous digest state (cross-region continuation)
+ byte[] continuedDigestStateAttr =
+ scan.getAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE);
+ if (continuedDigestStateAttr != null) {
+ try {
+ this.digest = decodeDigestState(continuedDigestStateAttr);
+ this.isUsingContinuedDigest = true;
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to restore continued digest state", e);
+ }
+ } else {
+ this.digest = new SHA256Digest();
+ this.isUsingContinuedDigest = false;
+ }
+ }
+
+ /**
+ * Accumulates rows into a chunk and returns chunk metadata cells.
+ * @param results Output list to populate with chunk metadata cells
+ * @return true if more chunks available, false if scanning complete
+ */
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ region.startRegionOperation();
+ try {
+ resetChunkState();
+ RegionScanner localScanner = delegate;
+ synchronized (localScanner) {
+ List<Cell> rowCells = new ArrayList<>();
+ while (hasMoreRows) {
+ ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
+ rowCells.clear();
+ hasMoreRows = localScanner.nextRaw(rowCells);
+ if (rowCells.isEmpty()) {
+ break;
+ }
+
+ byte[] rowKey = CellUtil.cloneRow(rowCells.get(0));
+ long rowSize = calculateRowSize(rowCells);
+ addRowToChunk(rowKey, rowCells, rowSize);
+ if (!isTargetScan && willExceedChunkLimits(rowSize)) {
+ break;
+ }
+ }
+ }
+ if (chunkStartKey == null) {
+ return false;
+ }
+
+ // Check whether this next() call produced a partial chunk; only needed for the target scan.
+ // The chunk remains partial while chunkEndKey < source chunk endKey (the scan stop row)
+ boolean isPartialChunk = isTargetScan && Bytes.compareTo(chunkEndKey, scan.getStopRow()) < 0;
+ buildChunkMetadataResult(results, isPartialChunk);
+ return hasMoreRows;
+
+ } catch (Throwable t) {
+ LOGGER.error(
+ "Exception during chunk scanning in region {} table {} at chunk startKey: {}, endkey: {})",
+ region.getRegionInfo().getRegionNameAsString(),
+ region.getRegionInfo().getTable().getNameAsString(),
+ chunkStartKey != null ? Bytes.toStringBinary(chunkStartKey) : "null",
+ chunkEndKey != null ? Bytes.toStringBinary(chunkEndKey) : "null", t);
+ throw t;
+ } finally {
+ region.closeRegionOperation();
+ }
+ }
+
+ @Override
+ public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ return next(result);
+ }
+
+ /**
+ * Resets chunk state for a new chunk. Note: If this scanner was initialized with continued digest
+ * state, the first call to this method will NOT reset the digest, allowing us to continue hashing
+ * from the previous region's state.
+ */
+ private void resetChunkState() {
+ chunkStartKey = null;
+ chunkEndKey = null;
+ currentChunkSize = 0;
+ currentChunkRowCount = 0;
+ if (!isUsingContinuedDigest) {
+ digest.reset();
+ }
+ isUsingContinuedDigest = false;
+ }
+
+ private long calculateRowSize(List<Cell> cells) {
+ long size = 0;
+ for (Cell cell : cells) {
+ size += PrivateCellUtil.estimatedSerializedSizeOf(cell);
+ }
+ return size;
+ }
+
+ private boolean willExceedChunkLimits(long rowSize) {
+ return currentChunkSize + rowSize > chunkSizeBytes;
+ }
+
+ /**
+ * Adds a row to the current chunk and updates digest
+ */
+ private void addRowToChunk(byte[] rowKey, List<Cell> cells, long rowSize) {
+ // Set chunk start key on first row
+ if (chunkStartKey == null) {
+ chunkStartKey = rowKey;
+ }
+ chunkEndKey = rowKey;
+ currentChunkSize += rowSize;
+ currentChunkRowCount++;
+ updateDigestWithRow(rowKey, cells);
+ }
+
+ /**
+ * Updates the SHA-256 digest with data from a row. Hash includes: row key + cell family + cell
+ * qualifier + cell timestamp + cell type + cell value. This ensures that any difference in the
+ * data will result in different hashes. Optimized to avoid cloning - reads directly from cell's
+ * backing arrays (zero-copy).
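+ * <pre>
+ * Digest input per row (conceptually): rowKey, then for each cell:
+ * family || qualifier || timestamp (8 bytes, big-endian) || type code || value
+ * </pre>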
+ */
+ private void updateDigestWithRow(byte[] rowKey, List<Cell> cells) {
+ digest.update(rowKey, 0, rowKey.length);
+ for (Cell cell : cells) {
+ digest.update(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+ digest.update(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ long ts = cell.getTimestamp();
+ // Big-endian serialization of the timestamp; equivalent to Bytes.toBytes(ts)
+ // but reuses a shared buffer to avoid a per-cell allocation
+ timestampBuffer[0] = (byte) (ts >>> 56);
+ timestampBuffer[1] = (byte) (ts >>> 48);
+ timestampBuffer[2] = (byte) (ts >>> 40);
+ timestampBuffer[3] = (byte) (ts >>> 32);
+ timestampBuffer[4] = (byte) (ts >>> 24);
+ timestampBuffer[5] = (byte) (ts >>> 16);
+ timestampBuffer[6] = (byte) (ts >>> 8);
+ timestampBuffer[7] = (byte) ts;
+ digest.update(timestampBuffer, 0, 8);
+
+ digest.update(cell.getType().getCode());
+ digest.update(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ }
+ }
+
+ /**
+ * Encodes a SHA256Digest state to a byte array with a length prefix. The prefix enables
+ * validation and extensibility, prevents maliciously large allocations on decode, and allows
+ * detection of corrupted serialization.
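+ * <pre>Layout: [4-byte big-endian length][SHA256Digest encoded state]</pre>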
+ * @param digest The digest whose state should be encoded
+ * @return Byte array containing 4-byte length prefix + encoded state
+ */
+ private byte[] encodeDigestState(SHA256Digest digest) {
+ byte[] encoded = digest.getEncodedState();
+ ByteBuffer buffer = ByteBuffer.allocate(4 + encoded.length);
+ buffer.putInt(encoded.length);
+ buffer.put(encoded);
+ return buffer.array();
+ }
+
+ /**
+ * Decodes a SHA256Digest state from a byte array.
+ * @param encodedState Byte array containing 4-byte length prefix + encoded state
+ * @return SHA256Digest restored to the saved state
+ * @throws IOException if state is invalid, corrupted, or security checks fail
+ */
+ private SHA256Digest decodeDigestState(byte[] encodedState) throws IOException {
+ if (encodedState == null) {
+ throw new IllegalArgumentException(
+ String.format("Invalid encoded digest state in region %s table %s: encodedState is null",
+ region.getRegionInfo().getRegionNameAsString(),
+ region.getRegionInfo().getTable().getNameAsString()));
+ }
+
+ DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedState));
+ int stateLength = dis.readInt();
+ // Prevent maliciously large allocations; the digest state can never go beyond ~96 bytes,
+ // so 128 bytes gives some buffer
+ if (stateLength > MAX_SHA256_DIGEST_STATE_SIZE) {
+ throw new IllegalArgumentException(
+ String.format("Invalid SHA256 state length in region %s table %s: %d expected <= %d",
+ region.getRegionInfo().getRegionNameAsString(),
+ region.getRegionInfo().getTable().getNameAsString(), stateLength,
+ MAX_SHA256_DIGEST_STATE_SIZE));
+ }
+ byte[] state = new byte[stateLength];
+ dis.readFully(state);
+ return new SHA256Digest(state);
+ }
+
+ /**
+ * Builds chunk metadata result cells and adds them to the results list. Returns a single
+ * "row" (rowkey = chunkStartKey) with multiple cells containing chunk metadata: chunkEndKey,
+ * hash/digest state, rowCount, and isPartialChunk. For complete chunks the hash cell holds the
+ * final SHA-256 hash (32 bytes); for partial chunks it holds the serialized SHA256Digest state
+ * for continuation.
+ * @param results Output list to populate with chunk metadata cells
+ * @param isPartialChunk true if this is a partial chunk (region boundary reached before
+ * completion)
+ */
+ private void buildChunkMetadataResult(List<Cell> results, boolean isPartialChunk)
+ throws IOException {
+ byte[] resultRowKey = this.chunkStartKey;
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_END_KEY_QUALIFIER, AGG_TIMESTAMP, chunkEndKey));
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_ROW_COUNT_QUALIFIER, AGG_TIMESTAMP,
+ Bytes.toBytes(currentChunkRowCount)));
+ if (isPartialChunk) {
+ // Partial chunk digest
+ byte[] digestState = encodeDigestState(digest);
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP,
+ TRUE_BYTES));
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, digestState));
+ } else {
+ // Complete chunk - finalize and return hash
+ byte[] hash = new byte[digest.getDigestSize()];
+ digest.doFinal(hash, 0);
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER, AGG_TIMESTAMP, hash));
+ results.add(PhoenixKeyValueUtil.newKeyValue(resultRowKey, CHUNK_METADATA_FAMILY,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER, AGG_TIMESTAMP,
+ FALSE_BYTES));
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } catch (Exception e) {
+ LOGGER.error("Error closing PhoenixSyncTableRegionScanner", e);
+ }
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d85f8005405..2015802a21c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -417,6 +417,14 @@ public RegionScanner run() throws Exception {
return rebuildIndices(s, region, scan, env);
}
});
+ } else if (ScanUtil.isSyncTableChunkFormation(scan)) {
+ return User.runAsLoginUser(new PrivilegedExceptionAction<RegionScanner>() {
+ @Override
+ public RegionScanner run() throws Exception {
+ return new PhoenixSyncTableRegionScanner(s, region, scan, env,
+ UngroupedAggregateRegionObserver.this);
+ }
+ });
}
boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixNoOpSingleRecordReader.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixNoOpSingleRecordReader.java
new file mode 100644
index 00000000000..28ec1ce4404
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixNoOpSingleRecordReader.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * A minimal RecordReader that returns exactly one dummy record per InputSplit.
+ *
+ * Use this when your mapper:
+ *
+ * - Doesn't need actual row data from the RecordReader
+ * - Only needs split/region boundaries (accessible via {@code context.getInputSplit()})
+ * - Delegates all work to a server-side coprocessor
+ *
+ * This avoids the overhead of scanning and returning all rows when the mapper only needs to be
+ * triggered once per region/split. The standard {@link PhoenixRecordReader} iterates through all
+ * rows, calling {@code map()} for each row - which is wasteful when the mapper ignores the row data
+ * entirely.
+ *
+ * How it works:
+ *
+ * - {@link #nextKeyValue()} returns {@code true} exactly once, then {@code false}
+ * - This triggers {@code map()} exactly once per InputSplit (region)
+ * - The mapper extracts region boundaries from the InputSplit, not from records
+ *
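+ * Illustrative wiring from an InputFormat (hypothetical snippet):
+ * <pre>{@code
+ * public RecordReader<NullWritable, DBWritable> createRecordReader(InputSplit split,
+ *     TaskAttemptContext context) {
+ *   return new PhoenixNoOpSingleRecordReader();
+ * }
+ * }</pre>
+ *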
+ * @see PhoenixSyncTableInputFormat
+ * @see PhoenixRecordReader
+ */
+public class PhoenixNoOpSingleRecordReader extends RecordReader<NullWritable, DBWritable> {
+
+ private boolean hasRecord = true;
+
+ /**
+ * Initialize the RecordReader. No initialization is needed since we return a single dummy record.
+ * @param split The InputSplit containing region boundaries
+ * @param context The task context
+ */
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) {
+ // No initialization needed
+ }
+
+ /**
+ * Returns true exactly once to trigger a single map() call per split.
+ * @return true on the first call, false on subsequent calls, which causes the mapper task to
+ * stop invoking map()
+ */
+ @Override
+ public boolean nextKeyValue() {
+ if (hasRecord) {
+ hasRecord = false;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns a NullWritable key (mapper ignores this).
+ * @return NullWritable singleton
+ */
+ @Override
+ public NullWritable getCurrentKey() {
+ return NullWritable.get();
+ }
+
+ /**
+ * Returns a NullDBWritable value (mapper ignores this). The mapper extracts what it needs from
+ * the InputSplit, not from this value.
+ * @return A new NullDBWritable instance
+ */
+ @Override
+ public DBWritable getCurrentValue() {
+ return new DBInputFormat.NullDBWritable();
+ }
+
+ /**
+ * Returns progress: 0.0 before the record is consumed, 1.0 after.
+ * @return 0.0f if record not yet consumed, 1.0f otherwise
+ */
+ @Override
+ public float getProgress() {
+ return hasRecord ? 0.0f : 1.0f;
+ }
+
+ /**
+ * Close the RecordReader. Nothing to close since we hold no resources.
+ */
+ @Override
+ public void close() {
+ // Nothing to close
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
new file mode 100644
index 00000000000..76b01a9d14d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat designed for PhoenixSyncTableTool that generates splits based on HBase region
+ * boundaries. Filters out already-processed mapper regions using checkpoint data, enabling
+ * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke the mapper once per
+ * split (region).
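+ * <p>
+ * Illustrative job wiring (assuming a standard MapReduce job setup):
+ * <pre>{@code
+ * job.setInputFormatClass(PhoenixSyncTableInputFormat.class);
+ * job.setMapperClass(PhoenixSyncTableMapper.class);
+ * }</pre>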
+ */
+public class PhoenixSyncTableInputFormat extends PhoenixInputFormat {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
+
+ public PhoenixSyncTableInputFormat() {
+ super();
+ }
+
+ /**
+ * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one dummy record per split.
+ *
+ * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader - it extracts region
+ * boundaries from the InputSplit and delegates all scanning to the PhoenixSyncTableRegionScanner
+ * coprocessor. Using PhoenixNoOpSingleRecordReader ensures that {@code map()} is called exactly
+ * once per region regardless of what the scan looks like, avoiding the overhead of the default
+ * PhoenixRecordReader, which would call {@code map()} for every row of the scan.
+ * @param split Input split
+ * @param context Task attempt context
+ * @return A PhoenixNoOpSingleRecordReader instance
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
+ return new PhoenixNoOpSingleRecordReader();
+ }
+
+ /**
+ * Generates InputSplits for the Phoenix sync table job. Splits are created along region
+ * boundaries, and already-completed regions are then filtered out using the sync table
+ * checkpoint table.
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ String tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf);
+ String targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf);
+ Long fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf);
+ Long toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf);
+ List<InputSplit> allSplits = super.getSplits(context);
+ if (allSplits == null || allSplits.isEmpty()) {
+ throw new IOException(String.format(
+ "PhoenixInputFormat generated no splits for table %s. Check table exists and has regions.",
+ tableName));
+ }
+ LOGGER.info("Total splits generated {} of table {} for PhoenixSyncTable ", allSplits.size(),
+ tableName);
+ List<KeyRange> completedRegions;
+ try {
+ completedRegions =
+ queryCompletedMapperRegions(conf, tableName, targetZkQuorum, fromTime, toTime);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ if (completedRegions.isEmpty()) {
+ LOGGER.info("No completed regions for table {} - processing all {} splits", tableName,
+ allSplits.size());
+ return allSplits;
+ }
+
+ List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions);
+ LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining",
+ completedRegions.size(), tableName, unprocessedSplits.size());
+ return unprocessedSplits;
+ }
+
+ /**
+ * Queries the sync checkpoint table for completed mapper regions.
+ */
+ private List<KeyRange> queryCompletedMapperRegions(Configuration conf, String tableName,
+ String targetZkQuorum, Long fromTime, Long toTime) throws SQLException {
+ List<KeyRange> completedRegions = new ArrayList<>();
+ try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+ PhoenixSyncTableOutputRepository repository = new PhoenixSyncTableOutputRepository(conn);
+ List<PhoenixSyncTableOutputRow> completedRows =
+ repository.getProcessedMapperRegions(tableName, targetZkQuorum, fromTime, toTime);
+ for (PhoenixSyncTableOutputRow row : completedRows) {
+ KeyRange keyRange = KeyRange.getKeyRange(row.getStartRowKey(), row.getEndRowKey());
+ completedRegions.add(keyRange);
+ }
+ }
+ return completedRegions;
+ }
+
+ /**
+ * Filters out splits that are fully contained within an already-completed mapper region boundary.
+ * @param allSplits All splits generated from region boundaries
+ * @param completedRegions Regions already verified (from checkpoint table)
+ * @return Splits that need processing
+ */
+ private List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
+ List<KeyRange> completedRegions) {
+ allSplits.sort((s1, s2) -> {
+ PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
+ PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
+ return KeyRange.COMPARATOR.compare(ps1.getKeyRange(), ps2.getKeyRange());
+ });
+ List<InputSplit> unprocessedSplits = new ArrayList<>();
+ int splitIdx = 0;
+ int completedIdx = 0;
+
+ // Two pointer comparison across splitRange and completedRange
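+ // Illustrative walk-through (hypothetical keys): splits [a,c), [c,f), [f,) and completed
+ // regions [a,c), [d,e): [a,c) is fully contained and skipped; [c,f) only partially overlaps
+ // [d,e) and is kept; [f,) has no completed region left to check against and is kept.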
+ while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
+ PhoenixInputSplit split = (PhoenixInputSplit) allSplits.get(splitIdx);
+ KeyRange splitRange = split.getKeyRange();
+ KeyRange completedRange = completedRegions.get(completedIdx);
+ byte[] splitStart = splitRange.getLowerRange();
+ byte[] splitEnd = splitRange.getUpperRange();
+ byte[] completedStart = completedRange.getLowerRange();
+ byte[] completedEnd = completedRange.getUpperRange();
+
+ // No overlap between completedRange and splitRange:
+ // completedEnd is before splitStart, so advance the completed pointer to catch up.
+ // Scenario:
+ //                      [----splitRange-----)
+ // [----completed----)
+ // If completedEnd is [] (empty), it belongs to the last region and this check does not apply.
+ if (
+ !Bytes.equals(completedEnd, HConstants.EMPTY_END_ROW)
+ && Bytes.compareTo(completedEnd, splitStart) <= 0
+ ) {
+ completedIdx++;
+ } else if (
+ !Bytes.equals(splitEnd, HConstants.EMPTY_END_ROW)
+ && Bytes.compareTo(completedStart, splitEnd) >= 0
+ ) {
+ // No overlap between completedRange and splitRange:
+ // splitEnd is before completedStart, so add this splitRange to unprocessed.
+ // Scenario:
+ // [----splitRange-----)
+ //                        [----completed----)
+ // If splitEnd is [] (empty), it belongs to the last region and this check does not apply.
+ unprocessedSplits.add(allSplits.get(splitIdx));
+ splitIdx++;
+ } else {
+ // Some overlap detected; check if splitRange is fully contained within completedRange
+ //        [----splitRange-----)
+ // [----completed----)              // partially contained -- unprocessedSplits
+ // OR
+ // [----splitRange-----)
+ //          [----completed----)     // partially contained -- unprocessedSplits
+ // OR
+ // [----splitRange-----------)
+ //    [----completed--)             // partially contained -- unprocessedSplits
+ // OR
+ //   [----splitRange-----)
+ // [----completed----------)        // fullyContained -- nothing to process
+ boolean startContained = Bytes.compareTo(completedStart, splitStart) <= 0;
+ // If we are at end of completedRange region, we can assume end boundary is always contained
+ // wrt splitRange
+ boolean endContained = Bytes.equals(completedEnd, HConstants.EMPTY_END_ROW)
+ || Bytes.compareTo(splitEnd, completedEnd) <= 0;
+
+ boolean fullyContained = startContained && endContained;
+ if (!fullyContained) {
+ unprocessedSplits.add(allSplits.get(splitIdx));
+ }
+ splitIdx++;
+ }
+ }
+
+ // Add any remaining splits (if completed regions exhausted)
+ // These splits cannot be contained since no completed regions left to check
+ while (splitIdx < allSplits.size()) {
+ unprocessedSplits.add(allSplits.get(splitIdx));
+ splitIdx++;
+ }
+ return unprocessedSplits;
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
new file mode 100644
index 00000000000..2920e81aae1
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Mapper that acts as a driver for synchronizing a table between source and target clusters. The
+ * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches
+ * chunk hashes from both clusters, compares them, and writes the results to the checkpoint table.
+ */
+public class PhoenixSyncTableMapper
+ extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class);
+
+ public enum SyncCounters {
+ CHUNKS_VERIFIED,
+ CHUNKS_MISMATCHED,
+ SOURCE_ROWS_PROCESSED,
+ TARGET_ROWS_PROCESSED,
+ }
+
+ private String tableName;
+ private String targetZkQuorum;
+ private Long fromTime;
+ private Long toTime;
+ private boolean isDryRun;
+ private long chunkSizeBytes;
+ private Configuration conf;
+ private Connection sourceConnection;
+ private Connection targetConnection;
+ private Connection globalConnection;
+ private PTable pTable;
+ private byte[] physicalTableName;
+ private byte[] mapperRegionStart;
+ private byte[] mapperRegionEnd;
+ private PhoenixSyncTableOutputRepository syncTableOutputRepository;
+ private Timestamp mapperStartTime;
+
+ @Override
+ protected void setup(Context context) throws InterruptedException {
+ try {
+ super.setup(context);
+ mapperStartTime = new Timestamp(System.currentTimeMillis());
+ this.conf = context.getConfiguration();
+ tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf);
+ targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf);
+ fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf);
+ toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf);
+ isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf);
+ chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf);
+ extractRegionBoundariesFromSplit(context);
+ sourceConnection = ConnectionUtil.getInputConnection(conf);
+ pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName);
+ physicalTableName = pTable.getPhysicalName().getBytes();
+ connectToTargetCluster();
+ globalConnection = createGlobalConnection(conf);
+ syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection);
+ } catch (SQLException | IOException e) {
+ tryClosingResources();
+ throw new RuntimeException(
+ String.format("Failed to setup PhoenixSyncTableMapper for table: %s", tableName), e);
+ }
+ }
+
+ /**
+ * Extracts mapper region boundaries from the PhoenixInputSplit
+ */
+ private void extractRegionBoundariesFromSplit(Context context) {
+ PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
+ KeyRange keyRange = split.getKeyRange();
+ if (keyRange == null) {
+ throw new IllegalStateException(String.format(
+ "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.",
+ tableName));
+ }
+ mapperRegionStart = keyRange.getLowerRange();
+ mapperRegionEnd = keyRange.getUpperRange();
+ }
+
+ /**
+ * Connects to the target cluster using the target ZK quorum, port, znode, krb principal
+ */
+ private void connectToTargetCluster() throws SQLException, IOException {
+ Configuration targetConf =
+ PhoenixMapReduceUtil.createConfigurationForZkQuorum(conf, targetZkQuorum);
+ targetConnection = ConnectionUtil.getInputConnection(targetConf);
+ }
+
+ /**
+ * Creates a global (non-tenant) connection for the checkpoint table.
+ */
+ private Connection createGlobalConnection(Configuration conf) throws SQLException {
+ Configuration globalConf = new Configuration(conf);
+ globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ globalConf.unset(PhoenixRuntime.CURRENT_SCN_ATTRIB);
+ return ConnectionUtil.getInputConnection(globalConf);
+ }
+
+ /**
+ * Processes a mapper region by comparing chunks between source and target clusters. Gets the
+ * already-processed chunks from the checkpoint table, resumes from the checkpointed progress,
+ * and records the final status for chunks and the mapper region (VERIFIED/MISMATCHED).
+ */
+ @Override
+ protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context)
+ throws IOException, InterruptedException {
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+ try {
+ List<PhoenixSyncTableOutputRow> processedChunks =
+ syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime,
+ mapperRegionStart, mapperRegionEnd);
+ List<Pair<byte[], byte[]>> unprocessedRanges =
+ calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks);
+
+ boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks);
+ for (Pair<byte[], byte[]> range : unprocessedRanges) {
+ processMapperRanges(range.getFirst(), range.getSecond(), isStartKeyInclusive, context);
+ isStartKeyInclusive = false;
+ }
+
+ long mismatchedChunk = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+ long verifiedChunk = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
+ long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+ long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+ Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis());
+ String counters = formatMapperCounters(verifiedChunk, mismatchedChunk, sourceRowsProcessed,
+ targetRowsProcessed);
+
+ if (sourceRowsProcessed > 0) {
+ if (mismatchedChunk == 0) {
+ context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+ syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+ PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun,
+ mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.VERIFIED,
+ mapperStartTime, mapperEndTime, counters);
+ LOGGER.info(
+ "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks",
+ verifiedChunk, mismatchedChunk);
+ } else {
+ context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+ LOGGER.warn(
+ "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks",
+ verifiedChunk, mismatchedChunk);
+ syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+ PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun,
+ mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.MISMATCHED,
+ mapperStartTime, mapperEndTime, counters);
+ }
+ } else {
+ LOGGER.info(
+ "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}",
+ mapperRegionStart, mapperRegionEnd);
+ }
+ } catch (SQLException e) {
+ tryClosingResources();
+ throw new RuntimeException("Error processing PhoenixSyncTableMapper", e);
+ }
+ }
+
+ /**
+ * Processes a chunk range by comparing source and target cluster data. Source chunking breaks
+ * data into size-based chunks within the given mapper region boundary. Target chunking follows
+ * the source chunk boundaries exactly. A source chunk boundary might be split across multiple
+ * target regions; if so, the coprocessor signals a partial chunk with a partial digest. Once the
+ * entire source chunk is covered by the target scanner, the resulting checksum is calculated
+ * from the combined digest.
+ * @param rangeStart Range start key
+ * @param rangeEnd Range end key
+ * @param isStartKeyInclusive Whether the range start key is inclusive
+ * @param context Mapper context for progress and counters
+ * @throws IOException if scan fails
+ * @throws SQLException if database operations fail
+ */
+ private void processMapperRanges(byte[] rangeStart, byte[] rangeEnd, boolean isStartKeyInclusive,
+ Context context) throws IOException, SQLException {
+ try (ChunkScannerContext sourceScanner = createChunkScanner(sourceConnection, rangeStart,
+ rangeEnd, null, isStartKeyInclusive, false, false)) {
+ while (true) {
+ // We fetch one chunk metadata row at a time until no more chunks are returned (i.e. null)
+ ChunkInfo sourceChunk = sourceScanner.getNextChunk();
+ if (sourceChunk == null) {
+ break;
+ }
+ sourceChunk.executionStartTime = new Timestamp(System.currentTimeMillis());
+ ChunkInfo targetChunk = getTargetChunkWithSourceBoundary(targetConnection,
+ sourceChunk.startKey, sourceChunk.endKey);
+
+ context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).increment(sourceChunk.rowCount);
+ context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).increment(targetChunk.rowCount);
+ boolean matched = MessageDigest.isEqual(sourceChunk.hash, targetChunk.hash);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Chunk comparison {}, {}: source={} rows, target={} rows, matched={}",
+ Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey),
+ sourceChunk.rowCount, targetChunk.rowCount, matched);
+ }
+ sourceChunk.executionEndTime = new Timestamp(System.currentTimeMillis());
+ String counters = formatChunkCounters(sourceChunk.rowCount, targetChunk.rowCount);
+ if (matched) {
+ handleVerifiedChunk(sourceChunk, context, counters);
+ } else {
+ handleMismatchedChunk(sourceChunk, context, counters);
+ }
+ context.progress();
+ }
+ }
+ LOGGER.info("Completed sync table processing of Mapper region boundary {}, {}",
+ Bytes.toStringBinary(rangeStart), Bytes.toStringBinary(rangeEnd));
+ }
+
+ /**
+ * Scans the target across multiple regions and returns a single combined ChunkInfo. Handles
+ * partial chunks by passing digest state to the next scanner via scan attributes, enabling
+ * cross-region digest continuation. Since rows are scanned based on the source chunk boundary,
+ * they could be distributed across multiple target regions; we keep creating scanners across
+ * target regions until the entire source chunk boundary is processed or the chunk is null.
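+ * <pre>
+ * Illustrative flow for a source chunk [k1, k9] spanning two target regions split at k5
+ * (hypothetical keys): scanner 1 returns a partial chunk ending at k4 with encoded digest
+ * state; scanner 2 resumes after k4 with the restored digest and returns the final hash.
+ * </pre>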
+ * @param conn Target connection
+ * @param startKey Source chunk start key
+ * @param endKey Source chunk end key
+ * @return Single ChunkInfo with final hash from all target regions
+ */
+ private ChunkInfo getTargetChunkWithSourceBoundary(Connection conn, byte[] startKey,
+ byte[] endKey) throws IOException, SQLException {
+ ChunkInfo combinedTargetChunk = new ChunkInfo();
+ combinedTargetChunk.startKey = startKey;
+ combinedTargetChunk.endKey = endKey;
+ combinedTargetChunk.hash = null;
+ combinedTargetChunk.rowCount = 0;
+ combinedTargetChunk.isPartial = false;
+ byte[] currentStartKey = startKey;
+ byte[] continuedDigestState = null;
+ boolean isStartKeyInclusive = true;
+ while (true) {
+ // We create a new scanner for every target region chunk. The chunk can be partial or full
+ // depending on whether the source chunk boundary falls within one or multiple target
+ // regions. For every target region scanned, we want one row processed and returned back
+ // immediately (that's why we set scan.setLimit(1)/scan.setCaching(1)), since the output of
+ // one region's partial-chunk scanner is the input to the next region's scanner.
+ try (ChunkScannerContext scanner = createChunkScanner(conn, currentStartKey, endKey,
+ continuedDigestState, isStartKeyInclusive, true, true)) {
+ ChunkInfo chunk = scanner.getNextChunk();
+ // In the happy path where source and target rows match, the target chunk is never null.
+ // A null chunk means the last source rows couldn't be found in the target, since we only
+ // return isPartial=true while the target chunk end key < source chunk endKey.
+ // The hash would then still be a digest state rather than a checksum, so it would never
+ // match (which is expected). We could convert the digest to a checksum, but since it
+ // won't match anyhow, we don't need to.
+ if (chunk == null) {
+ break;
+ }
+ combinedTargetChunk.rowCount += chunk.rowCount;
+ // Updating it with either digest(when isPartial) or checksum(when all rows chunked)
+ combinedTargetChunk.hash = chunk.hash;
+ if (chunk.isPartial) {
+ continuedDigestState = chunk.hash;
+ currentStartKey = chunk.endKey;
+ isStartKeyInclusive = false;
+ } else {
+ break;
+ }
+ }
+ }
+ return combinedTargetChunk;
+ }
+
+ /**
+ * Creates a reusable scanner context for fetching chunks from a range.
+ * @param conn Connection to cluster (source or target)
+ * @param startKey Range start key (inclusive)
+ * @param endKey Range end key (exclusive)
+ * @param continuedDigestState If not null, coprocessor will continue hashing from this state (for
+ * cross-region continuation on target)
+ * @param isStartKeyInclusive Whether StartKey Inclusive
+ * @param isEndKeyInclusive Whether EndKey Inclusive
+ * @throws IOException scanner creation fails
+ * @throws SQLException hTable connection fails
+ */
+ private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey, byte[] endKey,
+ byte[] continuedDigestState, boolean isStartKeyInclusive, boolean isEndKeyInclusive,
+ boolean isTargetScan) throws IOException, SQLException {
+ // Not using try-with-resources since ChunkScannerContext owns the table lifecycle
+ Table hTable =
+ conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName);
+ Scan scan =
+ createChunkScan(startKey, endKey, isStartKeyInclusive, isEndKeyInclusive, isTargetScan);
+ scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION, TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserverConstants.SKIP_REGION_BOUNDARY_CHECK, TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG, TRUE_BYTES);
+ if (continuedDigestState != null && continuedDigestState.length > 0) {
+ scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE,
+ continuedDigestState);
+ }
+
+ if (!isTargetScan) {
+ scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES,
+ Bytes.toBytes(chunkSizeBytes));
+ }
+ ResultScanner scanner = hTable.getScanner(scan);
+ return new ChunkScannerContext(hTable, scanner);
+ }
+
+ /**
+ * Parses chunk information from the coprocessor result. The PhoenixSyncTableRegionScanner returns
+ * cells with chunk metadata including the SHA-256 hash (for complete chunks) or the encoded
+ * SHA256Digest state (for partial chunks).
+ */
+ private ChunkInfo parseChunkInfo(Result result) {
+ List<Cell> cells = Arrays.asList(result.rawCells());
+ Cell endKeyCell =
+ MetaDataUtil.getCell(cells, BaseScannerRegionObserverConstants.SYNC_TABLE_END_KEY_QUALIFIER);
+ Cell rowCountCell = MetaDataUtil.getCell(cells,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_ROW_COUNT_QUALIFIER);
+ Cell isPartialChunkCell = MetaDataUtil.getCell(cells,
+ BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER);
+ Cell hashCell =
+ MetaDataUtil.getCell(cells, BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER);
+
+ if (
+ endKeyCell == null || rowCountCell == null || isPartialChunkCell == null || hashCell == null
+ ) {
+ throw new RuntimeException("Missing required chunk metadata cells.");
+ }
+
+ ChunkInfo info = new ChunkInfo();
+ info.startKey = result.getRow();
+ info.endKey = CellUtil.cloneValue(endKeyCell);
+ info.rowCount = Bytes.toLong(rowCountCell.getValueArray(), rowCountCell.getValueOffset(),
+ rowCountCell.getValueLength());
+ info.isPartial = isPartialChunkCell.getValueArray()[isPartialChunkCell.getValueOffset()] != 0;
+ info.hash = CellUtil.cloneValue(hashCell);
+ return info;
+ }
+
+ /**
+ * Formats chunk counters as a comma-separated string (optimized for hot path). Avoids
+ * LinkedHashMap allocation by building string directly.
+ * @param sourceRows Source rows processed
+ * @param targetRows Target rows processed
+ * @return Formatted string: "SOURCE_ROWS_PROCESSED=123,TARGET_ROWS_PROCESSED=456"
+ */
+ private String formatChunkCounters(long sourceRows, long targetRows) {
+ return String.format("%s=%d,%s=%d", SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows,
+ SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows);
+ }
+
+ /**
+ * Formats mapper counters as a comma-separated string. Avoids LinkedHashMap allocation by
+ * building string directly.
+ * @param chunksVerified Chunks verified count
+ * @param chunksMismatched Chunks mismatched count
+ * @param sourceRows Source rows processed
+ * @param targetRows Target rows processed
+ * @return Formatted string with all mapper counters
+ */
+ private String formatMapperCounters(long chunksVerified, long chunksMismatched, long sourceRows,
+ long targetRows) {
+ return String.format("%s=%d,%s=%d,%s=%d,%s=%d", SyncCounters.CHUNKS_VERIFIED.name(),
+ chunksVerified, SyncCounters.CHUNKS_MISMATCHED.name(), chunksMismatched,
+ SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows,
+ SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows);
+ }
+
+ /**
+ * Records a verified chunk in the checkpoint table and increments the verified-chunk counter.
+ */
+ private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context, String counters)
+ throws SQLException {
+ syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+ PhoenixSyncTableOutputRow.Type.CHUNK, fromTime, toTime, isDryRun, sourceChunk.startKey,
+ sourceChunk.endKey, PhoenixSyncTableOutputRow.Status.VERIFIED, sourceChunk.executionStartTime,
+ sourceChunk.executionEndTime, counters);
+ context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1);
+ }
+
+ /**
+ * Logs and records a mismatched chunk in the checkpoint table and increments the
+ * mismatched-chunk counter.
+ */
+ private void handleMismatchedChunk(ChunkInfo sourceChunk, Context context, String counters)
+ throws SQLException {
+ LOGGER.warn("Chunk mismatch detected for table: {}, with startKey: {}, endKey {}", tableName,
+ Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey));
+ syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum,
+ PhoenixSyncTableOutputRow.Type.CHUNK, fromTime, toTime, isDryRun, sourceChunk.startKey,
+ sourceChunk.endKey, PhoenixSyncTableOutputRow.Status.MISMATCHED,
+ sourceChunk.executionStartTime, sourceChunk.executionEndTime, counters);
+
+ context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
+ }
+
+ /**
+ * Creates an HBase raw scan for a chunk range to capture all cell versions and delete markers.
+ */
+ private Scan createChunkScan(byte[] startKey, byte[] endKey, boolean isStartKeyInclusive,
+ boolean isEndKeyInclusive, boolean isTargetScan) throws IOException {
+ Scan scan = new Scan();
+ scan.withStartRow(startKey, isStartKeyInclusive);
+ scan.withStopRow(endKey, isEndKeyInclusive);
+ scan.setRaw(true);
+ scan.readAllVersions();
+ scan.setCacheBlocks(false);
+ scan.setTimeRange(fromTime, toTime);
+ if (isTargetScan) {
+ scan.setLimit(1);
+ scan.setCaching(1);
+ }
+ return scan;
+ }
+
+ /**
+ * Calculates unprocessed gaps within a mapper region. A mapper region is divided into multiple
+ * chunks, and we store both the mapper region boundary and its chunk boundaries. Given all the
+ * processed chunks that fall within this mapper region boundary, we look for holes/gaps in the
+ * mapper region that haven't been processed as chunks and return those ranges. This is useful
+ * on retries: if the region boundary has changed, chunks already processed within the new
+ * region boundary can be skipped.
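+ * <pre>
+ * Worked example (hypothetical integer keys): mapper region [20, 85), processed chunks
+ * [10, 30] and [70, 90]. The first chunk is clipped to start at 20 and the last to end at 85,
+ * leaving a single gap: [30, 70).
+ * </pre>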
+ * @param mapperRegionStart Start of mapper region
+ * @param mapperRegionEnd End of mapper region
+ * @param processedChunks List of already-processed chunks from getProcessedChunks()
+ * @return List of (startKey, endKey) pairs representing unprocessed ranges
+ */
+ @VisibleForTesting
+ public List<Pair<byte[], byte[]>> calculateUnprocessedRanges(byte[] mapperRegionStart,
+ byte[] mapperRegionEnd, List<PhoenixSyncTableOutputRow> processedChunks) {
+ List<Pair<byte[], byte[]>> gaps = new ArrayList<>();
+ // If processedChunks is null or empty, the entire mapper region needs processing
+ if (processedChunks == null || processedChunks.isEmpty()) {
+ gaps.add(new Pair<>(mapperRegionStart, mapperRegionEnd));
+ return gaps;
+ }
+
+ // Chunk keys (start/endKey) are always inclusive and therefore never null/empty.
+ // But the mapper region boundary can be empty, i.e. [], for the first/last region of the
+ // table. We do byte comparisons while identifying gaps, and empty boundaries need to be
+ // treated as a special case since comparisons don't work on them.
+ boolean isStartRegionOfTable = mapperRegionStart == null || mapperRegionStart.length == 0;
+ boolean isEndRegionOfTable = mapperRegionEnd == null || mapperRegionEnd.length == 0;
+
+ // Track our scanning position through the mapper region as we iterate through chunks
+ byte[] scanPos = mapperRegionStart;
+
+ // We iterate over each chunk within the mapper region boundary; any gap/hole in the mapper
+ // region range not covered by a processed chunk is added to the gaps list.
+ // Since chunks are sorted and non-overlapping, only the first/last chunks
+ // need boundary clipping. All middle chunks are guaranteed to be within region boundaries.
+ for (int i = 0; i < processedChunks.size(); i++) {
+ PhoenixSyncTableOutputRow chunk = processedChunks.get(i);
+ byte[] chunkStart = chunk.getStartRowKey();
+ byte[] chunkEnd = chunk.getEndRowKey();
+ boolean initialChunk = i == 0;
+ boolean lastChunk = i == processedChunks.size() - 1;
+
+ // Determine effective start boundary for this chunk
+ // Only the first chunk might start before mapperRegionStart and need clipping
+ byte[] effectiveStart;
+ if (initialChunk && !isStartRegionOfTable) {
+ // Initial chunk: clip any boundary outside of the mapper region.
+ // Example: Mapper region [20, 85), first chunk [10, 30]
+ // effectiveStart = max(10, 20) = 20
+ effectiveStart =
+ Bytes.compareTo(chunkStart, mapperRegionStart) > 0 ? chunkStart : mapperRegionStart;
+ } else {
+ // isStartRegionOfTable -> Mapper region [,80): effectiveStart = chunkStart
+ // Not an initial chunk: the chunk start is guaranteed to be within region boundaries, no
+ // clipping needed
+ effectiveStart = chunkStart;
+ }
+
+ // Determine effective end boundary for this chunk
+ // Only the last chunk might extend beyond mapperRegionEnd and need clipping
+ byte[] effectiveEnd;
+ if (lastChunk && !isEndRegionOfTable) {
+ // Last chunk: clip any boundary outside of the mapper region.
+ // Example: Mapper region [20, 85), last chunk [70, 90]
+ // → effectiveEnd = min(90, 85) = 85
+ effectiveEnd = Bytes.compareTo(chunkEnd, mapperRegionEnd) < 0 ? chunkEnd : mapperRegionEnd;
+ } else {
+ // isEndRegionOfTable -> Mapper region [80,): effectiveEnd = chunkEnd
+ // Not the last chunk: the chunk end is guaranteed to be within region boundaries, no
+ // clipping needed
+ effectiveEnd = chunkEnd;
+ }
+
+ // Check for gap BEFORE this chunk
+ // If there's space between our current position and where this chunk starts, that's a gap
+ // that needs processing
+ // Example: scanPos=30 (processed till this key), effectiveStart=70 (chunk start key)
+ // Gap detected: [30, 70) needs processing
+ if (Bytes.compareTo(scanPos, effectiveStart) < 0) {
+ gaps.add(new Pair<>(scanPos, effectiveStart));
+ }
+ // We've now "processed" up to this key
+ scanPos = effectiveEnd;
+ }
+
+ // Since Mapper region end is exclusive, we want to add any remaining key boundary as gaps
+ // except when scanPos == mapperRegionEnd (i.e end of Mapper region boundary got covered by
+ // chunk)
+ if (isEndRegionOfTable || Bytes.compareTo(scanPos, mapperRegionEnd) < 0) {
+ gaps.add(new Pair<>(scanPos, mapperRegionEnd));
+ }
+ return gaps;
+ }
+
+ /**
+ * Checks whether the start key should be inclusive. This is specific to the scenario where
+ * there are processed chunks within this mapper region boundary:
+ * <pre>
+ * [---MapperRegion---------------)
+ *    [--chunk1--]  [--chunk2--]
+ * </pre>
+ * When the first processed chunk starts after the mapper region start, only the first
+ * unprocessed range needs startKeyInclusive = true; subsequent ranges start at a processed
+ * chunk's end key, which has already been covered, so their start keys are exclusive.
+ * <pre>
+ * [---MapperRegion---------------)
+ * [--chunk1--]  [--chunk2--]
+ * </pre>
+ * Here the first chunk starts at the mapper region start, so no unprocessed range needs an
+ * inclusive start key.
+ */
+ boolean shouldStartKeyBeInclusive(byte[] mapperRegionStart,
+ List<PhoenixSyncTableOutputRow> processedChunks) {
+ if (
+ mapperRegionStart == null || mapperRegionStart.length == 0 || processedChunks == null
+ || processedChunks.isEmpty()
+ ) {
+ return true;
+ }
+ return Bytes.compareTo(processedChunks.get(0).getStartRowKey(), mapperRegionStart) > 0;
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ tryClosingResources();
+ super.cleanup(context);
+ }
+
+ private void tryClosingResources() {
+ if (sourceConnection != null) {
+ try {
+ sourceConnection.close();
+ } catch (SQLException e) {
+ LOGGER.error("Error while closing source connection in PhoenixSyncTableMapper", e);
+ }
+ }
+ if (targetConnection != null) {
+ try {
+ targetConnection.close();
+ } catch (SQLException e) {
+ LOGGER.error("Error while closing target connection in PhoenixSyncTableMapper", e);
+ }
+ }
+ if (globalConnection != null) {
+ try {
+ globalConnection.close();
+ } catch (SQLException e) {
+ LOGGER.error("Error while closing output connection in PhoenixSyncTableMapper", e);
+ }
+ }
+ }
+
+ /**
+ * Hold chunk metadata returned from coprocessor
+ */
+ private static class ChunkInfo {
+ byte[] startKey;
+ byte[] endKey;
+ byte[] hash;
+ long rowCount;
+ boolean isPartial;
+ boolean hasMoreRowsInRegion;
+ Timestamp executionStartTime;
+ Timestamp executionEndTime;
+
+ @Override
+ public String toString() {
+ return String.format("Chunk[start=%s, end=%s, rows=%d, partial=%s, hasMoreRowsInRegion=%s]",
+ Bytes.toStringBinary(startKey), Bytes.toStringBinary(endKey), rowCount, isPartial,
+ hasMoreRowsInRegion);
+ }
+ }
+
+ /**
+ * Holds a ResultScanner and Table reference for reuse across multiple chunks.
+ */
+ private class ChunkScannerContext implements AutoCloseable {
+ private final Table table;
+ private final ResultScanner scanner;
+
+ ChunkScannerContext(Table table, ResultScanner scanner) {
+ this.table = table;
+ this.scanner = scanner;
+ }
+
+ /**
+ * Fetches the next chunk from the scanner. Each call retrieves one chunk's metadata from the
+ * server-side coprocessor.
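+     * Typical use is a drain loop that stops at {@code null} (sketch):
+     * <pre>
+     * ChunkInfo chunk;
+     * while ((chunk = ctx.getNextChunk()) != null) {
+     *   // compare source/target hashes, checkpoint the result
+     * }
+     * </pre>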
+ * @return ChunkInfo or null if no more chunks available for region
+ * @throws IOException if scan fails
+ */
+ ChunkInfo getNextChunk() throws IOException {
+ Result result = scanner.next();
+ if (result == null || result.isEmpty()) {
+ return null;
+ }
+ return parseChunkInfo(result);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (scanner != null) {
+ scanner.close();
+ }
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java
new file mode 100644
index 00000000000..e66a94067bf
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling:
+ * 1. Mapper-level checkpointing (skip completed mapper regions on restart)
+ * 2. Chunk-level checkpointing (skip completed chunks)
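+ *
+ * A minimal usage sketch (assuming an open Phoenix connection {@code conn} and a previously
+ * chosen time window):
+ * <pre>
+ * PhoenixSyncTableOutputRepository repo = new PhoenixSyncTableOutputRepository(conn);
+ * repo.createSyncCheckpointTableIfNotExists();
+ * List&lt;PhoenixSyncTableOutputRow&gt; done =
+ *   repo.getProcessedMapperRegions("MY_TABLE", "zk1:2181:/hbase", fromTime, toTime);
+ * </pre>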
+ */
+public class PhoenixSyncTableOutputRepository {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+ public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+ private static final int OUTPUT_TABLE_TTL_SECONDS = 30 * 24 * 60 * 60;
+ private final Connection connection;
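+  // Phoenix does not allow empty byte[] values in PK columns (treated as null), so the first
+  // region's empty start key is stored as this one-byte sentinel (see IS_FIRST_REGION/toHBaseKey).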
+ private static final byte[] EMPTY_START_KEY_SENTINEL = new byte[] { 0x00 };
+ private static final String UPSERT_CHECKPOINT_SQL =
+ "UPSERT INTO " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+ + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, IS_DRY_RUN,"
+ + " START_ROW_KEY, END_ROW_KEY, IS_FIRST_REGION, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+ + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ /**
+ * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+ * stored as-is and shared across operations. The caller retains ownership and is responsible for
+ * connection lifecycle.
+ * @param connection Phoenix connection (must remain open for repository lifetime)
+ */
+ public PhoenixSyncTableOutputRepository(Connection connection) {
+ this.connection = connection;
+ }
+
+ public void createSyncCheckpointTableIfNotExists() throws SQLException {
+ String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+ + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n"
+ + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n"
+ + " TO_TIME BIGINT NOT NULL,\n" + " IS_DRY_RUN BOOLEAN NOT NULL,\n"
+ + " START_ROW_KEY VARBINARY NOT NULL,\n" + " END_ROW_KEY VARBINARY,\n"
+ + " IS_FIRST_REGION BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n"
+ + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n"
+ + " COUNTERS VARCHAR(255), \n" + " CONSTRAINT PK PRIMARY KEY (\n"
+ + " TABLE_NAME,\n" + " TARGET_CLUSTER,\n" + " TYPE ,\n"
+ + " FROM_TIME,\n" + " TO_TIME,\n" + " IS_DRY_RUN,\n"
+ + " START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS;
+
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(ddl);
+ connection.commit();
+ LOGGER.info("Successfully created or verified existence of {} table",
+ SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+ }
+ }
+
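+  /**
+   * Upserts one checkpoint row for a chunk or mapper region. A null or empty start key (the
+   * table's first region) is stored as EMPTY_START_KEY_SENTINEL with IS_FIRST_REGION = true so
+   * that toHBaseKey can map it back to HConstants.EMPTY_BYTE_ARRAY when read.
+   */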
+ public void checkpointSyncTableResult(String tableName, String targetCluster, Type type,
+ Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] endKey, Status status,
+ Timestamp executionStartTime, Timestamp executionEndTime, String counters) throws SQLException {
+
+ // Validate required parameters
+ if (tableName == null || tableName.isEmpty()) {
+ throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+ }
+ if (targetCluster == null || targetCluster.isEmpty()) {
+ throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+ }
+ if (type == null) {
+ throw new IllegalArgumentException("Type cannot be null for checkpoint");
+ }
+ if (fromTime == null || toTime == null) {
+ throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+ }
+
+ byte[] effectiveStartKey =
+ (startKey == null || startKey.length == 0) ? EMPTY_START_KEY_SENTINEL : startKey;
+ boolean isFirstRegion = startKey == null || startKey.length == 0;
+
+ try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+ ps.setString(1, tableName);
+ ps.setString(2, targetCluster);
+ ps.setString(3, type.name());
+ ps.setLong(4, fromTime);
+ ps.setLong(5, toTime);
+ ps.setBoolean(6, isDryRun);
+ ps.setBytes(7, effectiveStartKey);
+ ps.setBytes(8, endKey);
+ ps.setBoolean(9, isFirstRegion);
+ ps.setTimestamp(10, executionStartTime);
+ ps.setTimestamp(11, executionEndTime);
+ ps.setString(12, status != null ? status.name() : null);
+ ps.setString(13, counters);
+ ps.executeUpdate();
+ connection.commit();
+ }
+ }
+
+ /**
+   * Converts a stored key back to the HBase empty key if needed. For the first region (empty
+   * startKey), converts EMPTY_START_KEY_SENTINEL back to HConstants.EMPTY_BYTE_ARRAY.
+ */
+ private byte[] toHBaseKey(byte[] storedKey, boolean isFirstRegion) {
+ if (isFirstRegion && Arrays.equals(storedKey, EMPTY_START_KEY_SENTINEL)) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+ return storedKey;
+ }
+
+ /**
+ * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out
+ * already-processed regions.
+ * @param tableName Source table name
+ * @param targetCluster Target cluster ZK quorum
+   * @param fromTime Start timestamp in millis (must not be null)
+   * @param toTime End timestamp in millis (must not be null)
+ * @return List of completed mapper regions
+ */
+  public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String tableName,
+ String targetCluster, Long fromTime, Long toTime) throws SQLException {
+
+ String query = "SELECT START_ROW_KEY, END_ROW_KEY, IS_FIRST_REGION FROM "
+ + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+ + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
+    List<PhoenixSyncTableOutputRow> results = new ArrayList<>();
+ try (PreparedStatement ps = connection.prepareStatement(query)) {
+ int paramIndex = 1;
+ ps.setString(paramIndex++, tableName);
+ ps.setString(paramIndex++, targetCluster);
+ ps.setString(paramIndex++, Type.MAPPER_REGION.name());
+ ps.setLong(paramIndex++, fromTime);
+ ps.setLong(paramIndex++, toTime);
+ ps.setString(paramIndex++, Status.VERIFIED.name());
+ ps.setString(paramIndex, Status.MISMATCHED.name());
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ boolean isFirstRegion = rs.getBoolean("IS_FIRST_REGION");
+ PhoenixSyncTableOutputRow row = new PhoenixSyncTableOutputRow.Builder()
+ .setStartRowKey(this.toHBaseKey(rs.getBytes("START_ROW_KEY"), isFirstRegion))
+ .setEndRowKey(rs.getBytes("END_ROW_KEY")).build();
+ results.add(row);
+ }
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks.
+ * @param tableName Source table name
+ * @param targetCluster Target cluster ZK quorum
+   * @param fromTime Start timestamp in millis (must not be null)
+   * @param toTime End timestamp in millis (must not be null)
+ * @param mapperRegionStart Mapper region start key
+ * @param mapperRegionEnd Mapper region end key
+ * @return List of processed chunks in the region
+ */
+  public List<PhoenixSyncTableOutputRow> getProcessedChunks(String tableName, String targetCluster,
+ Long fromTime, Long toTime, byte[] mapperRegionStart, byte[] mapperRegionEnd)
+ throws SQLException {
+ StringBuilder queryBuilder = new StringBuilder();
+ queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY, IS_FIRST_REGION FROM "
+ + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+ + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+ // Check if mapper region boundaries are non-empty (i.e., NOT first/last regions)
+ // Only add boundary conditions for non-empty boundaries
+ boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length > 0;
+ boolean hasStartBoundary = mapperRegionStart != null && mapperRegionStart.length > 0;
+
+    // Keep chunks that overlap this mapper region:
+    // - START_ROW_KEY <= mapperRegionEnd (when an end boundary exists)
+    // - END_ROW_KEY >= mapperRegionStart (when a start boundary exists)
+ if (hasEndBoundary) {
+ queryBuilder.append(" AND START_ROW_KEY <= ?");
+ }
+ if (hasStartBoundary) {
+ queryBuilder.append(" AND END_ROW_KEY >= ?");
+ }
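+    // Example: for mapper region [b, f), chunk [a, c) is kept (a <= f and c >= b), while chunk
+    // [g, j) is filtered out because its START_ROW_KEY g exceeds f.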
+ queryBuilder.append(" AND STATUS IN (?, ?)");
+
+    List<PhoenixSyncTableOutputRow> results = new ArrayList<>();
+ try (PreparedStatement ps = connection.prepareStatement(queryBuilder.toString())) {
+ int paramIndex = 1;
+ ps.setString(paramIndex++, tableName);
+ ps.setString(paramIndex++, targetCluster);
+ ps.setString(paramIndex++, Type.CHUNK.name());
+ ps.setLong(paramIndex++, fromTime);
+ ps.setLong(paramIndex++, toTime);
+ if (hasEndBoundary) {
+ ps.setBytes(paramIndex++, mapperRegionEnd);
+ }
+ if (hasStartBoundary) {
+ ps.setBytes(paramIndex++, mapperRegionStart);
+ }
+ ps.setString(paramIndex++, Status.VERIFIED.name());
+ ps.setString(paramIndex, Status.MISMATCHED.name());
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ PhoenixSyncTableOutputRow row = new PhoenixSyncTableOutputRow.Builder()
+ .setStartRowKey(
+ this.toHBaseKey(rs.getBytes("START_ROW_KEY"), rs.getBoolean("IS_FIRST_REGION")))
+ .setEndRowKey(rs.getBytes("END_ROW_KEY")).build();
+ results.add(row);
+ }
+ }
+ }
+ return results;
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java
new file mode 100644
index 00000000000..3e45435ae4f
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRow.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.util.Arrays;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Data model class representing a row in the PHOENIX_SYNC_TABLE_CHECKPOINT table.
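+ *
+ * A construction sketch (hypothetical keys):
+ * <pre>
+ * PhoenixSyncTableOutputRow row = new PhoenixSyncTableOutputRow.Builder()
+ *   .setStartRowKey(Bytes.toBytes("k00")).setEndRowKey(Bytes.toBytes("k99")).build();
+ * </pre>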
+ */
+public class PhoenixSyncTableOutputRow {
+
+ public enum Type {
+ CHUNK,
+ MAPPER_REGION
+ }
+
+ public enum Status {
+ VERIFIED,
+ MISMATCHED
+ }
+
+ private byte[] startRowKey;
+ private byte[] endRowKey;
+
+ @Override
+ public String toString() {
+ return String.format("SyncOutputRow[start=%s, end=%s]", Bytes.toStringBinary(startRowKey),
+ Bytes.toStringBinary(endRowKey));
+ }
+
+ public byte[] getStartRowKey() {
+ return startRowKey != null ? Arrays.copyOf(startRowKey, startRowKey.length) : null;
+ }
+
+ public byte[] getEndRowKey() {
+ return endRowKey != null ? Arrays.copyOf(endRowKey, endRowKey.length) : null;
+ }
+
+ /**
+ * Builder for PhoenixSyncTableOutputRow
+ */
+ public static class Builder {
+ private final PhoenixSyncTableOutputRow row;
+
+ public Builder() {
+ this.row = new PhoenixSyncTableOutputRow();
+ }
+
+ public Builder setStartRowKey(byte[] startRowKey) {
+ row.startRowKey = Arrays.copyOf(startRowKey, startRowKey.length);
+ return this;
+ }
+
+ public Builder setEndRowKey(byte[] endRowKey) {
+ row.endRowKey = (endRowKey == null || endRowKey.length == 0)
+ ? HConstants.EMPTY_END_ROW
+ : Arrays.copyOf(endRowKey, endRowKey.length);
+ return this;
+ }
+
+ public PhoenixSyncTableOutputRow build() {
+ if (row.startRowKey == null) {
+ throw new IllegalStateException("Start row key is required");
+ }
+ return row;
+ }
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
new file mode 100644
index 00000000000..12b494ccc44
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across
+ * two HBase clusters (source and target).
+ * <p>
+ * <b>Use Case:</b> This tool is designed for replication/migration verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects
+ * which data chunks are out of sync without transferring all the data over the network.
+ * <p>
+ * <b>How It Works:</b>
+ * <ul>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are checkpointed to the
+ * {@code PHOENIX_SYNC_TABLE_CHECKPOINT} table, tracking verified chunks, mismatched chunks, and
+ * processing progress for resumable operations.</li>
+ * </ul>
+ * <p>
+ * <b>Usage Example:</b>
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \
+ *   --table-name MY_TABLE \
+ *   --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+ private static final Option SCHEMA_NAME_OPTION =
+ new Option("s", "schema", true, "Phoenix schema name (optional)");
+ private static final Option TABLE_NAME_OPTION =
+ new Option("tn", "table-name", true, "Table name (mandatory)");
+ private static final Option TARGET_CLUSTER_OPTION =
+ new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)");
+ private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true,
+ "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true,
+    "End time in milliseconds for sync (optional, defaults to the current time)");
+ private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false,
+ "Dry run mode - only checkpoint inconsistencies, do not repair (optional)");
+ private static final Option CHUNK_SIZE_OPTION =
+ new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)");
+ private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false,
+ "Run the job in foreground. Default - Runs the job in background.");
+ private static final Option TENANT_ID_OPTION =
+ new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)");
+ private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+ private String schemaName;
+ private String tableName;
+ private String targetZkQuorum;
+ private Long startTime;
+ private Long endTime;
+ private boolean isDryRun;
+ private Long chunkSizeBytes;
+ private boolean isForeground;
+ private String tenantId;
+
+ private String qTable;
+ private String qSchemaName;
+
+ private Configuration configuration;
+ private Job job;
+ private PTable pTable;
+
+ /**
+ * Creates an MR job that uses server-side chunking and checksum calculation
+ * @return Configured MapReduce job ready for submission
+ * @throws Exception if job creation fails
+ */
+ private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
+ configureTimeoutsAndRetries(configuration);
+ setPhoenixSyncTableToolConfiguration(configuration);
+ Job job = Job.getInstance(configuration, getJobName());
+ job.setMapperClass(PhoenixSyncTableMapper.class);
+ job.setJarByClass(PhoenixSyncTableTool.class);
+ TableMapReduceUtil.initCredentials(job);
+ TableMapReduceUtil.addDependencyJars(job);
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+ configureInput(job, tableType);
+ configureOutput(job);
+ obtainTargetClusterTokens(job);
+ return job;
+ }
+
+ /**
+ * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
+ * required for cross-cluster kerberos authentication.
+ * @param job The MapReduce job to add tokens
+ */
+ private void obtainTargetClusterTokens(Job job) throws IOException {
+ Configuration targetConf =
+ PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum);
+ TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
+ }
+
+ /**
+ * Configures timeouts and retry settings for the sync job
+ */
+ private void configureTimeoutsAndRetries(Configuration configuration) {
+ long syncTableQueryTimeoutMs =
+ configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+ long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+ long syncTableClientScannerTimeoutMs =
+ configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+ QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+ int syncTableRpcRetriesCounter =
+ configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+ QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+ configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ Long.toString(syncTableClientScannerTimeoutMs));
+ configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs));
+ configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ Integer.toString(syncTableRpcRetriesCounter));
+ configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs));
+ }
+
+ private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
+ PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable);
+ PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
+ PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime);
+ PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime);
+ PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun);
+ PhoenixConfigurationUtil.setSplitByStats(configuration, false);
+ if (chunkSizeBytes != null) {
+ PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
+ }
+ if (tenantId != null) {
+ PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+ }
+ PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+ configuration
+ .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+ }
+
+ private void configureInput(Job job, PTableType tableType) {
+ // With below query plan, we get Input split based on region boundary
+ String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";
+ String selectStatement = "SELECT " + hint + "1 FROM " + qTable;
+ PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class,
+ PhoenixSyncTableInputFormat.class, qTable, selectStatement);
+ }
+
+ private void configureOutput(Job job) {
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ }
+
+ private String getJobName() {
+ StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool");
+ if (qSchemaName != null) {
+ jobName.append("-").append(qSchemaName);
+ }
+ jobName.append("-").append(tableName);
+ jobName.append("-").append(System.currentTimeMillis());
+ return jobName.toString();
+ }
+
+ private CommandLine parseOptions(String[] args) throws IllegalStateException {
+ Options options = getOptions();
+ CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false)
+ .setStripLeadingAndTrailingQuotes(false).build();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ LOGGER.error("Failed to parse command line options. Args: {}. Error: {}",
+ Arrays.toString(args), e.getMessage(), e);
+ printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+ }
+
+ if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+ requireOption(cmdLine, TABLE_NAME_OPTION);
+ requireOption(cmdLine, TARGET_CLUSTER_OPTION);
+ return cmdLine;
+ }
+
+ private void requireOption(CommandLine cmdLine, Option option) {
+ if (!cmdLine.hasOption(option.getOpt())) {
+ throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter");
+ }
+ }
+
+ private Options getOptions() {
+ Options options = new Options();
+ options.addOption(SCHEMA_NAME_OPTION);
+ options.addOption(TABLE_NAME_OPTION);
+ options.addOption(TARGET_CLUSTER_OPTION);
+ options.addOption(FROM_TIME_OPTION);
+ options.addOption(TO_TIME_OPTION);
+ options.addOption(DRY_RUN_OPTION);
+ options.addOption(CHUNK_SIZE_OPTION);
+ options.addOption(RUN_FOREGROUND_OPTION);
+ options.addOption(TENANT_ID_OPTION);
+ options.addOption(HELP_OPTION);
+ return options;
+ }
+
+ private void printHelpAndExit(String errorMessage, Options options) {
+ System.err.println(errorMessage);
+ printHelpAndExit(options, -1);
+ }
+
+ private void printHelpAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(),
+ "Synchronize a Phoenix table between source and target clusters", options,
+ "\nExample usage:\n"
+ + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
+          + "  --table-name MY_TABLE \\\n" + "  --target-cluster target-zk1,target-zk2:2181:/hbase \\\n"
+ + " --dry-run\n",
+ true);
+ System.exit(exitCode);
+ }
+
+ public void populateSyncTableToolAttributes(CommandLine cmdLine) {
+ tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt());
+ targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt());
+ schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+
+ if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) {
+ startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt()));
+ } else {
+ startTime = 0L;
+ }
+
+ if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) {
+ endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt()));
+ } else {
+      // Default endTime: the current time
+      endTime = EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) {
+ chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt()));
+ }
+ if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+ tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+ }
+ isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+ isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+ qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
+ qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
+ PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
+ PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "PhoenixSyncTableTool configured - Table: {}, Schema: {}, Target: {}, "
+ + "StartTime: {}, EndTime: {}, DryRun: {}, ChunkSize: {}, Foreground: {}, TenantId: {}",
+ qTable, qSchemaName, targetZkQuorum, startTime, endTime, isDryRun, chunkSizeBytes,
+ isForeground, tenantId);
+ }
+ }
+
+ /**
+ * Creates or verifies the Phoenix sync tool checkpoint table
+ */
+ private void createSyncOutputTable(Connection connection) throws SQLException {
+ PhoenixSyncTableOutputRepository repository = new PhoenixSyncTableOutputRepository(connection);
+ repository.createSyncCheckpointTableIfNotExists();
+ }
+
+ /**
+ * Sets up the table reference and validates it exists and is suitable for sync operations.
+ * Validates that the table is not a VIEW
+ */
+ private PTableType validateAndGetTableType() throws SQLException {
+ Properties props = new Properties();
+ if (tenantId != null) {
+ props.setProperty("TenantId", tenantId);
+ }
+ try (Connection connection = ConnectionUtil.getInputConnection(configuration, props)) {
+ pTable = PhoenixMapReduceUtil.validateTableForMRJob(connection, qTable, false, true);
+ return pTable.getType();
+ }
+ }
+
+ private boolean submitPhoenixSyncTableJob() throws Exception {
+ if (!isForeground) {
+ job.submit();
+ LOGGER.info("PhoenixSyncTable Job :{} submitted successfully in background for table {} ",
+ job.getJobName(), qTable);
+ return true;
+ }
+ LOGGER.info("Running PhoenixSyncTable job: {} for table:{} in foreground.", job.getJobName(),
+ qTable);
+ boolean success = job.waitForCompletion(true);
+ if (success) {
+ LOGGER.info("PhoenixSyncTable job: {} completed for table {}", job.getJobName(), qTable);
+ } else {
+ LOGGER.error("PhoenixSyncTable job {} failed for table {} to target cluster {}",
+ job.getJobName(), qTable, targetZkQuorum);
+ }
+ Counters counters = job.getCounters();
+ if (counters != null) {
+ long inputRecords = counters.findCounter(PhoenixJobCounters.INPUT_RECORDS).getValue();
+ long outputRecords = counters.findCounter(PhoenixJobCounters.OUTPUT_RECORDS).getValue();
+ long failedRecords = counters.findCounter(PhoenixJobCounters.FAILED_RECORDS).getValue();
+ long chunksVerified =
+ counters.findCounter(PhoenixSyncTableMapper.SyncCounters.CHUNKS_VERIFIED).getValue();
+ long chunksMismatched =
+ counters.findCounter(PhoenixSyncTableMapper.SyncCounters.CHUNKS_MISMATCHED).getValue();
+ long sourceRowsProcessed =
+ counters.findCounter(PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+ long targetRowsProcessed =
+ counters.findCounter(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+ LOGGER.info(
+ "PhoenixSyncTable job completed, gathered counters are \n Input Record: {}, \n"
+ + "Output Record: {}, \n Failed Record: {}, \n Chunks Verified: {}, \n"
+ + "Chunks Mismatched: {}, \n Source Rows Processed: {}, \n Target Rows Processed: {}",
+ inputRecords, outputRecords, failedRecords, chunksVerified, chunksMismatched,
+ sourceRowsProcessed, targetRowsProcessed);
+ } else {
+ LOGGER.warn("Unable to retrieve job counters for table {} - job may have failed "
+ + "during initialization", qTable);
+ }
+ return success;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ CommandLine cmdLine;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ return -1;
+ }
+ configuration = HBaseConfiguration.addHbaseResources(getConf());
+ try (Connection globalConn = ConnectionUtil.getInputConnection(configuration)) {
+ createSyncOutputTable(globalConn);
+ }
+ populateSyncTableToolAttributes(cmdLine);
+ try {
+ PTableType tableType = validateAndGetTableType();
+ job = configureAndCreatePhoenixSyncTableJob(tableType);
+ boolean result = submitPhoenixSyncTableJob();
+ return result ? 0 : -1;
+ } catch (Exception ex) {
+ LOGGER.error(
+ "Exception occurred while performing phoenix sync table job for table {} to target {}: {}",
+ qTable, targetZkQuorum, ExceptionUtils.getMessage(ex), ex);
+ return -1;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new PhoenixSyncTableTool(), args);
+ System.exit(exitCode);
+ }
+
+ public Job getJob() {
+ return job;
+ }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
index c9111aa7b88..7fbc213b14e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java
@@ -39,7 +39,6 @@
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -403,7 +402,7 @@ public int run(String[] args) throws Exception {
? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
: EnvironmentEdgeManager.currentTimeMillis() - 60000;
- validateTimestamp(configuration, ts);
+ PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, ts, qDataTable);
if (indexTable != null) {
if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) {
@@ -495,20 +494,6 @@ public int run(String[] args) throws Exception {
}
}
- private void validateTimestamp(Configuration configuration, long ts) {
- long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
- if (
- maxLookBackAge != BaseScannerRegionObserverConstants.DEFAULT_PHOENIX_MAX_LOOKBACK_AGE * 1000L
- ) {
- long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
- if (ts < minTimestamp) {
- throw new IllegalArgumentException("Index scrutiny can't look back past the "
- + "configured max lookback age: " + maxLookBackAge / 1000 + " seconds");
- }
- }
-
- }
-
@VisibleForTesting
 public List<Job> getJobs() {
return jobs;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 6edfc9370c1..cc918dc46f3 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -298,9 +298,6 @@ public static IndexDisableLoggingType fromValue(byte[] value) {
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
- public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
- + "or equal to endTime " + "or either of them are set in the future; IndexTool can't proceed.";
-
public static final String FEATURE_NOT_APPLICABLE =
"start-time/end-time and retry verify feature are only "
+ "applicable for local or non-transactional global indexes";
@@ -927,9 +924,6 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
lastVerifyTime = new Long(cmdLine.getOptionValue(RETRY_VERIFY_OPTION.getOpt()));
validateLastVerifyTime();
}
- if (isTimeRangeSet(startTime, endTime)) {
- validateTimeRange(startTime, endTime);
- }
if (verify) {
String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
indexVerifyType = IndexVerifyType.fromValue(value);
@@ -954,6 +948,9 @@ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
+ if (isTimeRangeSet(startTime, endTime)) {
+ PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qDataTable);
+ }
return 0;
}
@@ -984,15 +981,6 @@ public boolean isValidLastVerifyTime(Long lastVerifyTime) throws Exception {
}
}
- public static void validateTimeRange(Long sTime, Long eTime) {
- Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
- Long st = (sTime == null) ? 0 : sTime;
- Long et = (eTime == null) ? currentTime : eTime;
- if (st.compareTo(currentTime) > 0 || et.compareTo(currentTime) > 0 || st.compareTo(et) >= 0) {
- throw new RuntimeException(INVALID_TIME_RANGE_EXCEPTION_MESSAGE);
- }
- }
-
private Connection getConnection(Configuration configuration) throws SQLException {
return ConnectionUtil.getInputConnection(configuration);
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
index 473c2fa33b2..daca9a04616 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/transform/TransformTool.java
@@ -21,7 +21,6 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexTool.createIndexToolTables;
import static org.apache.phoenix.mapreduce.index.IndexTool.isTimeRangeSet;
-import static org.apache.phoenix.mapreduce.index.IndexTool.validateTimeRange;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
import static org.apache.phoenix.query.QueryConstants.UNVERIFIED_BYTES;
import static org.apache.phoenix.util.QueryUtil.getConnection;
@@ -334,10 +333,6 @@ public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throw
endTime = new Long(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
}
- if (isTimeRangeSet(startTime, endTime)) {
- validateTimeRange(startTime, endTime);
- }
-
if (
(isPartialTransform || shouldFixUnverified) && (cmdLine.hasOption(AUTO_SPLIT_OPTION.getOpt()))
) {
@@ -362,6 +357,9 @@ public int populateTransformToolAttributesAndValidate(CommandLine cmdLine) throw
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
+ if (isTimeRangeSet(startTime, endTime)) {
+ PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qDataTable);
+ }
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
if (cmdLine.hasOption(SPLIT_SIZE_OPTION.getOpt())) {
splitSize = Integer.parseInt(cmdLine.getOptionValue(SPLIT_SIZE_OPTION.getOpt()));
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 0bd4830c291..96c159cb02f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -197,6 +197,21 @@ public final class PhoenixConfigurationUtil {
// non-index jobs benefit less from this
public static final boolean DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER = false;
+ public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name";
+
+ public static final String PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM = "phoenix.sync.table.target.zk";
+
+ public static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.sync.table.from.time";
+
+ public static final String PHOENIX_SYNC_TABLE_TO_TIME = "phoenix.sync.table.to.time";
+
+ public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run";
+
+ public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
+ "phoenix.sync.table.chunk.size.bytes";
+
+ public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB
+
/**
* Determines type of Phoenix Map Reduce job. 1. QUERY allows running arbitrary queries without
* aggregates 2. UPDATE_STATS collects statistics for the table
@@ -890,4 +905,67 @@ public static boolean isMRRandomizeMapperExecutionOrder(final Configuration conf
return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER,
DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER);
}
+
+ public static void setPhoenixSyncTableName(Configuration conf, String tableName) {
+ Preconditions.checkNotNull(conf);
+ conf.set(PHOENIX_SYNC_TABLE_NAME, tableName);
+ }
+
+ public static String getPhoenixSyncTableName(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ return conf.get(PHOENIX_SYNC_TABLE_NAME);
+ }
+
+ public static void setPhoenixSyncTableTargetZkQuorum(Configuration conf, String zkQuorum) {
+ Preconditions.checkNotNull(conf);
+ conf.set(PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM, zkQuorum);
+ }
+
+ public static String getPhoenixSyncTableTargetZkQuorum(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ return conf.get(PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM);
+ }
+
+ public static void setPhoenixSyncTableFromTime(Configuration conf, Long fromTime) {
+ Preconditions.checkNotNull(conf);
+ conf.setLong(PHOENIX_SYNC_TABLE_FROM_TIME, fromTime);
+ }
+
+ public static Long getPhoenixSyncTableFromTime(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+    String value = conf.get(PHOENIX_SYNC_TABLE_FROM_TIME);
+    return value == null ? null : Long.valueOf(value);
+ }
+
+ public static void setPhoenixSyncTableToTime(Configuration conf, Long toTime) {
+ Preconditions.checkNotNull(conf);
+ conf.setLong(PHOENIX_SYNC_TABLE_TO_TIME, toTime);
+ }
+
+ public static Long getPhoenixSyncTableToTime(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+    String value = conf.get(PHOENIX_SYNC_TABLE_TO_TIME);
+    return value == null ? null : Long.valueOf(value);
+ }
+
+ public static void setPhoenixSyncTableDryRun(Configuration conf, boolean dryRun) {
+ Preconditions.checkNotNull(conf);
+ conf.setBoolean(PHOENIX_SYNC_TABLE_DRY_RUN, dryRun);
+ }
+
+ public static boolean getPhoenixSyncTableDryRun(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ return conf.getBoolean(PHOENIX_SYNC_TABLE_DRY_RUN, true);
+ }
+
+ public static void setPhoenixSyncTableChunkSizeBytes(Configuration conf, Long chunkSizeBytes) {
+ Preconditions.checkNotNull(conf);
+ conf.setLong(PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES, chunkSizeBytes);
+ }
+
+ public static long getPhoenixSyncTableChunkSizeBytes(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ return conf.getLong(PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES,
+ DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES);
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 4378ed56cfe..6dcab4690c6 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -18,20 +18,30 @@
package org.apache.phoenix.mapreduce.util;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
/**
* Utility class for setting Configuration parameters for the Map Reduce job
*/
public final class PhoenixMapReduceUtil {
+ public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "Invalid time range for table";
+
private PhoenixMapReduceUtil() {
}
@@ -223,4 +233,100 @@ public static void setTenantId(final Job job, final String tenantId) {
PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
}
+ /**
+ * Validates that start and end times are in the past and start < end.
+ * @param startTime Start timestamp in millis (nullable, defaults to 0)
+ * @param endTime End timestamp in millis (nullable, defaults to current time)
+ * @param tableName Table name for error messages
+ * @throws IllegalArgumentException if time range is invalid
+ */
+ public static void validateTimeRange(Long startTime, Long endTime, String tableName) {
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ long st = (startTime == null) ? 0L : startTime;
+ long et = (endTime == null) ? currentTime : endTime;
+
+ if (et > currentTime || st >= et) {
+ throw new IllegalArgumentException(String.format(
+ "%s %s: start and end times must be in the past "
+ + "and start < end. Start: %d, End: %d, Current: %d",
+ INVALID_TIME_RANGE_EXCEPTION_MESSAGE, tableName, st, et, currentTime));
+ }
+ }
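+
+  // Example: validateTimeRange(0L, null, "T1") passes (end defaults to the current time), while
+  // validateTimeRange(5L, 5L, "T1") throws because start must be strictly less than end.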
+
+ /**
+ * Validates that the end time doesn't exceed the max lookback age configured in Phoenix.
+ * @param configuration Hadoop configuration
+ * @param endTime End timestamp in millis
+ * @param tableName Table name for error messages
+ * @throws IllegalArgumentException if endTime is before min allowed timestamp
+ */
+ public static void validateMaxLookbackAge(Configuration configuration, Long endTime,
+ String tableName) {
+ long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+ if (maxLookBackAge > 0) {
+ long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
+ if (endTime < minTimestamp) {
+ throw new IllegalArgumentException(String.format(
+ "Table %s can't look back past the configured max lookback age: %d ms. "
+ + "End time: %d, Min allowed timestamp: %d",
+ tableName, maxLookBackAge, endTime, minTimestamp));
+ }
+ }
+ }
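+
+  // Example: with a max lookback age of 86400000 ms (1 day) and current time T, any endTime
+  // older than T - 86400000 is rejected, since cell versions past the lookback window may
+  // already have been purged by compaction.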
+
+ /**
+ * Validates that a table is suitable for MR operations. Checks table existence, type, and state.
+ * @param connection Phoenix connection
+ * @param qualifiedTableName Qualified table name
+ * @param allowViews Whether to allow VIEW tables
+ * @param allowIndexes Whether to allow INDEX tables
+ * @return PTable instance
+ * @throws SQLException if connection fails
+ * @throws IllegalArgumentException if validation fails
+ */
+ public static PTable validateTableForMRJob(Connection connection, String qualifiedTableName,
+ boolean allowViews, boolean allowIndexes) throws SQLException {
+ PTable pTable = connection.unwrap(PhoenixConnection.class).getTableNoCache(qualifiedTableName);
+
+ if (pTable == null) {
+ throw new IllegalArgumentException(
+ String.format("Table %s does not exist", qualifiedTableName));
+ } else if (!allowViews && pTable.getType() == PTableType.VIEW) {
+ throw new IllegalArgumentException(
+ String.format("Cannot run MR job on VIEW table %s", qualifiedTableName));
+ } else if (!allowIndexes && pTable.getType() == PTableType.INDEX) {
+ throw new IllegalArgumentException(
+ String.format("Cannot run MR job on INDEX table %s directly", qualifiedTableName));
+ }
+
+ return pTable;
+ }
+
+ /**
+ * Configures a Configuration object with ZooKeeper settings from a ZK quorum string.
+ * @param baseConf Base configuration to create from (typically job configuration)
+   * @param zkQuorum ZooKeeper quorum string in format: "zk_quorum:port:znode" or
+   *                 "zk_quorum:port:znode:krb_principal". Example: "zk1,zk2,zk3:2181:/hbase"
+   * @return New Configuration with ZK settings applied
+   * @throws RuntimeException if zkQuorum format is invalid (must have 3 or 4 parts)
+ */
+ public static Configuration createConfigurationForZkQuorum(Configuration baseConf,
+ String zkQuorum) {
+ Configuration conf = org.apache.hadoop.hbase.HBaseConfiguration.create(baseConf);
+ String[] parts = zkQuorum.split(":");
+
+ if (!(parts.length == 3 || parts.length == 4)) {
+ throw new RuntimeException(
+ "Invalid ZooKeeper quorum format. Expected: zk_quorum:port:znode OR "
+ + "zk_quorum:port:znode:krb_principal. Got: " + zkQuorum);
+ }
+
+ conf.set(HConstants.ZOOKEEPER_QUORUM, parts[0]);
+ conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]);
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]);
+ if (parts.length == 4) {
+ conf.set(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, parts[3]);
+ }
+ return conf;
+ }
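+
+  // Example: createConfigurationForZkQuorum(conf, "zk1,zk2:2181:/hbase") returns a copy of conf
+  // with quorum "zk1,zk2", client port 2181, and znode parent "/hbase" applied.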
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixPhoenixSyncTableToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixPhoenixSyncTableToolIT.java
new file mode 100644
index 00000000000..7c151afda65
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixPhoenixSyncTableToolIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.junit.Assert.*;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableMapper.SyncCounters;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PhoenixPhoenixSyncTableToolIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixPhoenixSyncTableToolIT.class);
+
+ private static final HBaseTestingUtilityPair CLUSTERS = new HBaseTestingUtilityPair();
+ private static final String TEST_TABLE_NAME = "TEST_SYNC_TABLE";
+ private static final int REPLICATION_WAIT_TIMEOUT_MS = 100000;
+ private static final int REPLICATION_POLL_INTERVAL_MS = 500;
+
+ private Connection sourceConnection;
+ private Connection targetConnection;
+ private String targetZkQuorum;
+
+ @Rule
+ public final TestName testName = new TestName();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ CLUSTERS.start(); // Starts both clusters and sets up replication
+ DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ CLUSTERS.close();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // Create Phoenix connections to both clusters
+ String sourceJdbcUrl = "jdbc:phoenix:" + CLUSTERS.getZkUrl1();
+ String targetJdbcUrl = "jdbc:phoenix:" + CLUSTERS.getZkUrl2();
+
+ sourceConnection = DriverManager.getConnection(sourceJdbcUrl);
+ targetConnection = DriverManager.getConnection(targetJdbcUrl);
+
+ // Extract target ZK quorum for PhoenixSyncTableTool (format: host:port:znode)
+ // Input format: "127.0.0.1\:52638::/hbase" → Output: "127.0.0.1:52638:/hbase"
+ // Note: The backslash is a single character, not escaped in the actual string
+ targetZkQuorum = CLUSTERS.getZkUrl2().replace("\\", "").replace("::", ":");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ dropTableIfExists(sourceConnection, TEST_TABLE_NAME);
+ dropTableIfExists(targetConnection, TEST_TABLE_NAME);
+
+ // Close connections
+ if (sourceConnection != null) {
+ sourceConnection.close();
+ }
+ if (targetConnection != null) {
+ targetConnection.close();
+ }
+ }
+
+ @Test
+ public void testSyncTableWithDataDifference() throws Exception {
+ createTableOnBothClusters(sourceConnection, targetConnection, TEST_TABLE_NAME);
+
+ insertTestData(sourceConnection, 1, 1000);
+
+ waitForReplication(targetConnection, TEST_TABLE_NAME, 1000, REPLICATION_WAIT_TIMEOUT_MS);
+
+ verifyDataIdentical(sourceConnection, targetConnection, TEST_TABLE_NAME);
+
+ introduceTargetDifferences();
+
+    List<TestRow> sourceRowsBefore = queryAllRows(sourceConnection,
+      "SELECT ID, NAME, NAME_VALUE FROM " + TEST_TABLE_NAME + " ORDER BY ID");
+    List<TestRow> targetRowsBefore = queryAllRows(targetConnection,
+      "SELECT ID, NAME, NAME_VALUE FROM " + TEST_TABLE_NAME + " ORDER BY ID");
+
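+    // The target upserts rewrote identical SQL values, so the row-level comparison still
+    // matches; only HBase cell timestamps differ, which the chunk hash comparison must detect.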
+ assertEquals(sourceRowsBefore, targetRowsBefore);
+
+ Configuration conf = new Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
+ String[] args = new String[] { "--table-name", TEST_TABLE_NAME, "--target-cluster",
+ targetZkQuorum, "--run-foreground", "--chunk-size", "10240" };
+ PhoenixSyncTableTool tool = new PhoenixSyncTableTool();
+ tool.setConf(conf);
+ int exitCode = tool.run(args);
+ Job job = tool.getJob();
+ assertNotNull("Job should not be null", job);
+ assertEquals(0, exitCode);
+ Counters counters = job.getCounters();
+ long chunksMismatched = counters.findCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+ assertEquals("Should have detected mismatched chunks", 4, chunksMismatched);
+ }
+
+ private void createTableOnBothClusters(Connection sourceConn, Connection targetConn,
+ String tableName) throws SQLException {
+ String ddl = "CREATE TABLE " + tableName + " (\n" + " ID INTEGER NOT NULL PRIMARY KEY,\n"
+ + " NAME VARCHAR(50),\n" + " NAME_VALUE BIGINT,\n" + " UPDATED_DATE TIMESTAMP\n"
+ + ") REPLICATION_SCOPE=1,UPDATE_CACHE_FREQUENCY = 0\n" + "SPLIT ON (500, 650, 800)";
+
+ sourceConn.createStatement().execute(ddl);
+ sourceConn.commit();
+ // Clear cache to prevent it from affecting target cluster table creation.
+ // Both region servers share the same JVM
+ ((PhoenixConnection) sourceConn).getQueryServices().clearCache();
+
+ ddl = "CREATE TABLE " + tableName + " (\n" + " ID INTEGER NOT NULL PRIMARY KEY,\n"
+ + " NAME VARCHAR(50),\n" + " NAME_VALUE BIGINT,\n" + " UPDATED_DATE TIMESTAMP\n"
+ + ") UPDATE_CACHE_FREQUENCY = 0\n" + "SPLIT ON (60, 100, 300, 525, 600, 900)";
+
+ targetConn.createStatement().execute(ddl);
+ targetConn.commit();
+ ((PhoenixConnection) targetConn).getQueryServices().clearCache();
+ }
+
+  private void insertTestData(Connection conn, int startId, int endId) throws SQLException {
+    String upsert = "UPSERT INTO " + TEST_TABLE_NAME
+      + " (ID, NAME, NAME_VALUE, UPDATED_DATE) VALUES (?, ?, ?, ?)";
+    try (PreparedStatement stmt = conn.prepareStatement(upsert)) {
+      for (int i = startId; i <= endId; i++) {
+        stmt.setInt(1, i);
+        stmt.setString(2, "NAME_" + i);
+        stmt.setLong(3, (long) i);
+        stmt.setTimestamp(4, new Timestamp(System.currentTimeMillis()));
+        stmt.executeUpdate();
+        conn.commit();
+      }
+    }
+  }
+
+ /**
+ * Waits for HBase replication to complete by polling target cluster.
+ */
+ private void waitForReplication(Connection targetConn, String tableName, int expectedRows,
+ long timeoutMs) throws Exception {
+ long startTime = System.currentTimeMillis();
+ String countQuery = "SELECT COUNT(*) FROM " + tableName;
+
+ while (System.currentTimeMillis() - startTime < timeoutMs) {
+ try (Statement stmt = targetConn.createStatement();
+ ResultSet rs = stmt.executeQuery(countQuery)) {
+ rs.next();
+ if (rs.getInt(1) == expectedRows) {
+ return;
+ }
+ }
+
+ Thread.sleep(REPLICATION_POLL_INTERVAL_MS);
+ }
+
+ fail("Replication timeout: expected " + expectedRows + " rows on target");
+ }
+
+ /**
+ * Verifies that source and target have identical data.
+ */
+ private void verifyDataIdentical(Connection sourceConn, Connection targetConn, String tableName)
+ throws SQLException {
+ String query = "SELECT ID, NAME, NAME_VALUE FROM " + tableName + " ORDER BY ID";
+ List<TestRow> sourceRows = queryAllRows(sourceConn, query);
+ List<TestRow> targetRows = queryAllRows(targetConn, query);
+
+ assertEquals("Row counts should match", sourceRows.size(), targetRows.size());
+
+ for (int i = 0; i < sourceRows.size(); i++) {
+ assertEquals("Row " + i + " should be identical", sourceRows.get(i), targetRows.get(i));
+ }
+ }
+
+ private void introduceTargetDifferences() throws SQLException {
+ // Rewrite four existing rows on the target with their original NAME values. The
+ // logical data is unchanged, but each upsert creates new cells with fresh
+ // timestamps; the IDs are spread out so each rewrite lands in a different chunk,
+ // giving the four mismatches the test expects.
+ int[] ids = { 65, 300, 500, 650 };
+ String upsert = "UPSERT INTO " + TEST_TABLE_NAME + " (ID, NAME) VALUES (?, ?)";
+ try (PreparedStatement stmt = targetConnection.prepareStatement(upsert)) {
+ for (int id : ids) {
+ stmt.setInt(1, id);
+ stmt.setString(2, "NAME_" + id);
+ stmt.executeUpdate();
+ }
+ }
+ targetConnection.commit();
+ }
+
+ /**
+ * Queries all rows from a table.
+ */
+ private List<TestRow> queryAllRows(Connection conn, String query) throws SQLException {
+ List<TestRow> rows = new ArrayList<>();
+
+ try (Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(query)) {
+ while (rs.next()) {
+ TestRow row = new TestRow();
+ row.id = rs.getInt("ID");
+ row.name = rs.getString("NAME");
+ row.name_value = rs.getLong("NAME_VALUE");
+ rows.add(row);
+ }
+ }
+
+ return rows;
+ }
+
+ /**
+ * Drops a table if it exists.
+ */
+ private void dropTableIfExists(Connection conn, String tableName) {
+ try {
+ conn.createStatement().execute("DROP TABLE IF EXISTS " + tableName);
+ conn.commit();
+ } catch (SQLException e) {
+ LOGGER.warn("Failed to drop table {}: {}", tableName, e.getMessage());
+ }
+ }
+
+ private static class TestRow {
+ int id;
+ String name;
+ long name_value;
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof TestRow)) return false;
+ TestRow other = (TestRow) o;
+ return id == other.id && Objects.equals(name, other.name) && name_value == other.name_value;
+ }
+
+ // Keep the equals/hashCode contract: rows that compare equal must hash equally.
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, name, name_value);
+ }
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
index f6b408d1067..28f6c4ae1e2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
@@ -18,8 +18,8 @@
package org.apache.phoenix.index;
import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
-import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
+import static org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;