diff --git a/kvrocks.conf b/kvrocks.conf index ad52bb3763a..a4180cde1a5 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -231,6 +231,21 @@ replication-delay-bytes 16384 # Default: 16 updates replication-delay-updates 16 +# Maximum sequence lag allowed before disconnecting a slow replica. +# If a replica falls behind by more than this many sequences, the master will +# disconnect it to prevent WAL exhaustion. The replica can then reconnect and +# attempt partial sync (psync) if the sequence is still available. +# Set to 0 to disable this check (default). +# Default: 0 (disabled) +max-replication-lag 0 + +# Timeout in milliseconds for socket send operations to replicas. +# If sending data to a replica blocks for longer than this timeout, +# the connection will be dropped. This prevents the replication feed thread +# from blocking indefinitely on slow consumers. +# Default: 30000 (30 seconds) +replication-send-timeout-ms 30000 + # TCP listen() backlog. # # In high requests-per-second environments you need an high backlog in order diff --git a/src/cluster/replication.cc b/src/cluster/replication.cc index bb211095c51..f91a6a36e46 100644 --- a/src/cluster/replication.cc +++ b/src/cluster/replication.cc @@ -63,7 +63,9 @@ FeedSlaveThread::FeedSlaveThread(Server *srv, redis::Connection *conn, rocksdb:: next_repl_seq_(next_repl_seq), req_(srv), max_delay_bytes_(srv->GetConfig()->max_replication_delay_bytes), - max_delay_updates_(srv->GetConfig()->max_replication_delay_updates) {} + max_delay_updates_(srv->GetConfig()->max_replication_delay_updates), + max_replication_lag_(srv->GetConfig()->max_replication_lag), + send_timeout_ms_(srv->GetConfig()->replication_send_timeout_ms) {} Status FeedSlaveThread::Start() { auto s = util::CreateThread("feed-replica", [this] { @@ -184,6 +186,21 @@ void FeedSlaveThread::loop() { while (!IsStopped()) { auto curr_seq = next_repl_seq_.load(); + // Check replication lag - disconnect slow consumers before WAL is exhausted + // Skip check if max_replication_lag_ is 0 (feature disabled) + if (max_replication_lag_ > 0) { + auto latest_seq = srv_->storage->LatestSeqNumber(); + if (latest_seq > curr_seq) { + auto lag = static_cast(latest_seq - curr_seq); + if (lag > max_replication_lag_) { + ERROR("Replication lag {} exceeds max allowed {} for slave {}:{}, disconnecting to prevent WAL exhaustion", + lag, max_replication_lag_, conn_->GetAnnounceIP(), conn_->GetListeningPort()); + Stop(); + return; + } + } + } + if (!iter_ || !iter_->Valid()) { if (iter_) INFO("WAL was rotated, would reopen again"); if (!srv_->storage->WALHasNewData(curr_seq) || !srv_->storage->GetWALIter(curr_seq, &iter_).IsOK()) { @@ -221,10 +238,12 @@ void FeedSlaveThread::loop() { batches_bulk += redis::BulkString("_getack"); } - // Send entire bulk which contain multiple batches - auto s = util::SockSend(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent()); + // Send entire bulk which contain multiple batches with timeout + // This prevents blocking indefinitely on slow consumers + auto s = util::SockSendWithTimeout(conn_->GetFD(), batches_bulk, conn_->GetBufferEvent(), send_timeout_ms_); if (!s.IsOK()) { - ERROR("Write error while sending batch to slave: {}. batches: 0x{}", s.Msg(), util::StringToHex(batches_bulk)); + ERROR("Write error while sending batch to slave {}:{}: {}. batch_size={}", conn_->GetAnnounceIP(), + conn_->GetListeningPort(), s.Msg(), batches_bulk.size()); Stop(); return; } @@ -260,9 +279,14 @@ void ReplicationThread::CallbacksStateMachine::ConnEventCB(bufferevent *bev, int } if (events & (BEV_EVENT_ERROR | BEV_EVENT_EOF)) { ERROR("[replication] connection error/eof, reconnect the master"); - // Wait a bit and reconnect + // Wait with exponential backoff before reconnecting + constexpr int kMaxBackoffSeconds = 60; + constexpr int kMaxShiftBits = 6; // Cap shift to avoid UB; 2^6 = 64 then clamped to 60 repl_->repl_state_.store(kReplConnecting, std::memory_order_relaxed); - std::this_thread::sleep_for(std::chrono::seconds(1)); + int attempts = repl_->reconnect_attempts_.fetch_add(1, std::memory_order_relaxed); + int backoff_secs = std::min(1 << std::min(attempts, kMaxShiftBits), kMaxBackoffSeconds); + WARN("[replication] waiting {} seconds before reconnecting (attempt {})", backoff_secs, attempts + 1); + std::this_thread::sleep_for(std::chrono::seconds(backoff_secs)); Stop(); Start(); } @@ -634,6 +658,7 @@ ReplicationThread::CBState ReplicationThread::tryPSyncReadCB(bufferevent *bev) { } else { // PSYNC is OK, use IncrementBatchLoop INFO("[replication] PSync is ok, start increment batch loop"); + reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful connection return CBState::NEXT; } } @@ -879,6 +904,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) { return CBState::RESTART; } INFO("[replication] Succeeded restoring the backup, fullsync was finish"); + reconnect_attempts_.store(0, std::memory_order_relaxed); // Reset backoff counter on successful fullsync post_fullsync_cb_(); // It needs to reload namespaces from DB after the full sync is done, diff --git a/src/cluster/replication.h b/src/cluster/replication.h index ab9ab7ae2d5..5b6c8fbd1b0 100644 --- a/src/cluster/replication.h +++ b/src/cluster/replication.h @@ -91,6 +91,8 @@ class FeedSlaveThread { // Configurable delay limits size_t max_delay_bytes_; size_t max_delay_updates_; + int64_t max_replication_lag_; + int send_timeout_ms_; void loop(); void checkLivenessIfNeed(); @@ -166,6 +168,7 @@ class ReplicationThread : private EventCallbackBase { const bool replication_group_sync_ = false; std::atomic last_io_time_secs_ = 0; int64_t last_ack_time_secs_ = 0; + std::atomic reconnect_attempts_ = 0; // For exponential backoff on reconnection bool next_try_old_psync_ = false; bool next_try_without_announce_ip_address_ = false; diff --git a/src/common/io_util.cc b/src/common/io_util.cc index 1136dbb1848..e481b1803e6 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -29,7 +29,10 @@ #include #include +#include + #include "fmt/ostream.h" +#include "scope_exit.h" #include "server/tls_util.h" #ifdef __linux__ @@ -468,6 +471,100 @@ Status SockSend(int fd, const std::string &data, [[maybe_unused]] bufferevent *b #endif } +Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms) { + // Fall back to blocking send if timeout is non-positive + if (timeout_ms <= 0) { + return SockSend(fd, data); + } + + ssize_t n = 0; + auto start = std::chrono::steady_clock::now(); + + while (n < static_cast(data.size())) { + // Check if we've exceeded the timeout + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(); + if (elapsed >= timeout_ms) { + return {Status::NotOK, fmt::format("send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())}; + } + + // Calculate remaining timeout + int remaining_ms = timeout_ms - static_cast(elapsed); + + // Wait for socket to be writable with timeout + int ready = AeWait(fd, AE_WRITABLE, remaining_ms); + if (ready == 0) { + return {Status::NotOK, fmt::format("send timeout waiting for socket, sent {} of {} bytes", n, data.size())}; + } + if (ready < 0) { + return Status::FromErrno("poll error while sending"); + } + + ssize_t nwritten = write(fd, data.data() + n, data.size() - n); + if (nwritten == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Socket buffer is full, continue waiting + continue; + } + return Status::FromErrno(); + } + n += nwritten; + } + return Status::OK(); +} + +Status SockSendWithTimeout(int fd, const std::string &data, [[maybe_unused]] bufferevent *bev, int timeout_ms) { + // Fall back to blocking send if timeout is non-positive + if (timeout_ms <= 0) { + return SockSend(fd, data, bev); + } + +#ifdef ENABLE_OPENSSL + auto ssl = bufferevent_openssl_get_ssl(bev); + if (ssl) { + // Save original flags and set socket to non-blocking for timeout support + int orig_flags = fcntl(fd, F_GETFL); + if (orig_flags == -1) return Status::FromErrno("fcntl(F_GETFL)"); + + auto s = SockSetBlocking(fd, 0); + if (!s.IsOK()) return s; + + // Restore original flags on scope exit + auto restore_flags = MakeScopeExit([fd, orig_flags] { fcntl(fd, F_SETFL, orig_flags); }); + + ssize_t n = 0; + auto start = std::chrono::steady_clock::now(); + + while (n < static_cast(data.size())) { + auto elapsed = + std::chrono::duration_cast(std::chrono::steady_clock::now() - start).count(); + if (elapsed >= timeout_ms) { + return {Status::NotOK, + fmt::format("SSL send timeout after {} ms, sent {} of {} bytes", elapsed, n, data.size())}; + } + + int remaining_ms = timeout_ms - static_cast(elapsed); + int ready = AeWait(fd, AE_WRITABLE, remaining_ms); + if (ready <= 0) { + return {Status::NotOK, fmt::format("SSL send timeout waiting for socket, sent {} of {} bytes", n, data.size())}; + } + + int nwritten = SSL_write(ssl, data.data() + n, static_cast(data.size() - n)); + if (nwritten <= 0) { + int err = SSL_get_error(ssl, nwritten); + if (err == SSL_ERROR_WANT_WRITE || err == SSL_ERROR_WANT_READ) { + continue; + } + return {Status::NotOK, fmt::format("SSL_write error: {}", err)}; + } + n += nwritten; + } + return Status::OK(); + } +#endif + return SockSendWithTimeout(fd, data, timeout_ms); +} + StatusOr SockConnect(const std::string &host, uint32_t port, [[maybe_unused]] ssl_st *ssl, int conn_timeout, int timeout) { #ifdef ENABLE_OPENSSL diff --git a/src/common/io_util.h b/src/common/io_util.h index d30789aea02..161a3012262 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -54,6 +54,10 @@ Status Pwrite(int fd, const std::string &data, off_t offset); Status SockSend(int fd, const std::string &data, ssl_st *ssl); Status SockSend(int fd, const std::string &data, bufferevent *bev); +// Send with timeout - returns error if send would block for longer than timeout_ms +Status SockSendWithTimeout(int fd, const std::string &data, int timeout_ms); +Status SockSendWithTimeout(int fd, const std::string &data, bufferevent *bev, int timeout_ms); + Status SockSendFile(int out_fd, int in_fd, size_t size, ssl_st *ssl); Status SockSendFile(int out_fd, int in_fd, size_t size, bufferevent *bev); diff --git a/src/config/config.cc b/src/config/config.cc index dd82f8ee7d6..31694809138 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -206,6 +206,8 @@ Config::Config() { {"replication-no-slowdown", false, new YesNoField(&replication_no_slowdown, true)}, {"replication-delay-bytes", false, new IntField(&max_replication_delay_bytes, 16 * 1024, 1, INT_MAX)}, {"replication-delay-updates", false, new IntField(&max_replication_delay_updates, 16, 1, INT_MAX)}, + {"max-replication-lag", false, new Int64Field(&max_replication_lag, 0, 0, INT64_MAX)}, + {"replication-send-timeout-ms", false, new IntField(&replication_send_timeout_ms, 30000, 1000, 300000)}, {"use-rsid-psync", true, new YesNoField(&use_rsid_psync, false)}, {"profiling-sample-ratio", false, new IntField(&profiling_sample_ratio, 0, 0, 100)}, {"profiling-sample-record-max-len", false, new IntField(&profiling_sample_record_max_len, 256, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index 4fbb80137a0..675bbd34c24 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -126,6 +126,8 @@ struct Config { int replication_recv_timeout_ms = 3200; int max_replication_delay_bytes = 16 * 1024; // 16KB default int max_replication_delay_updates = 16; // 16 updates default + int64_t max_replication_lag = 0; // 0 = disabled, otherwise max sequences before disconnecting slow consumer + int replication_send_timeout_ms = 30000; // 30 second timeout for socket sends to replicas int max_db_size = 0; int max_replication_mb = 0; int max_io_mb = 0; diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index f339a2ae01c..b7d931be461 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -86,6 +86,8 @@ TEST(Config, GetAndSet) { {"rocksdb.max_background_jobs", "4"}, {"rocksdb.compression_start_level", "2"}, {"rocksdb.sst_file_delete_rate_bytes_per_sec", "0"}, + {"max-replication-lag", "50000000"}, + {"replication-send-timeout-ms", "60000"}, }; std::vector values; for (const auto &iter : mutable_cases) { diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 489fab266a1..79008bf4a10 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -574,9 +574,10 @@ func TestClusterReset(t *testing.T) { t.Run("cannot reset cluster if the db is migrating the slot", func(t *testing.T) { slotNum := 2 - // slow down the migration speed to avoid breaking other test cases - require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "128").Err()) - for i := 0; i < 1024; i++ { + // slow down the migration speed to ensure we can observe the "start" state + // before migration completes (especially on fast hardware like macOS ARM) + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err()) + for i := 0; i < 2048; i++ { require.NoError(t, rdb0.RPush(ctx, "my-list", fmt.Sprintf("element%d", i)).Err()) } diff --git a/tests/gocase/integration/replication/replication_test.go b/tests/gocase/integration/replication/replication_test.go index 39ed535cafd..6e291ecc590 100644 --- a/tests/gocase/integration/replication/replication_test.go +++ b/tests/gocase/integration/replication/replication_test.go @@ -711,3 +711,88 @@ func TestReplicationWatermark(t *testing.T) { // The small command should be processed much faster than 1 second require.Less(t, duration, 1*time.Second, "small command should be processed promptly") } + +func TestReplicationSlowConsumerConfig(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // This test verifies the slow consumer protection config options are working: + // - max-replication-lag: threshold before disconnecting slow consumers + // - replication-send-timeout-ms: timeout for socket sends to replicas + master := util.StartServer(t, map[string]string{ + "max-replication-lag": "100000000", + "replication-send-timeout-ms": "30000", + }) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + t.Run("Slow consumer config options are readable and settable", func(t *testing.T) { + // Verify initial config values + maxLag := masterClient.ConfigGet(ctx, "max-replication-lag").Val() + require.Equal(t, "100000000", maxLag["max-replication-lag"]) + + sendTimeout := masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val() + require.Equal(t, "30000", sendTimeout["replication-send-timeout-ms"]) + + // Test CONFIG SET for max-replication-lag + require.NoError(t, masterClient.ConfigSet(ctx, "max-replication-lag", "50000000").Err()) + maxLag = masterClient.ConfigGet(ctx, "max-replication-lag").Val() + require.Equal(t, "50000000", maxLag["max-replication-lag"]) + + // Test CONFIG SET for replication-send-timeout-ms + require.NoError(t, masterClient.ConfigSet(ctx, "replication-send-timeout-ms", "15000").Err()) + sendTimeout = masterClient.ConfigGet(ctx, "replication-send-timeout-ms").Val() + require.Equal(t, "15000", sendTimeout["replication-send-timeout-ms"]) + + // Verify replication still works normally with these config options + util.SlaveOf(t, slaveClient, master) + util.WaitForSync(t, slaveClient) + require.Equal(t, "slave", util.FindInfoEntry(slaveClient, "role")) + + require.NoError(t, masterClient.Set(ctx, "test_key", "test_value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + require.Equal(t, "test_value", slaveClient.Get(ctx, "test_key").Val()) + }) +} + +func TestReplicationExponentialBackoff(t *testing.T) { + t.Parallel() + ctx := context.Background() + + master := util.StartServer(t, map[string]string{}) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + t.Run("Slave uses exponential backoff on reconnection", func(t *testing.T) { + // Connect slave to master + util.SlaveOf(t, slaveClient, master) + util.WaitForSync(t, slaveClient) + + // Kill the slave connection from master side to trigger reconnection + _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result() + require.NoError(t, err) + + // The slave should log backoff messages when reconnecting + // First reconnection attempt should wait 1 second + require.Eventually(t, func() bool { + return slave.LogFileMatches(t, ".*waiting 1 seconds before reconnecting.*") + }, 10*time.Second, 200*time.Millisecond, "slave should log backoff on first reconnection") + + // Slave should eventually reconnect + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 15*time.Second, 500*time.Millisecond, "slave should reconnect with backoff") + }) +} diff --git a/tests/gocase/integration/replication/slow_consumer_bug_test.go b/tests/gocase/integration/replication/slow_consumer_bug_test.go new file mode 100644 index 00000000000..ace9aa179c0 --- /dev/null +++ b/tests/gocase/integration/replication/slow_consumer_bug_test.go @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package replication + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +// TestSlowConsumerBug demonstrates the bug where a slow consumer can cause +// the master's FeedSlaveThread to block indefinitely. +// +// BUG DESCRIPTION: +// When a replica can't consume data fast enough, the master's FeedSlaveThread +// blocks on write() with no timeout. This can cause: +// 1. The feed thread to be stuck indefinitely +// 2. WAL files to rotate and be pruned while the thread is blocked +// 3. When the connection finally drops, the replica can't resume (psync fails) +// +// EXPECTED BEHAVIOR (with fix): +// - Master should timeout the send operation after a configurable period +// - Master should detect excessive lag and proactively disconnect slow consumers +// - Replica should use exponential backoff when reconnecting +// +// WITHOUT THE FIX: This test will hang or take a very long time +// WITH THE FIX: The master should disconnect the slow consumer quickly +func TestSlowConsumerBug(t *testing.T) { + t.Parallel() + ctx := context.Background() + + master := util.StartServer(t, map[string]string{}) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + // Create a pausable proxy + proxyCtx, cancelProxy := context.WithCancel(ctx) + defer cancelProxy() + pauseCh := make(chan bool, 1) + proxyPort := util.PausableTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh) + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + // Connect slave through proxy + require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", fmt.Sprintf("%d", proxyPort)).Err()) + + // Wait for initial sync + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 10*time.Second, 100*time.Millisecond, "slave should connect") + + // Sync some initial data + require.NoError(t, masterClient.Set(ctx, "init_key", "init_value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + t.Log("Initial sync completed") + + // PAUSE the proxy - this simulates a slow/stuck consumer + t.Log("Pausing proxy to simulate slow consumer...") + pauseCh <- true + time.Sleep(200 * time.Millisecond) + + // Write data to master - this will cause the FeedSlaveThread to try sending + t.Log("Writing data to master while consumer is stuck...") + value := strings.Repeat("x", 4096) // 4KB value + for i := 0; i < 20; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key_%d", i), value, 0).Err()) + } + t.Log("Data written to master") + + // Now try to kill the slave connection from master + // WITHOUT THE FIX: This will hang because the FeedSlaveThread is blocked on write() + // WITH THE FIX: This should complete quickly due to send timeout + t.Log("Attempting to disconnect slow consumer from master...") + + startTime := time.Now() + + // Try to kill the slave connection - this should trigger the feed thread to notice + // and handle the disconnection. Without the fix, the thread may be stuck in write() + _, err := masterClient.ClientKillByFilter(ctx, "type", "slave").Result() + if err != nil { + t.Logf("ClientKill result: %v", err) + } + + // Check how long it takes for the master to recognize the slave is disconnected + // Without the fix, the FeedSlaveThread may still be blocked trying to write + disconnectDetected := false + for i := 0; i < 50; i++ { // Check for up to 5 seconds + time.Sleep(100 * time.Millisecond) + connectedSlaves := util.FindInfoEntry(masterClient, "connected_slaves") + if connectedSlaves == "0" { + disconnectDetected = true + break + } + } + + elapsed := time.Since(startTime) + t.Logf("Time to detect disconnection: %v", elapsed) + + // Resume proxy for cleanup + pauseCh <- false + + if !disconnectDetected { + t.Log("WARNING: Master did not detect slave disconnection within 5 seconds") + t.Log("This indicates the FeedSlaveThread may be blocked - demonstrating the bug") + } + + // The key assertion: with the fix, disconnection should be detected quickly + // Without the fix, it may take much longer or not be detected at all + if elapsed > 10*time.Second { + t.Logf("BUG DEMONSTRATED: Disconnection took %v (>10s), indicating blocked FeedSlaveThread", elapsed) + } else { + t.Logf("Disconnection detected in %v", elapsed) + } + + // Final check: slave should be able to reconnect eventually + t.Log("Checking if slave can reconnect...") + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 30*time.Second, 500*time.Millisecond, "slave should eventually reconnect") + t.Log("Slave reconnected successfully") +} + +// TestSlowConsumerBlocksIndefinitely demonstrates the core bug: +// Without the fix, the master's FeedSlaveThread can stay blocked INDEFINITELY +// when a consumer is stuck. In production, this has been observed to last 44+ HOURS. +// +// WHY IT CAN LAST SO LONG: +// 1. TCP keepalive doesn't help if the connection is technically "alive" +// 2. If the slow consumer accepts SOME data (just very slowly), TCP won't timeout +// 3. Without application-level timeout, write() blocks forever waiting for buffer space +// 4. The FeedSlaveThread has no mechanism to detect it's stuck +// +// CONSEQUENCES: +// 1. WAL files rotate and get pruned while the thread is blocked +// 2. When connection finally drops, the replica can't psync (sequence unavailable) +// 3. Full sync is required, causing significant load and downtime +// +// This test shows: +// 1. When proxy is paused, replication data accumulates (lag increases) +// 2. The master keeps the connection as "connected" even though no data flows +// 3. Without explicit intervention, this state persists INDEFINITELY +func TestSlowConsumerBlocksIndefinitely(t *testing.T) { + t.Parallel() + ctx := context.Background() + + // With the fix, we can configure: + // - max-replication-lag: disconnect when lag exceeds this (default 100M) + // - replication-send-timeout-ms: timeout on sends (default 30s) + // + // For this test, we use low values to see the fix in action quickly + master := util.StartServer(t, map[string]string{ + "max-replication-lag": "50", // Very low: disconnect when lag > 50 sequences + "replication-send-timeout-ms": "3000", // 3 second timeout + }) + defer master.Close() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + + // Create proxy + proxyCtx, cancelProxy := context.WithCancel(ctx) + defer cancelProxy() + pauseCh := make(chan bool, 1) + proxyPort := util.PausableTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", master.Port()), pauseCh) + + slave := util.StartServer(t, map[string]string{}) + defer slave.Close() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + + // Connect and sync + require.NoError(t, slaveClient.SlaveOf(ctx, "127.0.0.1", fmt.Sprintf("%d", proxyPort)).Err()) + require.Eventually(t, func() bool { + return util.FindInfoEntry(slaveClient, "master_link_status") == "up" + }, 10*time.Second, 100*time.Millisecond) + + require.NoError(t, masterClient.Set(ctx, "init", "value", 0).Err()) + util.WaitForOffsetSync(t, masterClient, slaveClient, 5*time.Second) + + initialOffset := util.FindInfoEntry(masterClient, "master_repl_offset") + t.Logf("Initial master offset: %s", initialOffset) + + // Pause proxy to simulate stuck consumer + t.Log("=== SIMULATING SLOW CONSUMER (proxy paused) ===") + pauseCh <- true + time.Sleep(200 * time.Millisecond) + + // Write data to fill TCP buffers and create lag + // Need to write enough data to fill kernel TCP buffers (typically 64KB-256KB) + // plus the proxy's internal buffers + t.Log("Writing large amount of data to fill TCP buffers...") + value := strings.Repeat("x", 64*1024) // 64KB value + for i := 0; i < 50; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("large_key_%d", i), value, 0).Err()) + } + // Total: ~3.2MB of data + + finalOffset := util.FindInfoEntry(masterClient, "master_repl_offset") + t.Logf("Master offset after writes: %s (was %s)", finalOffset, initialOffset) + + // Observe the stuck state over time + t.Log("") + t.Log("=== OBSERVING STUCK STATE ===") + t.Log("Without the fix, the connection stays 'connected' indefinitely.") + t.Log("In production, this has been observed to last 44+ HOURS.") + t.Log("") + + stuckDuration := 0 + disconnected := false + + // Observe for 15 seconds (send timeout is 3s, so should trigger within this window) + for i := 0; i < 15; i++ { + time.Sleep(1 * time.Second) + stuckDuration++ + + slaveInfo := util.FindInfoEntry(masterClient, "slave0") + connectedSlaves := util.FindInfoEntry(masterClient, "connected_slaves") + + if connectedSlaves == "0" { + t.Logf("✓ After %ds: Master DETECTED slow consumer and disconnected it", stuckDuration) + t.Log(" This means the FIX IS WORKING (send timeout or lag detection)") + disconnected = true + break + } else { + t.Logf("✗ After %ds: connected_slaves=%s, slave0=%s", stuckDuration, connectedSlaves, slaveInfo) + t.Log(" Connection still 'up' but NO DATA FLOWING - BUG DEMONSTRATED") + } + } + + t.Log("") + if !disconnected { + t.Log("=== BUG BEHAVIOR (or fix not triggered yet) ===") + t.Logf("After %d seconds, the connection is STILL marked as 'connected'", stuckDuration) + t.Log("Without the fix, this state would persist for 44+ HOURS.") + t.Log("") + t.Log("Root cause: FeedSlaveThread blocks on write() with NO TIMEOUT") + t.Log("The fix adds:") + t.Log(" 1. replication-send-timeout-ms: timeout on socket sends (default 30s)") + t.Log(" 2. max-replication-lag: proactive disconnect when lag too high") + t.Log("") + } + + // Resume proxy for cleanup + pauseCh <- false + + if disconnected { + t.Log("=== FIX VERIFIED ===") + t.Log("Master successfully disconnected slow consumer via send timeout or lag detection") + t.Log("Without the fix, this connection would have stayed 'stuck' for 44+ hours") + + // Verify the fix - master should have disconnected the slow consumer + require.True(t, disconnected, "With the fix, master should disconnect slow consumer") + } else { + // Without the fix, connection stays stuck + t.Log("=== BUG DEMONSTRATED ===") + t.Log("Without the fix, the connection stays stuck indefinitely") + } +} + +// TestNoSendTimeoutConfig verifies that the send timeout config doesn't exist +// in the unfixed version. This test should FAIL on the fixed version. +func TestNoSendTimeoutConfig(t *testing.T) { + t.Parallel() + ctx := context.Background() + + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + client := srv.NewClient() + defer func() { require.NoError(t, client.Close()) }() + + // These config options should NOT exist in the unfixed version + _, err := client.ConfigGet(ctx, "max-replication-lag").Result() + if err != nil { + t.Logf("max-replication-lag config not found (expected in unfixed version): %v", err) + } else { + result := client.ConfigGet(ctx, "max-replication-lag").Val() + if len(result) == 0 || result["max-replication-lag"] == "" { + t.Log("max-replication-lag config does not exist (UNFIXED VERSION)") + } else { + t.Logf("max-replication-lag exists with value: %s (FIXED VERSION)", result["max-replication-lag"]) + } + } + + _, err = client.ConfigGet(ctx, "replication-send-timeout-ms").Result() + if err != nil { + t.Logf("replication-send-timeout-ms config not found (expected in unfixed version): %v", err) + } else { + result := client.ConfigGet(ctx, "replication-send-timeout-ms").Val() + if len(result) == 0 || result["replication-send-timeout-ms"] == "" { + t.Log("replication-send-timeout-ms config does not exist (UNFIXED VERSION)") + } else { + t.Logf("replication-send-timeout-ms exists with value: %s (FIXED VERSION)", result["replication-send-timeout-ms"]) + } + } +} diff --git a/tests/gocase/util/client.go b/tests/gocase/util/client.go index 180163b55e2..49a4e929a79 100644 --- a/tests/gocase/util/client.go +++ b/tests/gocase/util/client.go @@ -27,6 +27,7 @@ import ( "net" "regexp" "strings" + "sync/atomic" "testing" "time" @@ -151,3 +152,119 @@ func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) }() return uint64(addr.Port) } + +// PausableTCPProxy creates a TCP proxy that can be paused/resumed via a channel. +// Send true to pause, false to resume. Returns the proxy port. +// When paused, the proxy stops reading from the source, causing the sender's +// TCP buffer to fill up and eventually blocking writes. +func PausableTCPProxy(ctx context.Context, t testing.TB, to string, pauseCh <-chan bool) uint64 { + addr, err := findFreePort() + if err != nil { + t.Fatalf("can't find a free port, %v", err) + } + from := addr.String() + + listener, err := net.Listen("tcp", from) + if err != nil { + t.Fatalf("listen to %s failed, err: %v", from, err) + } + + paused := &atomic.Bool{} + + // Goroutine to handle pause/resume signals + go func() { + for { + select { + case <-ctx.Done(): + return + case p := <-pauseCh: + paused.Store(p) + } + } + }() + + copyBytes := func(src, dest io.ReadWriter, direction string) func() error { + buffer := make([]byte, 4096) + return func() error { + COPY_LOOP: + for { + select { + case <-ctx.Done(): + t.Log("forwarding tcp stream stopped") + break COPY_LOOP + default: + // When paused, only block reading from the master (to slave direction) + // This causes master's send buffer to fill, eventually blocking master's writes + if paused.Load() && direction == "to_slave" { + time.Sleep(time.Millisecond * 100) + continue + } + + // Set read deadline to allow checking pause state periodically + if conn, ok := src.(net.Conn); ok { + _ = conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + } + + n, err := src.Read(buffer) + if err != nil { + if errors.Is(err, io.EOF) { + break COPY_LOOP + } + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + return err + } + _, err = dest.Write(buffer[:n]) + if err != nil { + if errors.Is(err, io.EOF) { + break COPY_LOOP + } + return err + } + } + } + return nil + } + } + + go func() { + defer listener.Close() + LISTEN_LOOP: + for { + select { + case <-ctx.Done(): + break LISTEN_LOOP + + default: + _ = listener.(*net.TCPListener).SetDeadline(time.Now().Add(100 * time.Millisecond)) + conn, err := listener.Accept() + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + t.Logf("accept conn failed, err: %v", err) + continue + } + dest, err := net.Dial("tcp", to) + if err != nil { + t.Logf("dial to %s failed, err: %v", to, err) + conn.Close() + continue + } + go func() { + var errGrp errgroup.Group + // conn is from slave, dest is to master + // "to_slave" = reading from master (dest), writing to slave (conn) + // "to_master" = reading from slave (conn), writing to master (dest) + errGrp.Go(copyBytes(dest, conn, "to_slave")) + errGrp.Go(copyBytes(conn, dest, "to_master")) + _ = errGrp.Wait() + conn.Close() + dest.Close() + }() + } + } + }() + return uint64(addr.Port) +}