Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ ConcurrentHashMap<UUID, ReferenceCounted<OmSnapshot>> getDbMap() {
return dbMap;
}

/**
 * Exposes the pending-eviction queue so tests can inspect which snapshot IDs
 * are currently queued. The field itself is returned, not a copy.
 *
 * @return the {@code pendingEvictionQueue} set held by this cache.
 */
@VisibleForTesting
Set<UUID> getPendingEvictionQueue() {
return pendingEvictionQueue;
}

/**
* @return number of DB instances currently held in cache.
*/
Expand All @@ -158,6 +163,7 @@ public void invalidate(UUID key) {
}
omMetrics.decNumSnapshotCacheSize();
}
pendingEvictionQueue.remove(k);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pendingEvictionQueue may still contain the entry k if the snapshot purge response is applied after all references to the snapshot have been decremented (releasing the SnapshotCache lock of the key), but before the periodic cleanup thread kicks in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this case should not happen during checkpointing, because checkpointing holds a write lock on the snapshot cache and, once that lock is released, it invokes cleanup() immediately.

return null;
});
}
Expand Down Expand Up @@ -323,8 +329,14 @@ private UncheckedAutoCloseableSupplier<OMLockDetails> lock(Supplier<OMLockDetail

AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(emptyLockFunction.get());
if (lockDetails.get().isLockAcquired()) {
if (!cleanupFunction.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock is released by calling emptyUnlockFunction.get() if the cleanup operation is unsuccessful or if it throws a Throwable.

try {
if (!cleanupFunction.get()) {
lockDetails.set(emptyUnlockFunction.get());
throw new IllegalStateException("Failed to acquire lock as cleanup did not drain the cache.");
}
} catch (Throwable t) {
lockDetails.set(emptyUnlockFunction.get());
throw t;
}
}

Expand Down Expand Up @@ -377,26 +389,25 @@ private synchronized Void cleanup(UUID evictionKey, boolean expectKeyToBePresent
}

dbMap.compute(evictionKey, (k, v) -> {
pendingEvictionQueue.remove(k);
ReferenceCounted<OmSnapshot> result = null;
if (v == null) {
throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.");
}

if (v.getTotalRefCount() > 0) {
LOG.warn("SnapshotId '{}' does not exist in cache. The RocksDB " +
"instance of the Snapshot may not be closed properly.", k);
} else if (v.getTotalRefCount() > 0) {
LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount());
return v;
result = v;
} else {
LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k);
// Close the instance, which also closes its DB handle.
try {
v.get().close();
} catch (IOException ex) {
throw new IllegalStateException("Error while closing snapshot DB.", ex);
throw new IllegalStateException("Error while closing snapshot DB for snapshotId " + k, ex);
}
omMetrics.decNumSnapshotCacheSize();
return null;
}
pendingEvictionQueue.remove(k);
return result;
});
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.VOLUME_LOCK;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -46,6 +47,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
Expand Down Expand Up @@ -511,4 +513,134 @@ void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, Inte
verify(store1, times(1)).compactTable("table2");
verify(store1, times(0)).compactTable("keyTable");
}

/**
 * Creates a mocked {@link IOzoneManagerLock} for {@code SNAPSHOT_DB_LOCK}:
 * every acquire call (read, write, resource-write) returns
 * {@code EMPTY_DETAILS_LOCK_ACQUIRED}, and every release call returns
 * {@code EMPTY_DETAILS_LOCK_NOT_ACQUIRED}.
 *
 * @return the stubbed lock mock.
 */
private static IOzoneManagerLock newAcquiringLock() {
  final IOzoneManagerLock lockMock = mock(IOzoneManagerLock.class);
  final OMLockDetails held = OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED;
  final OMLockDetails notHeld = OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED;

  // Resource-wide write lock (no key arguments).
  when(lockMock.acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK))).thenReturn(held);
  when(lockMock.releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK))).thenReturn(notHeld);

  // Keyed write lock.
  when(lockMock.acquireWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(held);
  when(lockMock.releaseWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(notHeld);

  // Keyed read lock.
  when(lockMock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(held);
  when(lockMock.releaseReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))).thenReturn(notHeld);

  return lockMock;
}

/**
 * Builds a mocked {@link OmSnapshot} that reports the given ID from
 * {@code getSnapshotID()} and its string form from {@code getSnapshotTableKey()}.
 *
 * @param snapshotId ID the mock should report.
 * @return the stubbed snapshot mock.
 */
private OmSnapshot mockSnapshot(UUID snapshotId) {
  final OmSnapshot snapshotMock = mock(OmSnapshot.class);
  when(snapshotMock.getSnapshotID()).thenReturn(snapshotId);
  when(snapshotMock.getSnapshotTableKey()).thenReturn(snapshotId.toString());
  return snapshotMock;
}

/**
 * Scenario: a snapshot is invalidated while a handle to it is still open, and
 * the handle is closed only afterwards. The test asserts that the ID is then
 * present in the pending-eviction queue although the cache size is 0, and
 * that acquiring the cache lock succeeds and leaves the queue without that ID.
 */
@Test
@DisplayName("Stale eviction key (invalidate + late close) is cleaned up without throwing")
void testStaleEvictionKeyDuringCleanup() throws IOException {
  final UUID staleId = UUID.randomUUID();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, newAcquiringLock());

  // Step 1: take a handle, which puts one entry in the cache.
  UncheckedAutoCloseableSupplier<OmSnapshot> openHandle = snapshotCache.get(staleId);
  assertEquals(1, snapshotCache.size());

  // Step 2: invalidate while the handle is still open; the cache reports empty.
  snapshotCache.invalidate(staleId);
  assertEquals(0, snapshotCache.size());

  // Step 3: closing the handle afterwards leaves the ID in the pending-eviction queue.
  openHandle.close();
  assertTrue(snapshotCache.getPendingEvictionQueue().contains(staleId));

  // Step 4: taking the cache lock must not throw, and must report the lock as acquired.
  assertDoesNotThrow(() -> {
    try (UncheckedAutoCloseableSupplier<OMLockDetails> heldLock = snapshotCache.lock()) {
      assertTrue(heldLock.get().isLockAcquired());
    }
  });

  // Step 5: the stale ID is gone from the queue.
  assertFalse(snapshotCache.getPendingEvictionQueue().contains(staleId));
}

/**
 * Scenario: the snapshot's {@code close()} throws an {@link IOException} on
 * its first invocation only. The test asserts that the first {@code lock()}
 * call throws {@link IllegalStateException} and leaves the entry in both the
 * DB map and the pending-eviction queue, and that a second {@code lock()}
 * call succeeds and removes the entry from both.
 */
@Test
@DisplayName("Close failure keeps snapshot in eviction queue for retry")
void testCloseFailureRetriesSnapshot() throws Exception {
  final UUID flakyId = UUID.randomUUID();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, newAcquiringLock());

  // Snapshot mock backed by a store that lists no tables.
  final DBStore emptyStore = mock(DBStore.class);
  when(emptyStore.listTables()).thenReturn(new ArrayList<>());
  final OMMetadataManager snapshotMetadata = mock(OMMetadataManager.class);
  when(snapshotMetadata.getStore()).thenReturn(emptyStore);
  final OmSnapshot flakySnapshot = mockSnapshot(flakyId);
  when(flakySnapshot.getMetadataManager()).thenReturn(snapshotMetadata);

  // close() fails exactly once, then behaves normally.
  final AtomicBoolean firstCloseFails = new AtomicBoolean(true);
  doAnswer(call -> {
    if (firstCloseFails.getAndSet(false)) {
      throw new IOException("close failed");
    }
    return null;
  }).when(flakySnapshot).close();

  when(cacheLoader.load(eq(flakyId))).thenReturn(flakySnapshot);

  // Open and immediately release a handle: the entry stays cached with refcount 0 and is queued.
  try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(flakyId)) {
    assertEquals(1, snapshotCache.size());
    assertEquals(1, omMetrics.getNumSnapshotCacheSize());
  }
  assertEquals(0L, snapshotCache.getDbMap().get(flakyId).getTotalRefCount());
  assertTrue(snapshotCache.getPendingEvictionQueue().contains(flakyId));

  // Attempt #1: close() throws, so lock() fails and nothing is evicted.
  assertThrows(IllegalStateException.class, () -> snapshotCache.lock());
  assertTrue(snapshotCache.getDbMap().containsKey(flakyId));
  assertTrue(snapshotCache.getPendingEvictionQueue().contains(flakyId));
  assertEquals(1, omMetrics.getNumSnapshotCacheSize());

  // Attempt #2: close() succeeds, so the entry and its queued key are both dropped.
  try (UncheckedAutoCloseableSupplier<OMLockDetails> heldLock = snapshotCache.lock()) {
    assertTrue(heldLock.get().isLockAcquired());
  }
  assertFalse(snapshotCache.getDbMap().containsKey(flakyId));
  assertFalse(snapshotCache.getPendingEvictionQueue().contains(flakyId));
  assertEquals(0, omMetrics.getNumSnapshotCacheSize());
}

/**
 * Scenario: the snapshot's store throws a {@link RuntimeException} from
 * {@code listTables()}. The test asserts that {@code lock()} propagates a
 * {@link RuntimeException} and that the resource write lock on
 * {@code SNAPSHOT_DB_LOCK} was acquired once and released once.
 */
@Test
@DisplayName("lock supplier releases write lock if cleanup throws an exception")
void testLockSupplierReleasesWriteLockOnCleanupException() throws Exception {
  final IOzoneManagerLock trackedLock = newAcquiringLock();
  final UUID brokenId = UUID.randomUUID();
  snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, trackedLock);

  // Store whose listTables() blows up with an unchecked exception.
  final DBStore throwingStore = mock(DBStore.class);
  when(throwingStore.listTables()).thenThrow(new RuntimeException("listTables failed"));
  final OMMetadataManager snapshotMetadata = mock(OMMetadataManager.class);
  when(snapshotMetadata.getStore()).thenReturn(throwingStore);
  final OmSnapshot brokenSnapshot = mockSnapshot(brokenId);
  when(brokenSnapshot.getMetadataManager()).thenReturn(snapshotMetadata);

  when(cacheLoader.load(eq(brokenId))).thenReturn(brokenSnapshot);

  // Open and release a handle so the ID lands in the pending-eviction queue.
  try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(brokenId)) {
    assertEquals(1, snapshotCache.size());
  }
  assertTrue(snapshotCache.getPendingEvictionQueue().contains(brokenId));

  // lock() must surface the failure, with exactly one acquire and one matching release.
  assertThrows(RuntimeException.class, () -> snapshotCache.lock());
  verify(trackedLock, times(1)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
  verify(trackedLock, times(1)).releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK));
}
}