Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
7eb2044
Refactor coordinator logic and add sharded support
bootjp Feb 4, 2026
7916d7d
Refactor SSH key handling and update .gitignore
bootjp Feb 5, 2026
454b342
Fix go build command in Dockerfile
bootjp Feb 5, 2026
607edc7
Refactor error messages into package-level variables
bootjp Feb 5, 2026
a081a09
plaintextFix alignment of error variable declarations
bootjp Feb 5, 2026
122eff4
Update jepsen/src/elastickv/db.clj
bootjp Feb 5, 2026
f19cf76
Initial plan
Copilot Feb 5, 2026
4ea2ac6
Initial plan
Copilot Feb 5, 2026
13d2237
Update main.go
bootjp Feb 5, 2026
bcde8d2
Update kv/shard_store.go
bootjp Feb 5, 2026
f16c4bd
Fix ScanAt to only query groups with intersecting shard ranges
Copilot Feb 5, 2026
a35bc1a
Add .cache/ to .gitignore and remove from repository
Copilot Feb 5, 2026
18933b5
Fix resource leaks in newRaftGroup error paths
Copilot Feb 5, 2026
4b9881b
Add cache directories to .gitignore
Copilot Feb 5, 2026
ec0e6a5
Refactor cleanup logic to use helper function
Copilot Feb 5, 2026
c8c2d9f
Remove accidentally committed cache files
Copilot Feb 5, 2026
2d344c9
Merge branch 'feature/multi-raft' into copilot/sub-pr-304
bootjp Feb 5, 2026
f3f2f58
Merge branch 'feature/multi-raft' into copilot/sub-pr-304-again
bootjp Feb 5, 2026
5e43d88
Merge pull request #305 from bootjp/copilot/sub-pr-304
bootjp Feb 5, 2026
61712fb
Merge branch 'feature/multi-raft' into copilot/sub-pr-304-again
bootjp Feb 5, 2026
b989eab
Update multiraft_runtime.go
bootjp Feb 5, 2026
c812a8b
Add comments clarifying resource cleanup logic
Copilot Feb 5, 2026
21b8b9a
Merge pull request #306 from bootjp/copilot/sub-pr-304-again
bootjp Feb 5, 2026
2bf224c
Add RawLatestCommitTS RPC and update checksums
bootjp Feb 14, 2026
a0ffe1d
Merge branch 'main' into feature/multi-raft
bootjp Feb 14, 2026
bd431fd
Merge branch 'feature/multi-raft' of github.com:bootjp/elastickv into…
bootjp Feb 14, 2026
73fd50d
Fix sharded proxy reads and config parsing
bootjp Feb 14, 2026
ee4f377
Define sentinel errors for config parsing
bootjp Feb 14, 2026
c30684a
kv: share gRPC conn cache
bootjp Feb 14, 2026
81f31ea
shard_config: trim raftGroups ids
bootjp Feb 14, 2026
0a57a72
kv: export conn cache and reduce scan overfetch
bootjp Feb 14, 2026
d0a9902
adapter: fix EXEC sharding and reuse gRPC conns
bootjp Feb 14, 2026
868a912
kv: centralize max commit TS lookup
bootjp Feb 14, 2026
243551c
main: fix raftRedisMap defaults and close gRPC conn cache
bootjp Feb 14, 2026
25d84d1
kv: reject cross-shard transactions
bootjp Feb 14, 2026
8abb055
main: clean up raft groups on init failure
bootjp Feb 14, 2026
565637d
kv: add LeaderProxy tests
bootjp Feb 14, 2026
b7bcfa3
kv: propagate context through coordinator dispatch
bootjp Feb 14, 2026
64ff374
kv: close leader proxy conn cache
bootjp Feb 14, 2026
0e31949
main: keep single-group raft dir layout
bootjp Feb 14, 2026
c541c9f
adapter: verify leadership before local reads
bootjp Feb 14, 2026
73c7e06
Update kv/shard_store.go
bootjp Feb 14, 2026
659f8a3
kv: fix sharded ScanAt routing and limits
bootjp Feb 14, 2026
822f8fd
kv: dial gRPC connections outside cache lock
bootjp Feb 14, 2026
e3266a7
kv: codify shard-aware MVCC store requirement
bootjp Feb 14, 2026
9759f17
kv: use shard-aware store interface instead of concrete ShardStore ch…
bootjp Feb 14, 2026
f79a0de
adapter: remove store type assertions by injecting leader-routed MVCC…
bootjp Feb 14, 2026
4a4ddaf
kv: address latest review feedback
bootjp Feb 14, 2026
7ce2245
test: cover cross-shard txn rejection and harden cleanup
bootjp Feb 14, 2026
5770096
kv/adapter: harden request validation and cleanup
bootjp Feb 14, 2026
9501517
Initial plan
Copilot Feb 14, 2026
ddd51ad
Rename RawKvPair to RawKVPair for consistent naming
Copilot Feb 14, 2026
873c0c9
Merge pull request #312 from bootjp/copilot/sub-pr-304
bootjp Feb 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,11 @@ jepsen/.lein-*
jepsen/.nrepl-port
.m2/
jepsen/store/

# Jepsen local SSH keys (generated locally; never commit)
jepsen/docker/id_rsa
jepsen/.ssh/

# Build and lint cache directories
.cache/
.golangci-cache/
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM golang:latest AS build
WORKDIR $GOPATH/src/app
COPY . .

RUN CGO_ENABLED=0 go build -o /app main.go
RUN CGO_ENABLED=0 go build -o /app .

FROM gcr.io/distroless/static:latest
COPY --from=build /app /app
Expand Down
8 changes: 4 additions & 4 deletions adapter/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type DynamoDBServer struct {
httpServer *http.Server
}

func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate *kv.Coordinate) *DynamoDBServer {
func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator) *DynamoDBServer {
d := &DynamoDBServer{
listen: listen,
store: st,
Expand Down Expand Up @@ -85,7 +85,7 @@ func (d *DynamoDBServer) putItem(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if _, err = d.coordinator.Dispatch(reqs); err != nil {
if _, err = d.coordinator.Dispatch(r.Context(), reqs); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (d *DynamoDBServer) updateItem(w http.ResponseWriter, r *http.Request) {
IsTxn: false,
Elems: []*kv.Elem[kv.OP]{elem},
}
if _, err = d.coordinator.Dispatch(req); err != nil {
if _, err = d.coordinator.Dispatch(r.Context(), req); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -214,7 +214,7 @@ func (d *DynamoDBServer) transactWriteItems(w http.ResponseWriter, r *http.Reque
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if _, err = d.coordinator.Dispatch(reqs); err != nil {
if _, err = d.coordinator.Dispatch(r.Context(), reqs); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
178 changes: 116 additions & 62 deletions adapter/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"context"
"log/slog"
"os"
"sync"

"github.com/bootjp/elastickv/internal"
"github.com/bootjp/elastickv/kv"
pb "github.com/bootjp/elastickv/proto"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/spaolacci/murmur3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

var _ pb.RawKVServer = (*GRPCServer)(nil)
Expand All @@ -24,96 +23,151 @@ type GRPCServer struct {
coordinator kv.Coordinator
store store.MVCCStore

closeStore bool
closeOnce sync.Once
closeErr error

pb.UnimplementedRawKVServer
pb.UnimplementedTransactionalKVServer
}

func NewGRPCServer(store store.MVCCStore, coordinate *kv.Coordinate) *GRPCServer {
return &GRPCServer{
type GRPCServerOption func(*GRPCServer)

func WithCloseStore() GRPCServerOption {
return func(s *GRPCServer) {
s.closeStore = true
}
}

func NewGRPCServer(store store.MVCCStore, coordinate kv.Coordinator, opts ...GRPCServerOption) *GRPCServer {
s := &GRPCServer{
log: slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelWarn,
})),
grpcTranscoder: newGrpcGrpcTranscoder(),
coordinator: coordinate,
store: store,
}
for _, opt := range opts {
if opt == nil {
continue
}
opt(s)
}
return s
}

func (r GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
readTS := req.GetTs()
if readTS == 0 {
readTS = snapshotTS(r.coordinator.Clock(), r.store)
func (r *GRPCServer) Close() error {
if r == nil {
return nil
}

if r.coordinator.IsLeader() {
v, err := r.store.GetAt(ctx, req.Key, readTS)
if err != nil {
switch {
case errors.Is(err, store.ErrKeyNotFound):
return &pb.RawGetResponse{
Value: nil,
}, nil
default:
return nil, errors.WithStack(err)
}
r.closeOnce.Do(func() {
if !r.closeStore || r.store == nil {
return
}
if err := r.store.Close(); err != nil {
r.closeErr = errors.WithStack(err)
}
r.log.InfoContext(ctx, "Get",
slog.String("key", string(req.Key)),
slog.String("value", string(v)))
})
return r.closeErr
}
Comment on lines 60 to 73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The Close method is a no-op, but the GRPCServer holds a store.MVCCStore which, in practice, is a *kv.LeaderRoutedStore. This store contains a GRPCConnCache that manages gRPC connections. Without closing the store, these connections will be leaked. The Close method should delegate to r.store.Close() to ensure proper resource cleanup.

func (r *GRPCServer) Close() error {
	if r == nil || r.store == nil {
		return nil
	}
	return r.store.Close()
}


func (r *GRPCServer) clock() *kv.HLC {
if r == nil || r.coordinator == nil {
return nil
}
return r.coordinator.Clock()
}

return &pb.RawGetResponse{
Value: v,
}, nil
func (r *GRPCServer) RawGet(ctx context.Context, req *pb.RawGetRequest) (*pb.RawGetResponse, error) {
readTS := req.GetTs()
if readTS == 0 {
readTS = snapshotTS(r.clock(), r.store)
}

v, err := r.tryLeaderGet(req.Key)
v, err := r.store.GetAt(ctx, req.Key, readTS)
if errors.Is(err, store.ErrKeyNotFound) {
return &pb.RawGetResponse{Value: nil}, nil
}
if err != nil {
return &pb.RawGetResponse{
Value: nil,
}, err
return nil, errors.WithStack(err)
}

r.log.InfoContext(ctx, "Get",
slog.String("key", string(req.Key)),
slog.String("value", string(v)))

return &pb.RawGetResponse{
Value: v,
}, nil
return &pb.RawGetResponse{Value: v}, nil
}

func (r GRPCServer) tryLeaderGet(key []byte) ([]byte, error) {
addr := r.coordinator.RaftLeader()
if addr == "" {
return nil, ErrLeaderNotFound
func (r *GRPCServer) RawLatestCommitTS(ctx context.Context, req *pb.RawLatestCommitTSRequest) (*pb.RawLatestCommitTSResponse, error) {
key := req.GetKey()
if len(key) == 0 {
return nil, errors.WithStack(kv.ErrInvalidRequest)
}

conn, err := grpc.NewClient(string(addr),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
)
ts, exists, err := r.store.LatestCommitTS(ctx, key)
if err != nil {
return nil, errors.WithStack(err)
}
defer conn.Close()
return &pb.RawLatestCommitTSResponse{
Ts: ts,
Exists: exists,
}, nil
}

cli := pb.NewRawKVClient(conn)
ts := snapshotTS(r.coordinator.Clock(), r.store)
resp, err := cli.RawGet(context.Background(), &pb.RawGetRequest{Key: key, Ts: ts})
func (r *GRPCServer) RawScanAt(ctx context.Context, req *pb.RawScanAtRequest) (*pb.RawScanAtResponse, error) {
limit64 := req.GetLimit()
limit, err := rawScanLimit(limit64)
if err != nil {
return nil, errors.WithStack(err)
return &pb.RawScanAtResponse{Kv: nil}, err
}

return resp.Value, nil
readTS := req.GetTs()
if readTS == 0 {
readTS = snapshotTS(r.clock(), r.store)
}

res, err := r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
if err != nil {
return &pb.RawScanAtResponse{Kv: nil}, errors.WithStack(err)
}

return &pb.RawScanAtResponse{Kv: rawKvPairs(res)}, nil
}

func rawScanLimit(limit64 int64) (int, error) {
if limit64 < 0 {
return 0, errors.WithStack(kv.ErrInvalidRequest)
}
maxInt64 := int64(^uint(0) >> 1)
if limit64 > maxInt64 {
return 0, errors.WithStack(internal.ErrIntOverflow)
}
return int(limit64), nil
}

func rawKvPairs(res []*store.KVPair) []*pb.RawKVPair {
out := make([]*pb.RawKVPair, 0, len(res))
for _, kvp := range res {
if kvp == nil {
continue
}
out = append(out, &pb.RawKVPair{
Key: kvp.Key,
Value: kvp.Value,
})
}
return out
}

func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error) {
func (r *GRPCServer) RawPut(ctx context.Context, req *pb.RawPutRequest) (*pb.RawPutResponse, error) {
m, err := r.grpcTranscoder.RawPutToRequest(req)
if err != nil {
return nil, errors.WithStack(err)
}

res, err := r.coordinator.Dispatch(m)
res, err := r.coordinator.Dispatch(ctx, m)
if err != nil {
return &pb.RawPutResponse{
CommitIndex: uint64(0),
Expand All @@ -127,13 +181,13 @@ func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPut
}, nil
}

func (r GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error) {
func (r *GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*pb.RawDeleteResponse, error) {
m, err := r.grpcTranscoder.RawDeleteToRequest(req)
if err != nil {
return nil, errors.WithStack(err)
}

res, err := r.coordinator.Dispatch(m)
res, err := r.coordinator.Dispatch(ctx, m)
if err != nil {
return &pb.RawDeleteResponse{
CommitIndex: uint64(0),
Expand All @@ -147,27 +201,27 @@ func (r GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*p
}, nil
}

func (r GRPCServer) PreWrite(ctx context.Context, req *pb.PreWriteRequest) (*pb.PreCommitResponse, error) {
func (r *GRPCServer) PreWrite(ctx context.Context, req *pb.PreWriteRequest) (*pb.PreCommitResponse, error) {
return nil, kv.ErrNotImplemented
}

func (r GRPCServer) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitResponse, error) {
func (r *GRPCServer) Commit(ctx context.Context, req *pb.CommitRequest) (*pb.CommitResponse, error) {
return nil, kv.ErrNotImplemented
}

func (r GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.RollbackResponse, error) {
func (r *GRPCServer) Rollback(ctx context.Context, req *pb.RollbackRequest) (*pb.RollbackResponse, error) {
return nil, kv.ErrNotImplemented
}

func (r GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error) {
func (r *GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutResponse, error) {
reqs, err := r.grpcTranscoder.TransactionalPutToRequests(req)
if err != nil {
return nil, errors.WithStack(err)
}

r.log.InfoContext(ctx, "Put", slog.Any("reqs", reqs))

res, err := r.coordinator.Dispatch(reqs)
res, err := r.coordinator.Dispatch(ctx, reqs)
if err != nil {
return &pb.PutResponse{
CommitIndex: uint64(0),
Expand All @@ -180,13 +234,13 @@ func (r GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutRespons
}, nil
}

func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
func (r *GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
h := murmur3.New64()
if _, err := h.Write(req.Key); err != nil {
return nil, errors.WithStack(err)
}

readTS := snapshotTS(r.coordinator.Clock(), r.store)
readTS := snapshotTS(r.clock(), r.store)
v, err := r.store.GetAt(ctx, req.Key, readTS)
if err != nil {
switch {
Expand All @@ -206,15 +260,15 @@ func (r GRPCServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetRespons
}, nil
}

func (r GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
func (r *GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
reqs, err := r.grpcTranscoder.TransactionalDeleteToRequests(req)
if err != nil {
return nil, errors.WithStack(err)
}

r.log.InfoContext(ctx, "Delete", slog.Any("reqs", reqs))

res, err := r.coordinator.Dispatch(reqs)
res, err := r.coordinator.Dispatch(ctx, reqs)
if err != nil {
return &pb.DeleteResponse{
CommitIndex: uint64(0),
Expand All @@ -227,14 +281,14 @@ func (r GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.Dele
}, nil
}

func (r GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error) {
func (r *GRPCServer) Scan(ctx context.Context, req *pb.ScanRequest) (*pb.ScanResponse, error) {
limit, err := internal.Uint64ToInt(req.Limit)
if err != nil {
return &pb.ScanResponse{
Kv: nil,
}, errors.WithStack(err)
}
readTS := snapshotTS(r.coordinator.Clock(), r.store)
readTS := snapshotTS(r.clock(), r.store)
res, err := r.store.ScanAt(ctx, req.StartKey, req.EndKey, limit, readTS)
if err != nil {
return &pb.ScanResponse{
Expand Down
Loading
Loading