Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
830 changes: 826 additions & 4 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ members = [
"cli",
"rust/hyperstack-server",
"rust/hyperstack-sdk",
"rust/hyperstack-auth",
"rust/hyperstack-auth-server",
"stacks/sdk/rust",
]
exclude = [
Expand Down
120 changes: 83 additions & 37 deletions cli/src/commands/stream/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ fn build_state(args: &StreamArgs, view: &str, url: &str) -> Result<StreamState>
.map(|s| {
let s = s.trim().to_lowercase();
// Normalize "create" → "upsert" to match op normalization at comparison time
if s == "create" { "upsert".to_string() } else { s }
if s == "create" {
"upsert".to_string()
} else {
s
}
})
.collect::<HashSet<_>>()
});
Expand Down Expand Up @@ -89,7 +93,14 @@ pub async fn stream(url: String, view: &str, args: &StreamArgs) -> Result<()> {

// Emit NoDna connected event only after successful WebSocket handshake
if let OutputMode::NoDna = state.output_mode {
output::emit_no_dna_event(&mut state.out, "connected", view, &serde_json::json!({"url": url}), 0, 0)?;
output::emit_no_dna_event(
&mut state.out,
"connected",
view,
&serde_json::json!({"url": url}),
0,
0,
)?;
}

let (mut ws_tx, mut ws_rx) = ws.split();
Expand All @@ -105,7 +116,8 @@ pub async fn stream(url: String, view: &str, args: &StreamArgs) -> Result<()> {

// Ping interval
let ping_period = std::time::Duration::from_secs(30);
let mut ping_interval = tokio::time::interval_at(tokio::time::Instant::now() + ping_period, ping_period);
let mut ping_interval =
tokio::time::interval_at(tokio::time::Instant::now() + ping_period, ping_period);

// Duration timer for --save --duration (as a select! arm for precise timing)
let duration_future = async {
Expand Down Expand Up @@ -219,16 +231,22 @@ pub async fn stream(url: String, view: &str, args: &StreamArgs) -> Result<()> {
if let OutputMode::NoDna = state.output_mode {
// Ensure snapshot_complete is emitted before disconnected if it wasn't already
if !snapshot_complete && received_snapshot {
output::emit_no_dna_event(&mut state.out,
"snapshot_complete", view,
output::emit_no_dna_event(
&mut state.out,
"snapshot_complete",
view,
&serde_json::json!({"entity_count": state.entity_count}),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?;
}
output::emit_no_dna_event(&mut state.out,
"disconnected", view,
output::emit_no_dna_event(
&mut state.out,
"disconnected",
view,
&serde_json::json!(null),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?;
}

Expand All @@ -244,10 +262,13 @@ pub async fn replay(player: SnapshotPlayer, view: &str, args: &StreamArgs) -> Re

// Emit NoDna connected event with replay source indicator
if let OutputMode::NoDna = state.output_mode {
output::emit_no_dna_event(&mut state.out,
"connected", view,
output::emit_no_dna_event(
&mut state.out,
"connected",
view,
&serde_json::json!({"url": player.header.url, "source": "replay"}),
0, 0,
0,
0,
)?;
}

Expand All @@ -256,8 +277,16 @@ pub async fn replay(player: SnapshotPlayer, view: &str, args: &StreamArgs) -> Re

for snapshot_frame in &player.frames {
let was_snapshot = snapshot_frame.frame.is_snapshot();
if was_snapshot { received_snapshot = true; }
maybe_emit_snapshot_complete(&mut state, view, &mut snapshot_complete, received_snapshot, was_snapshot)?;
if was_snapshot {
received_snapshot = true;
}
maybe_emit_snapshot_complete(
&mut state,
view,
&mut snapshot_complete,
received_snapshot,
was_snapshot,
)?;
if process_frame(snapshot_frame.frame.clone(), view, &mut state)? {
break;
}
Expand All @@ -269,16 +298,22 @@ pub async fn replay(player: SnapshotPlayer, view: &str, args: &StreamArgs) -> Re

if let OutputMode::NoDna = state.output_mode {
if !snapshot_complete && received_snapshot {
output::emit_no_dna_event(&mut state.out,
"snapshot_complete", view,
output::emit_no_dna_event(
&mut state.out,
"snapshot_complete",
view,
&serde_json::json!({"entity_count": state.entity_count}),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?;
}
output::emit_no_dna_event(&mut state.out,
"disconnected", view,
output::emit_no_dna_event(
&mut state.out,
"disconnected",
view,
&serde_json::json!(null),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?;
}

Expand Down Expand Up @@ -354,22 +389,21 @@ fn maybe_emit_snapshot_complete(
if !was_snapshot && received_snapshot && !*snapshot_complete {
*snapshot_complete = true;
if let OutputMode::NoDna = state.output_mode {
output::emit_no_dna_event(&mut state.out,
"snapshot_complete", view,
output::emit_no_dna_event(
&mut state.out,
"snapshot_complete",
view,
&serde_json::json!({"entity_count": state.entity_count}),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?;
}
}
Ok(())
}

/// Process a frame. Returns true if the stream should end (--first matched).
fn process_frame(
frame: Frame,
view: &str,
state: &mut StreamState,
) -> Result<bool> {
fn process_frame(frame: Frame, view: &str, state: &mut StreamState) -> Result<bool> {
// Record frame if --save is active
if let Some(recorder) = &mut state.recorder {
recorder.record(&frame);
Expand Down Expand Up @@ -420,7 +454,9 @@ fn process_frame(
// entity_count is a running tally — NoDna entity_update events during
// snapshot delivery report the count at that point, not the final total.
// The final count is available in the snapshot_complete event.
state.entities.insert(entity.key.clone(), entity.data.clone());
state
.entities
.insert(entity.key.clone(), entity.data.clone());
state.entity_count = state.entities.len() as u64;
if let Some(store) = &mut state.store {
store.upsert(&entity.key, entity.data.clone(), "snapshot", None);
Expand All @@ -446,7 +482,8 @@ fn process_frame(
if let Some(store) = &mut state.store {
store.patch(&frame.key, &frame.data, &frame.append, frame.seq.clone());
}
let entry = state.entities
let entry = state
.entities
.entry(frame.key.clone())
.or_insert_with(|| serde_json::json!({}));
deep_merge_with_append(entry, &frame.data, &frame.append, "");
Expand All @@ -459,7 +496,10 @@ fn process_frame(
Operation::Delete => {
// Note: if the entity was never seen (e.g. --no-snapshot), last_state is null
// and field-based --where filters will not match, silently dropping the delete.
let last_state = state.entities.remove(&frame.key).unwrap_or(serde_json::json!(null));
let last_state = state
.entities
.remove(&frame.key)
.unwrap_or(serde_json::json!(null));
if let Some(store) = &mut state.store {
store.delete(&frame.key);
}
Expand All @@ -477,10 +517,13 @@ fn process_frame(
output::print_count(state.update_count)?;
} else {
match state.output_mode {
OutputMode::NoDna => output::emit_no_dna_event(&mut state.out,
"entity_update", view,
OutputMode::NoDna => output::emit_no_dna_event(
&mut state.out,
"entity_update",
view,
&serde_json::json!({"key": frame.key, "op": "delete", "data": null}),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?,
_ => output::print_delete(&mut state.out, view, &frame.key)?,
}
Expand Down Expand Up @@ -518,10 +561,13 @@ fn emit_entity(
output::print_count(state.update_count)?;
} else {
match state.output_mode {
OutputMode::NoDna => output::emit_no_dna_event(&mut state.out,
"entity_update", view,
OutputMode::NoDna => output::emit_no_dna_event(
&mut state.out,
"entity_update",
view,
&serde_json::json!({"key": key, "op": op, "data": output_data}),
state.update_count, state.entity_count,
state.update_count,
state.entity_count,
)?,
_ => output::print_entity_update(&mut state.out, view, key, op, &output_data)?,
}
Expand Down
25 changes: 14 additions & 11 deletions cli/src/commands/stream/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ fn value_eq(value: &Value, expected: &str) -> bool {
false
}
}
Value::Bool(b) => {
(expected == "true" && *b) || (expected == "false" && !b)
}
Value::Bool(b) => (expected == "true" && *b) || (expected == "false" && !b),
Value::Null => expected == "null",
_ => {
let s = serde_json::to_string(value).unwrap_or_default();
Expand Down Expand Up @@ -201,22 +199,30 @@ fn make_not_eq(value: &str) -> Result<FilterOp> {
}

fn make_gt(value: &str) -> Result<FilterOp> {
let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '>', got '{}'", value))?;
let n: f64 = value
.parse()
.map_err(|_| anyhow::anyhow!("Expected number after '>', got '{}'", value))?;
Ok(FilterOp::Gt(n))
}

fn make_gte(value: &str) -> Result<FilterOp> {
let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '>=', got '{}'", value))?;
let n: f64 = value
.parse()
.map_err(|_| anyhow::anyhow!("Expected number after '>=', got '{}'", value))?;
Ok(FilterOp::Gte(n))
}

fn make_lt(value: &str) -> Result<FilterOp> {
let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '<', got '{}'", value))?;
let n: f64 = value
.parse()
.map_err(|_| anyhow::anyhow!("Expected number after '<', got '{}'", value))?;
Ok(FilterOp::Lt(n))
}

fn make_lte(value: &str) -> Result<FilterOp> {
let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '<=', got '{}'", value))?;
let n: f64 = value
.parse()
.map_err(|_| anyhow::anyhow!("Expected number after '<=', got '{}'", value))?;
Ok(FilterOp::Lte(n))
}

Expand Down Expand Up @@ -314,10 +320,7 @@ mod tests {

#[test]
fn test_multiple_filters_and() {
let f = Filter::parse(&[
"age>18".to_string(),
"name=alice".to_string(),
]).unwrap();
let f = Filter::parse(&["age>18".to_string(), "name=alice".to_string()]).unwrap();
assert!(f.matches(&json!({"age": 25, "name": "alice"})));
assert!(!f.matches(&json!({"age": 25, "name": "bob"})));
assert!(!f.matches(&json!({"age": 15, "name": "alice"})));
Expand Down
19 changes: 8 additions & 11 deletions cli/src/commands/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,12 @@ pub struct StreamArgs {
pub duration: Option<u64>,

/// Replay a previously saved snapshot file instead of connecting live
#[arg(long, conflicts_with = "url", conflicts_with = "tui", conflicts_with = "duration")]
#[arg(
long,
conflicts_with = "url",
conflicts_with = "tui",
conflicts_with = "duration"
)]
pub load: Option<String>,

/// Show update history for the specified --key entity
Expand Down Expand Up @@ -197,10 +202,7 @@ pub fn build_subscription(view: &str, args: &StreamArgs) -> Subscription {

fn validate_ws_url(url: &str) -> Result<()> {
if !url.starts_with("ws://") && !url.starts_with("wss://") {
bail!(
"Invalid URL scheme. Expected ws:// or wss://, got: {}",
url
);
bail!("Invalid URL scheme. Expected ws:// or wss://, got: {}", url);
}
Ok(())
}
Expand Down Expand Up @@ -274,12 +276,7 @@ fn list_stacks(config: Option<&HyperstackConfig>) -> String {
Some(config) if !config.stacks.is_empty() => config
.stacks
.iter()
.map(|s| {
s.name
.as_deref()
.unwrap_or(&s.stack)
.to_string()
})
.map(|s| s.name.as_deref().unwrap_or(&s.stack).to_string())
.collect::<Vec<_>>()
.join(", "),
_ => "(none — create hyperstack.toml with [[stacks]] entries)".to_string(),
Expand Down
Loading
Loading