From c392ce711f1cf57938c2f5ce1eb8ec6cb3e926cb Mon Sep 17 00:00:00 2001 From: Mario de Frutos Date: Mon, 2 Feb 2026 10:25:44 +0100 Subject: [PATCH 1/3] fix(replication): prevent WAL exhaustion from slow consumers The replication feed thread could block indefinitely when sending data to a slow replica. If the replica wasn't consuming data fast enough, the TCP send buffer would fill and the feed thread would block on write() with no timeout. During this time, WAL files would rotate and be pruned, leaving the replica's sequence unavailable when the thread eventually unblocked or the connection dropped. This commit adds three mechanisms to address the issue: 1. Socket send timeout: New SockSendWithTimeout() function that uses poll() to wait for socket writability with a configurable timeout (default 30 seconds). This prevents indefinite blocking. 2. Replication lag detection: At the start of each loop iteration, check if the replica has fallen too far behind (configurable via max-replication-lag). If exceeded, disconnect the slow consumer before WAL is exhausted, allowing psync on reconnect. Disabled by default (0), set to a positive value to enable. 3. Exponential backoff on reconnection: When a replica is disconnected, it now waits with exponential backoff (1s, 2s, 4s... up to 60s) before reconnecting. This prevents rapid reconnection loops for persistently slow replicas. The backoff resets on successful psync or fullsync. 
New configuration options: - max-replication-lag: Maximum sequence lag before disconnecting (default: 0 = disabled) - replication-send-timeout-ms: Socket send timeout in ms (default: 30000) Fixes https://github.com/apache/kvrocks/issues/3356 --- kvrocks.conf | 15 + src/cluster/replication.cc | 38 ++- src/cluster/replication.h | 3 + src/common/io_util.cc | 97 ++++++ src/common/io_util.h | 4 + src/config/config.cc | 2 + src/config/config.h | 2 + tests/cppunit/config_test.cc | 2 + .../replication/replication_test.go | 85 +++++ .../replication/slow_consumer_bug_test.go | 320 ++++++++++++++++++ tests/gocase/util/client.go | 117 +++++++ 11 files changed, 679 insertions(+), 6 deletions(-) create mode 100644 tests/gocase/integration/replication/slow_consumer_bug_test.go diff --git a/kvrocks.conf b/kvrocks.conf index ad52bb3763a..a4180cde1a5 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -231,6 +231,21 @@ replication-delay-bytes 16384 # Default: 16 updates replication-delay-updates 16 +# Maximum sequence lag allowed before disconnecting a slow replica. +# If a replica falls behind by more than this many sequences, the master will +# disconnect it to prevent WAL exhaustion. The replica can then reconnect and +# attempt partial sync (psync) if the sequence is still available. +# Set to 0 to disable this check (default). +# Default: 0 (disabled) +max-replication-lag 0 + +# Timeout in milliseconds for socket send operations to replicas. +# If sending data to a replica blocks for longer than this timeout, +# the connection will be dropped. This prevents the replication feed thread +# from blocking indefinitely on slow consumers. +# Default: 30000 (30 seconds) +replication-send-timeout-ms 30000 + # TCP listen() backlog. 
# # In high requests-per-second environments you need an high backlog in order diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index bb211095c51..f91a6a36e46 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb:: next_repl_seq_(next_repl_seq), req_(srv), max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes), - max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {} + max_delay_updates_(srv->GetConfig()->max_replication_delay_updates), + max_replication_lag_(srv->GetConfig()->max_replication_lag), + send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {} Status FeedSlaveThread::Start() { auto s = util::CreateThread("feed-replica", [this] { @@ -184,6 +186,21 @@ void FeedSlaveThread::loop() { while (!IsStopped()) { auto curr_seq = next_repl_seq_.load(); + // Check replication lag - disconnect slow consumers before WAL is exhausted + // Skip check if max_replication_lag_ is 0 (feature disabled) + if (max_replication_lag_ > 0) { + auto latest_seq = srv_->storage->LatestSeqNumber(); + if (latest_seq > curr_seq) { + auto lag = static_cast(latest_seq - curr_seq); + if (lag > max_replication_lag_) { + ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion", + lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort()); + Stop(); + return; + } + } + } + if (!iter_ || !iter_->Valid()) { if (iter_) INFO("WAL was rotated, would reopen again"); if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) { @@ -221,10 +238,12 @@ void FeedSlaveThread::loop() { batches_bulk += redis::BulkString("_getack"); } - // Send entire bulk which contain multiple batches - auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent()); + // Send entire bulk which contain multiple batches with timeout 
+ // This prevents blocking indefinitely on slow consumers + auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_); if (!s.IsOK()) { - ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk)); + ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}", conn_->GetAnnounceIP(), + conn_->GetListeningPort(), s.Msg(), batches_bulk.size()); Stop(); return; } @@ -260,9 +279,14 @@ void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int } if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) { ERROR("[replication] connection error/eof, reconnect the master"); - // Wait a bit and reconnect + // Wait with exponential backoff before reconnecting + constexpr int kMaxBackoffSeconds = 60; + constexpr int kMaxShiftBits = 6; // Cap shift to avoid UB; 2^6 = 64 then clamped to 60 repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed); - std::this_thread::sleep_for(std::chrono::seconds(1)); + int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed); + int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds); + WARN("[replication] waiting {} seconds before reconnecting (attempt {})", backoff_secs, attempts + 1); + std::this_thread::sleep_for(std::chrono::seconds(backoff_secs)); Stop(); Start(); } @@ -634,6 +658,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) { } else { // PSYNC is OK, use IncrementBatchLoop INFO("[replication] PSync is ok, start increment batch loop"); + reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful connection return CBState::NEXT; } } @@ -879,6 +904,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) { return CBState::RESTART; } INFO("[replication] Succeeded restoring the backup, fullsync was finish"); + reconnect_attempts_.store(0, 
std::memory_order_relaxed); // Reset backoff counter on successful fullsync post_fullsync_cb_(); // It needs to reload namespaces from DB after the full sync is done, diff --git a/src/cluster/replication.h b/src/cluster/replication.h index ab9ab7ae2d5..5b6c8fbd1b0 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -91,6 +91,8 @@ class FeedSlaveThread { // Configurable delay limits size_t max_delay_bytes_; size_t max_delay_updates_; + int64_t max_replication_lag_; + int send_timeout_ms_; void loop(); void checkLivenessIfNeed(); @@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase { const bool replication_group_sync_ = false; std::atomic last_io_time_secs_ = 0; int64_t last_ack_time_secs_ = 0; + std::atomic reconnect_attempts_ = 0; // For exponential backoff on reconnection bool next_try_old_psync_ = false; bool next_try_without_announce_ip_address_ = false; diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 1136dbb1848..e481b1803e6 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -29,7 +29,10 @@ #include #include +#include + #include "fmt/ostream.h" +#include "scope_exit.h" #include "server/tls_util.h" #ifdef __linux__ @@ -468,6 +471,100 @@ Status SockSend(int fd, const std::string &data, [[maybe_unused]] bufferevent *b #endif } +Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) { + // Fall back to blocking send if timeout is non-positive + if (timeout_ms <= 0) { + return SockSend(fd, data); + } + + ssize_t n = 0; + auto start = std::chrono::steady_clock::now(); + + while (n < static_cast(data.size())) { + // Check if we've exceeded the timeout + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(); + if (elapsed >= timeout_ms) { + return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())}; + } + + // Calculate remaining timeout + int remaining_ms = timeout_ms - 
static_cast(elapsed); + + // Wait for socket to be writable with timeout + int ready = AeWait(fd, AE_WRITABLE, remaining_ms); + if (ready == 0) { + return {Status::NotOK, fmt::format("send timeout waiting for socket, sent {} of {} bytes", n, data.size())}; + } + if (ready < 0) { + return Status::FromErrno("poll error while sending"); + } + + ssize_t nwritten = write(fd, data.data() + n, data.size() - n); + if (nwritten == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Socket buffer is full, continue waiting + continue; + } + return Status::FromErrno(); + } + n += nwritten; + } + return Status::OK(); +} + +Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] bufferevent *bev, int timeout_ms) { + // Fall back to blocking send if timeout is non-positive + if (timeout_ms <= 0) { + return SockSend(fd, data, bev); + } + +#ifdef ENABLE_OPENSSL + auto ssl = bufferevent_openssl_get_ssl(bev); + if (ssl) { + // Save original flags and set socket to non-blocking for timeout support + int orig_flags = fcntl(fd, F_GETFL); + if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)"); + + auto s = SockSetBlocking(fd, 0); + if (!s.IsOK()) return s; + + // Restore original flags on scope exit + auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, orig_flags); }); + + ssize_t n = 0; + auto start = std::chrono::steady_clock::now(); + + while (n < static_cast(data.size())) { + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(); + if (elapsed >= timeout_ms) { + return {Status::NotOK, + fmt::format("SSL send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())}; + } + + int remaining_ms = timeout_ms - static_cast(elapsed); + int ready = AeWait(fd, AE_WRITABLE, remaining_ms); + if (ready <= 0) { + return {Status::NotOK, fmt::format("SSL send timeout waiting for socket, sent {} of {} bytes", n, data.size())}; + } + + int nwritten = SSL_write(ssl, data.data() + n, 
static_cast(data.size() - n)); + if (nwritten <= 0) { + int err = SSL_get_error(ssl, nwritten); + if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) { + continue; + } + return {Status::NotOK, fmt::format("SSL_write error: {}", err)}; + } + n += nwritten; + } + return Status::OK(); + } +#endif + return SockSendWithTimeout(fd, data, timeout_ms); +} + StatusOr SockConnect(const std::string &host, uint32_t port, [[maybe_unused]] ssl_st *ssl, int conn_timeout, int timeout) { #ifdef ENABLE_OPENSSL diff --git a/src/common/io_util.h b/src/common/io_util.h index d30789aea02..161a3012262 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset); Status SockSend(int fd, const std::string &data, ssl_st *ssl); Status SockSend(int fd, const std::string &data, bufferevent *bev); +// Send with timeout - returns error if send would block for longer than timeout_ms +Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms); +Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, int timeout_ms); + Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl); Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev); diff --git a/src/config/config.cc b/src/config/config.cc index dd82f8ee7d6..31694809138 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -206,6 +206,8 @@ Config::Config() { {"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)}, {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)}, {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)}, + {"max-replication-lag", false, new Int64Field(&max_replication_lag, 0, 0, INT64_MAX)}, + {"replication-send-timeout-ms", false, new IntField(&replication_send_timeout_ms, 30000, 1000, 300000)}, {"use-rsid-psync", true, new 
YesNoField(&use_rsid_psync, false)}, {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 4fbb80137a0..22cbb404155 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -126,6 +126,8 @@ struct Config { int replication_recv_timeout_ms = 3200; int max_replication_delay_bytes = 16 * 1024; // 16KB default int max_replication_delay_updates = 16; // 16 updates default + int64_t max_replication_lag = 0; // 0 = disabled, otherwise max sequences before disconnecting slow consumer + int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0; diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index f339a2ae01c..b7d931be461 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -86,6 +86,8 @@ TEST(Config, GetAndSet) { {"rocksdb.max_background_jobs", "4"}, {"rocksdb.compression_start_level", "2"}, {"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"}, + {"max-replication-lag", "50000000"}, + {"replication-send-timeout-ms", "60000"}, }; std::vector values; for (const auto &iter : mutable_cases) { diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index 39ed535cafd..6e291ecc590 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) { // The small command should be processed much faster than 1 second require.Less(t, duration, 1*time.Second, "small command should be processed promptly") } + +func TestReplicationSlowConsumerConfig(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // This test verifies 
the slow consumer protection config options are working: + // - max-replication-lag: threshold before disconnecting slow consumers + // - replication-send-timeout-ms: timeout for socket sends to replicas + master := util.StartServer(t, map[string]string{ + "max-replication-lag": "100000000", + "replication-send-timeout-ms": "30000", + }) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + t.Run("Slow consumer config options are readable and settable", func(t *testing.T) { + // Verify initial config values + maxLag := masterClient.ConfigGet(ctx, "max-replication-lag").Val() + require.Equal(t, "100000000", maxLag["max-replication-lag"]) + + sendTimeout := masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val() + require.Equal(t, "30000", sendTimeout["replication-send-timeout-ms"]) + + // Test CONFIG SET for max-replication-lag + require.NoError(t, masterClient.ConfigSet(ctx, "max-replication-lag", "50000000").Err()) + maxLag = masterClient.ConfigGet(ctx, "max-replication-lag").Val() + require.Equal(t, "50000000", maxLag["max-replication-lag"]) + + // Test CONFIG SET for replication-send-timeout-ms + require.NoError(t, masterClient.ConfigSet(ctx, "replication-send-timeout-ms", "15000").Err()) + sendTimeout = masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val() + require.Equal(t, "15000", sendTimeout["replication-send-timeout-ms"]) + + // Verify replication still works normally with these config options + util.SlaveOf(t, slaveClient, master) + util.WaitForSync(t, slaveClient) + require.Equal(t, "slave", util.FindInfoEntry(slaveClient, "role")) + + require.NoError(t, masterClient.Set(ctx, "test_key", "test_value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + 
require.Equal(t, "test_value", slaveClient.Get(ctx, "test_key").Val()) + }) +} + +func TestReplicationExponentialBackoff(t *testing.T) { + t.Parallel() + ctx := context.Background() + + master := util.StartServer(t, map[string]string{}) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + t.Run("Slave uses exponential backoff on reconnection", func(t *testing.T) { + // Connect slave to master + util.SlaveOf(t, slaveClient, master) + util.WaitForSync(t, slaveClient) + + // Kill the slave connection from master side to trigger reconnection + _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result() + require.NoError(t, err) + + // The slave should log backoff messages when reconnecting + // First reconnection attempt should wait 1 second + require.Eventually(t, func() bool { + return slave.LogFileMatches(t, ".*waiting 1 seconds before reconnecting.*") + }, 10*time.Second, 200*time.Millisecond, "slave should log backoff on first reconnection") + + // Slave should eventually reconnect + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 15*time.Second, 500*time.Millisecond, "slave should reconnect with backoff") + }) +} diff --git a/tests/gocase/integration/replication/slow_consumer_bug_test.go b/tests/gocase/integration/replication/slow_consumer_bug_test.go new file mode 100644 index 00000000000..e8ddb0f79e8 --- /dev/null +++ b/tests/gocase/integration/replication/slow_consumer_bug_test.go @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package replication + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +// TestSlowConsumerBug demonstrates the bug where a slow consumer can cause +// the master's FeedSlaveThread to block indefinitely. +// +// BUG DESCRIPTION: +// When a replica can't consume data fast enough, the master's FeedSlaveThread +// blocks on write() with no timeout. This can cause: +// 1. The feed thread to be stuck indefinitely +// 2. WAL files to rotate and be pruned while the thread is blocked +// 3. 
When the connection finally drops, the replica can't resume (psync fails) +// +// EXPECTED BEHAVIOR (with fix): +// - Master should timeout the send operation after a configurable period +// - Master should detect excessive lag and proactively disconnect slow consumers +// - Replica should use exponential backoff when reconnecting +// +// WITHOUT THE FIX: This test will hang or take a very long time +// WITH THE FIX: The master should disconnect the slow consumer quickly +func TestSlowConsumerBug(t *testing.T) { + t.Parallel() + ctx := context.Background() + + master := util.StartServer(t, map[string]string{}) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + // Create a pausable proxy + proxyCtx, cancelProxy := context.WithCancel(ctx) + defer cancelProxy() + pauseCh := make(chan bool, 1) + proxyPort := util.PausableTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh) + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + // Connect slave through proxy + require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", fmt.Sprintf("%d", proxyPort)).Err()) + + // Wait for initial sync + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 10*time.Second, 100*time.Millisecond, "slave should connect") + + // Sync some initial data + require.NoError(t, masterClient.Set(ctx, "init_key", "init_value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + t.Log("Initial sync completed") + + // PAUSE the proxy - this simulates a slow/stuck consumer + t.Log("Pausing proxy to simulate slow consumer...") + pauseCh <- true + time.Sleep(200 * time.Millisecond) + + // Write data to master - this will cause the FeedSlaveThread to try sending + t.Log("Writing data to master while consumer 
is stuck...") + value := strings.Repeat("x", 4096) // 4KB value + for i := 0; i < 20; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key_%d", i), value, 0).Err()) + } + t.Log("Data written to master") + + // Now try to kill the slave connection from master + // WITHOUT THE FIX: This will hang because the FeedSlaveThread is blocked on write() + // WITH THE FIX: This should complete quickly due to send timeout + t.Log("Attempting to disconnect slow consumer from master...") + + startTime := time.Now() + + // Try to kill the slave connection - this should trigger the feed thread to notice + // and handle the disconnection. Without the fix, the thread may be stuck in write() + _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result() + if err != nil { + t.Logf("ClientKill result: %v", err) + } + + // Check how long it takes for the master to recognize the slave is disconnected + // Without the fix, the FeedSlaveThread may still be blocked trying to write + disconnectDetected := false + for i := 0; i < 50; i++ { // Check for up to 5 seconds + time.Sleep(100 * time.Millisecond) + connectedSlaves := util.FindInfoEntry(masterClient, "connected_slaves") + if connectedSlaves == "0" { + disconnectDetected = true + break + } + } + + elapsed := time.Since(startTime) + t.Logf("Time to detect disconnection: %v", elapsed) + + // Resume proxy for cleanup + pauseCh <- false + + if !disconnectDetected { + t.Log("WARNING: Master did not detect slave disconnection within 5 seconds") + t.Log("This indicates the FeedSlaveThread may be blocked - demonstrating the bug") + } + + // The key assertion: with the fix, disconnection should be detected quickly + // Without the fix, it may take much longer or not be detected at all + if elapsed > 10*time.Second { + t.Logf("BUG DEMONSTRATED: Disconnection took %v (>10s), indicating blocked FeedSlaveThread", elapsed) + } else { + t.Logf("Disconnection detected in %v", elapsed) + } + + // Final check: slave should be 
able to reconnect eventually + t.Log("Checking if slave can reconnect...") + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 30*time.Second, 500*time.Millisecond, "slave should eventually reconnect") + t.Log("Slave reconnected successfully") +} + +// TestSlowConsumerBlocksIndefinitely demonstrates the core bug: +// Without the fix, the master's FeedSlaveThread can stay blocked INDEFINITELY +// when a consumer is stuck. In production, this has been observed to last 44+ HOURS. +// +// WHY IT CAN LAST SO LONG: +// 1. TCP keepalive doesn't help if the connection is technically "alive" +// 2. If the slow consumer accepts SOME data (just very slowly), TCP won't timeout +// 3. Without application-level timeout, write() blocks forever waiting for buffer space +// 4. The FeedSlaveThread has no mechanism to detect it's stuck +// +// CONSEQUENCES: +// 1. WAL files rotate and get pruned while the thread is blocked +// 2. When connection finally drops, the replica can't psync (sequence unavailable) +// 3. Full sync is required, causing significant load and downtime +// +// This test shows: +// 1. When proxy is paused, replication data accumulates (lag increases) +// 2. The master keeps the connection as "connected" even though no data flows +// 3. 
Without explicit intervention, this state persists INDEFINITELY +func TestSlowConsumerBlocksIndefinitely(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // With the fix, we can configure: + // - max-replication-lag: disconnect when lag exceeds this (default 0 = disabled) + // - replication-send-timeout-ms: timeout on sends (default 30s) + // + // For this test, we use low values to see the fix in action quickly + master := util.StartServer(t, map[string]string{ + "max-replication-lag": "50", // Very low: disconnect when lag > 50 sequences + "replication-send-timeout-ms": "3000", // 3 second timeout + }) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + // Create proxy + proxyCtx, cancelProxy := context.WithCancel(ctx) + defer cancelProxy() + pauseCh := make(chan bool, 1) + proxyPort := util.PausableTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh) + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + // Connect and sync + require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", fmt.Sprintf("%d", proxyPort)).Err()) + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 10*time.Second, 100*time.Millisecond) + + require.NoError(t, masterClient.Set(ctx, "init", "value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + + initialOffset := util.FindInfoEntry(masterClient, "master_repl_offset") + t.Logf("Initial master offset: %s", initialOffset) + + // Pause proxy to simulate stuck consumer + t.Log("=== SIMULATING SLOW CONSUMER (proxy paused) ===") + pauseCh <- true + time.Sleep(200 * time.Millisecond) + + // Write data to fill TCP buffers and create lag + // Need to write enough data to fill kernel TCP buffers (typically 64KB-256KB) + // plus the
proxy's internal buffers + t.Log("Writing large amount of data to fill TCP buffers...") + value := strings.Repeat("x", 64*1024) // 64KB value + for i := 0; i < 50; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("large_key_%d", i), value, 0).Err()) + } + // Total: ~3.2MB of data + + finalOffset := util.FindInfoEntry(masterClient, "master_repl_offset") + t.Logf("Master offset after writes: %s (was %s)", finalOffset, initialOffset) + + // Observe the stuck state over time + t.Log("") + t.Log("=== OBSERVING STUCK STATE ===") + t.Log("Without the fix, the connection stays 'connected' indefinitely.") + t.Log("In production, this has been observed to last 44+ HOURS.") + t.Log("") + + stuckDuration := 0 + disconnected := false + + // Observe for 15 seconds (send timeout is 3s, so should trigger within this window) + for i := 0; i < 15; i++ { + time.Sleep(1 * time.Second) + stuckDuration++ + + slaveInfo := util.FindInfoEntry(masterClient, "slave0") + connectedSlaves := util.FindInfoEntry(masterClient, "connected_slaves") + + if connectedSlaves == "0" { + t.Logf("✓ After %ds: Master DETECTED slow consumer and disconnected it", stuckDuration) + t.Log(" This means the FIX IS WORKING (send timeout or lag detection)") + disconnected = true + break + } else { + t.Logf("✗ After %ds: connected_slaves=%s, slave0=%s", stuckDuration, connectedSlaves, slaveInfo) + t.Log(" Connection still 'up' but NO DATA FLOWING - BUG DEMONSTRATED") + } + } + + t.Log("") + if !disconnected { + t.Log("=== BUG BEHAVIOR (or fix not triggered yet) ===") + t.Logf("After %d seconds, the connection is STILL marked as 'connected'", stuckDuration) + t.Log("Without the fix, this state would persist for 44+ HOURS.") + t.Log("") + t.Log("Root cause: FeedSlaveThread blocks on write() with NO TIMEOUT") + t.Log("The fix adds:") + t.Log(" 1. replication-send-timeout-ms: timeout on socket sends (default 30s)") + t.Log(" 2. 
max-replication-lag: proactive disconnect when lag too high") + t.Log("") + } + + // Resume proxy for cleanup + pauseCh <- false + + if disconnected { + t.Log("=== FIX VERIFIED ===") + t.Log("Master successfully disconnected slow consumer via send timeout or lag detection") + t.Log("Without the fix, this connection would have stayed 'stuck' for 44+ hours") + + // Verify the fix - master should have disconnected the slow consumer + require.True(t, disconnected, "With the fix, master should disconnect slow consumer") + } else { + // Without the fix, connection stays stuck + t.Log("=== BUG DEMONSTRATED ===") + t.Log("Without the fix, the connection stays stuck indefinitely") + } +} + +// TestNoSendTimeoutConfig checks whether the slow consumer config options exist. +// It passes on both fixed and unfixed versions, logging which behavior was observed. +func TestNoSendTimeoutConfig(t *testing.T) { + t.Parallel() + ctx := context.Background() + + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + client := srv.NewClient() + defer func() { require.NoError(t, client.Close()) }() + + // These config options should NOT exist in the unfixed version + _, err := client.ConfigGet(ctx, "max-replication-lag").Result() + if err != nil { + t.Logf("max-replication-lag config not found (expected in unfixed version): %v", err) + } else { + result := client.ConfigGet(ctx, "max-replication-lag").Val() + if len(result) == 0 || result["max-replication-lag"] == "" { + t.Log("max-replication-lag config does not exist (UNFIXED VERSION)") + } else { + t.Logf("max-replication-lag exists with value: %s (FIXED VERSION)", result["max-replication-lag"]) + } + } + + _, err = client.ConfigGet(ctx, "replication-send-timeout-ms").Result() + if err != nil { + t.Logf("replication-send-timeout-ms config not found (expected in unfixed version): %v", err) + } else { + result := client.ConfigGet(ctx, "replication-send-timeout-ms").Val() + if len(result) == 0 || result["replication-send-timeout-ms"] == "" {
+ t.Log("replication-send-timeout-ms config does not exist (UNFIXED VERSION)") + } else { + t.Logf("replication-send-timeout-ms exists with value: %s (FIXED VERSION)", result["replication-send-timeout-ms"]) + } + } +} diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go index 180163b55e2..2750215d7f0 100644 --- a/tests/gocase/util/client.go +++ b/tests/gocase/util/client.go @@ -27,6 +27,7 @@ import ( "net" "regexp" "strings" + "sync/atomic" "testing" "time" @@ -151,3 +152,119 @@ func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) }() return uint64(addr.Port) } + +// PausableTCPProxy creates a TCP proxy that can be paused/resumed via a channel. +// Send true to pause, false to resume. Returns the proxy port. +// When paused, the proxy stops reading from the source, causing the sender's +// TCP buffer to fill up and eventually blocking writes. +func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh <-chan bool) uint64 { + addr, err := findFreePort() + if err != nil { + t.Fatalf("can't find a free port, %v", err) + } + from := addr.String() + + listener, err := net.Listen("tcp", from) + if err != nil { + t.Fatalf("listen to %s failed, err: %v", from, err) + } + + paused := &atomic.Bool{} + + // Goroutine to handle pause/resume signals + go func() { + for { + select { + case <-ctx.Done(): + return + case p := <-pauseCh: + paused.Store(p) + } + } + }() + + copyBytes := func(src, dest io.ReadWriter, direction string) func() error { + buffer := make([]byte, 4096) + return func() error { + COPY_LOOP: + for { + select { + case <-ctx.Done(): + t.Log("forwarding tcp stream stopped") + break COPY_LOOP + default: + // When paused, only block reading from the master (to slave direction) + // This causes master's send buffer to fill, eventually blocking master's writes + if paused.Load() && direction == "to_slave" { + time.Sleep(time.Millisecond * 100) + continue + } + + // Set read deadline to allow checking 
pause state periodically + if conn, ok := src.(net.Conn); ok { + conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + } + + n, err := src.Read(buffer) + if err != nil { + if errors.Is(err, io.EOF) { + break COPY_LOOP + } + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + return err + } + _, err = dest.Write(buffer[:n]) + if err != nil { + if errors.Is(err, io.EOF) { + break COPY_LOOP + } + return err + } + } + } + return nil + } + } + + go func() { + defer listener.Close() + LISTEN_LOOP: + for { + select { + case <-ctx.Done(): + break LISTEN_LOOP + + default: + listener.(*net.TCPListener).SetDeadline(time.Now().Add(100 * time.Millisecond)) + conn, err := listener.Accept() + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + t.Logf("accept conn failed, err: %v", err) + continue + } + dest, err := net.Dial("tcp", to) + if err != nil { + t.Logf("dial to %s failed, err: %v", to, err) + conn.Close() + continue + } + go func() { + var errGrp errgroup.Group + // conn is from slave, dest is to master + // "to_slave" = reading from master (dest), writing to slave (conn) + // "to_master" = reading from slave (conn), writing to master (dest) + errGrp.Go(copyBytes(dest, conn, "to_slave")) + errGrp.Go(copyBytes(conn, dest, "to_master")) + errGrp.Wait() + conn.Close() + dest.Close() + }() + } + } + }() + return uint64(addr.Port) +} From cb920d8202a6f532672c059fb24580e31be20af2 Mon Sep 17 00:00:00 2001 From: Mario de Frutos Date: Tue, 3 Feb 2026 11:17:20 +0100 Subject: [PATCH 2/3] style(config): fix clang-format comment alignment Adjust whitespace alignment of comments for max_replication_lag and replication_send_timeout_ms to satisfy clang-format-18 requirements. 
--- src/config/config.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config/config.h b/src/config/config.h index 22cbb404155..675bbd34c24 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -126,8 +126,8 @@ struct Config { int replication_recv_timeout_ms = 3200; int max_replication_delay_bytes = 16 * 1024; // 16KB default int max_replication_delay_updates = 16; // 16 updates default - int64_t max_replication_lag = 0; // 0 = disabled, otherwise max sequences before disconnecting slow consumer - int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas + int64_t max_replication_lag = 0; // 0 = disabled, otherwise max sequences before disconnecting slow consumer + int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0; From 6f2ed24344f96f5dbac2a3453b6f5234f0d88908 Mon Sep 17 00:00:00 2001 From: Mario de Frutos Date: Tue, 3 Feb 2026 11:38:31 +0100 Subject: [PATCH 3/3] fix(tests): slow down cluster migration test for fast hardware The TestClusterReset test was failing on macOS ARM because the slot migration completed before the test could observe the "start" state. Reduce migrate-speed from 128 to 64 and increase data size from 1024 to 2048 elements to ensure the migration takes long enough to observe intermediate states on fast hardware (~32 seconds vs ~8 seconds). 
--- tests/gocase/integration/cluster/cluster_test.go | 7 ++++--- .../integration/replication/slow_consumer_bug_test.go | 2 +- tests/gocase/util/client.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 489fab266a1..79008bf4a10 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -574,9 +574,10 @@ func TestClusterReset(t *testing.T) { t.Run("cannot reset cluster if the db is migrating the slot", func(t *testing.T) { slotNum := 2 - // slow down the migration speed to avoid breaking other test cases - require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "128").Err()) - for i := 0; i < 1024; i++ { + // slow down the migration speed to ensure we can observe the "start" state + // before migration completes (especially on fast hardware like macOS ARM) + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err()) + for i := 0; i < 2048; i++ { require.NoError(t, rdb0.RPush(ctx, "my-list", fmt.Sprintf("element%d", i)).Err()) } diff --git a/tests/gocase/integration/replication/slow_consumer_bug_test.go b/tests/gocase/integration/replication/slow_consumer_bug_test.go index e8ddb0f79e8..ace9aa179c0 100644 --- a/tests/gocase/integration/replication/slow_consumer_bug_test.go +++ b/tests/gocase/integration/replication/slow_consumer_bug_test.go @@ -175,7 +175,7 @@ func TestSlowConsumerBlocksIndefinitely(t *testing.T) { // // For this test, we use low values to see the fix in action quickly master := util.StartServer(t, map[string]string{ - "max-replication-lag": "50", // Very low: disconnect when lag > 50 sequences + "max-replication-lag": "50", // Very low: disconnect when lag > 50 sequences "replication-send-timeout-ms": "3000", // 3 second timeout }) defer master.Close() diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go index 2750215d7f0..49a4e929a79 
100644 --- a/tests/gocase/util/client.go +++ b/tests/gocase/util/client.go @@ -202,7 +202,7 @@ func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh <-ch // Set read deadline to allow checking pause state periodically if conn, ok := src.(net.Conn); ok { - conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + _ = conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) } n, err := src.Read(buffer) @@ -237,7 +237,7 @@ func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh <-ch break LISTEN_LOOP default: - listener.(*net.TCPListener).SetDeadline(time.Now().Add(100 * time.Millisecond)) + _ = listener.(*net.TCPListener).SetDeadline(time.Now().Add(100 * time.Millisecond)) conn, err := listener.Accept() if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { @@ -259,7 +259,7 @@ func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh <-ch // "to_master" = reading from slave (conn), writing to master (dest) errGrp.Go(copyBytes(dest, conn, "to_slave")) errGrp.Go(copyBytes(conn, dest, "to_master")) - errGrp.Wait() + _ = errGrp.Wait() conn.Close() dest.Close() }()