diff --git a/Cargo.lock b/Cargo.lock index eec95384..0589886c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,6 +581,21 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +[[package]] +name = "cassowary" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" + +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.51" @@ -694,6 +709,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "compact_str" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b79c4069c6cad78e2e0cdfcbd26275770669fb39fd308a752dc110e83b9af32" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "console" version = "0.15.11" @@ -703,7 +732,7 @@ dependencies = [ "encode_unicode", "libc", "once_cell", - "unicode-width", + "unicode-width 0.2.0", "windows-sys 0.59.0", ] @@ -833,6 +862,31 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags 2.10.0", + "crossterm_winapi", + "mio", + "parking_lot 0.12.5", + "rustix 0.38.44", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + [[package]] name = "crunchy" version = "0.2.4" @@ -901,6 +955,40 @@ dependencies = [ "syn 2.0.113", ] +[[package]] +name = "darling" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0" +dependencies = [ + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.113", +] + +[[package]] +name = "darling_macro" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.113", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -1660,18 +1748,24 @@ dependencies = [ "clap_complete", "colored", "console", + "crossterm", "dialoguer", "dirs", "flate2", + "futures-util", "hyperstack-idl", "hyperstack-interpreter", + "hyperstack-sdk", "indicatif", + "ratatui", "regex", "reqwest 0.11.27", "rpassword", "serde", "serde_json", "tar", + "tokio", + "tokio-tungstenite 0.21.0", "toml 0.8.23", "uuid", ] @@ -1904,6 +1998,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -1954,10 +2054,19 @@ dependencies = [ "console", "number_prefix", "portable-atomic", - "unicode-width", + "unicode-width 0.2.0", "web-time", ] +[[package]] +name = "indoc" +version = 
"2.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79cf5c93f93228cf8efb3ba362535fb11199ac548a09ce117c9b1adc3030d706" +dependencies = [ + "rustversion", +] + [[package]] name = "inout" version = "0.1.4" @@ -1967,6 +2076,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instability" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb2d60ef19920a3a9193c3e371f726ec1dafc045dac788d0fb3704272458971" +dependencies = [ + "darling", + "indoc", + "proc-macro2", + "quote", + "syn 2.0.113", +] + [[package]] name = "instant" version = "0.1.13" @@ -2007,6 +2129,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -2130,6 +2261,12 @@ dependencies = [ "libsecp256k1-core", ] +[[package]] +name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -2243,6 +2380,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.61.2", ] @@ -2561,6 +2699,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pbkdf2" version = "0.11.0" @@ -3044,6 +3188,27 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "ratatui" +version = "0.29.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" +dependencies = [ + "bitflags 2.10.0", + "cassowary", + "compact_str", + "crossterm", + "indoc", + "instability", + "itertools 0.13.0", + "lru", + "paste", + "strum", + "unicode-segmentation", + "unicode-truncate", + "unicode-width 0.2.0", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3240,6 +3405,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + [[package]] name = "rustix" version = "1.1.3" @@ -3249,7 +3427,7 @@ dependencies = [ "bitflags 2.10.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.11.0", "windows-sys 0.61.2", ] @@ -3594,6 +3772,27 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + [[package]] name = "signal-hook-registry" version = "1.4.8" @@ -5158,12 +5357,40 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "static_assertions" +version = "1.1.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "strsim" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" +dependencies = [ + "strum_macros", +] + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.113", +] + [[package]] name = "subtle" version = "2.6.1" @@ -5265,7 +5492,7 @@ dependencies = [ "fastrand", "getrandom 0.3.4", "once_cell", - "rustix", + "rustix 1.1.3", "windows-sys 0.61.2", ] @@ -5284,7 +5511,7 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b8cb979cb11c32ce1603f8137b22262a9d131aaa5c37b5678025f22b8becd0" dependencies = [ - "rustix", + "rustix 1.1.3", "windows-sys 0.60.2", ] @@ -6024,11 +6251,34 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] +name = "unicode-truncate" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" +dependencies = [ + "itertools 0.13.0", + "unicode-segmentation", + "unicode-width 0.1.14", 
+] + [[package]] name = "unicode-width" -version = "0.2.2" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + +[[package]] +name = "unicode-width" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" [[package]] name = "universal-hash" @@ -6613,7 +6863,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" dependencies = [ "libc", - "rustix", + "rustix 1.1.3", ] [[package]] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index b30b810b..a2f83034 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -19,6 +19,8 @@ path = "src/main.rs" default = [] # Enable local development mode (uses http://localhost:3000 instead of production API) local = [] +# Enable interactive TUI for `hs stream --tui` +tui = ["ratatui", "crossterm"] [dependencies] clap = { version = "4.5", features = ["derive", "cargo"] } @@ -33,6 +35,7 @@ indicatif = "0.17" console = "0.15" hyperstack-interpreter = { version = "0.5.10", path = "../interpreter" } hyperstack-idl = { path = "../hyperstack-idl", version = "0.1.5" } +hyperstack-sdk = { path = "../rust/hyperstack-sdk", version = "0.5.10" } reqwest = { version = "0.11", default-features = false, features = ["json", "blocking", "rustls-tls"] } dirs = "5.0" rpassword = "7.3" @@ -41,4 +44,9 @@ flate2 = "1.0" tar = "0.4" uuid = { version = "1.0", features = ["v4"] } regex = "1.10" +tokio = { version = "1.0", features = ["rt-multi-thread", "sync", "time", "macros", "signal"] } +futures-util = { version = "0.3", features = ["sink"] } +tokio-tungstenite = { version = "0.21", default-features = false, features = ["connect", "rustls-tls-webpki-roots"] } +ratatui = { 
version = "0.29", optional = true } +crossterm = { version = "0.28", optional = true } diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs index b855471f..5ade0524 100644 --- a/cli/src/commands/mod.rs +++ b/cli/src/commands/mod.rs @@ -7,5 +7,6 @@ pub mod idl; pub mod sdk; pub mod stack; pub mod status; +pub mod stream; pub mod telemetry; pub mod up; diff --git a/cli/src/commands/stream/client.rs b/cli/src/commands/stream/client.rs new file mode 100644 index 00000000..d20b3528 --- /dev/null +++ b/cli/src/commands/stream/client.rs @@ -0,0 +1,535 @@ +use anyhow::{Context, Result}; +use futures_util::{SinkExt, StreamExt}; +use hyperstack_sdk::{ + deep_merge_with_append, parse_frame, parse_snapshot_entities, try_parse_subscribed_frame, + ClientMessage, Frame, Operation, +}; +use std::collections::{HashMap, HashSet}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; + +use super::filter::{self, Filter}; +use super::output::{self, OutputMode}; +use super::snapshot::{SnapshotPlayer, SnapshotRecorder}; +use super::store::EntityStore; +use super::StreamArgs; + +struct StreamState { + entities: HashMap, + store: Option, + filter: Filter, + select_fields: Option>>, + allowed_ops: Option>, + output_mode: OutputMode, + first: bool, + count_only: bool, + update_count: u64, + entity_count: u64, + recorder: Option, + out: output::StdoutWriter, +} + +fn build_state(args: &StreamArgs, view: &str, url: &str) -> Result { + let filter = Filter::parse(&args.filters)?; + let select_fields = args.select.as_deref().map(filter::parse_select); + let allowed_ops = args.ops.as_deref().map(|ops| { + ops.split(',') + .map(|s| { + let s = s.trim().to_lowercase(); + // Normalize "create" → "upsert" to match op normalization at comparison time + if s == "create" { "upsert".to_string() } else { s } + }) + .collect::>() + }); + + let output_mode = if args.raw { + OutputMode::Raw + } else if args.no_dna { + OutputMode::NoDna + } else { + OutputMode::Merged + }; + + let recorder 
= args.save.as_ref().map(|_| SnapshotRecorder::new(view, url)); + + let use_store = args.history || args.at.is_some() || args.diff; + if use_store && args.key.is_none() { + eprintln!("Warning: --history/--at/--diff require --key; history will not be output."); + } + let store = if use_store { + Some(EntityStore::new()) + } else { + None + }; + + Ok(StreamState { + entities: HashMap::new(), + store, + filter, + select_fields, + allowed_ops, + output_mode, + first: args.first, + count_only: args.count, + update_count: 0, + entity_count: 0, + recorder, + out: output::StdoutWriter::new(), + }) +} + +pub async fn stream(url: String, view: &str, args: &StreamArgs) -> Result<()> { + // Validate args and build state before connecting (fails fast on bad --where regex etc.) + let mut state = build_state(args, view, &url)?; + + let (ws, _) = connect_async(&url) + .await + .with_context(|| format!("Failed to connect to {}", url))?; + + eprintln!("Connected."); + + // Emit NoDna connected event only after successful WebSocket handshake + if let OutputMode::NoDna = state.output_mode { + output::emit_no_dna_event(&mut state.out, "connected", view, &serde_json::json!({"url": url}), 0, 0)?; + } + + let (mut ws_tx, mut ws_rx) = ws.split(); + + // Build and send subscription + let sub = super::build_subscription(view, args); + let msg = serde_json::to_string(&ClientMessage::Subscribe(sub)) + .context("Failed to serialize subscribe message")?; + ws_tx + .send(Message::Text(msg)) + .await + .context("Failed to send subscribe message")?; + + // Ping interval + let ping_period = std::time::Duration::from_secs(30); + let mut ping_interval = tokio::time::interval_at(tokio::time::Instant::now() + ping_period, ping_period); + + // Duration timer for --save --duration (as a select! 
arm for precise timing) + let duration_future = async { + if let Some(secs) = args.duration { + tokio::time::sleep(std::time::Duration::from_secs(secs)).await; + } else { + std::future::pending::<()>().await; + } + }; + tokio::pin!(duration_future); + + // Handle Ctrl+C + let shutdown = tokio::signal::ctrl_c(); + tokio::pin!(shutdown); + + let mut snapshot_complete = false; + // When --no-snapshot, treat as if snapshot was already received so + // snapshot_complete fires on the first live frame + let mut received_snapshot = args.no_snapshot; + + loop { + tokio::select! { + msg = ws_rx.next() => { + match msg { + Some(Ok(Message::Binary(bytes))) => { + match parse_frame(&bytes) { + Ok(frame) => { + if frame.operation() == Operation::Subscribed { + eprintln!("Subscribed to {}", view); + continue; + } + let was_snapshot = frame.is_snapshot(); + if was_snapshot { received_snapshot = true; } + maybe_emit_snapshot_complete(&mut state, view, &mut snapshot_complete, received_snapshot, was_snapshot)?; + if process_frame(frame, view, &mut state)? { + break; + } + } + Err(e) => { + if try_parse_subscribed_frame(&bytes).is_some() { + eprintln!("Subscribed to {}", view); + } else { + eprintln!("Warning: failed to parse binary frame: {}", e); + } + } + } + } + Some(Ok(Message::Text(text))) => { + // Single-pass parse: try Frame directly, check for subscribed via operation() + match serde_json::from_str::(&text) { + Ok(frame) if frame.operation() == Operation::Subscribed => { + eprintln!("Subscribed to {}", view); + } + Ok(frame) => { + let was_snapshot = frame.is_snapshot(); + if was_snapshot { received_snapshot = true; } + maybe_emit_snapshot_complete(&mut state, view, &mut snapshot_complete, received_snapshot, was_snapshot)?; + if process_frame(frame, view, &mut state)? 
{ + break; + } + } + Err(e) => eprintln!("Warning: failed to parse text frame: {}", e), + } + } + Some(Ok(Message::Ping(payload))) => { + let _ = ws_tx.send(Message::Pong(payload)).await; + } + Some(Ok(Message::Close(_))) => { + eprintln!("Connection closed by server."); + break; + } + Some(Err(e)) => { + eprintln!("WebSocket error: {}", e); + break; + } + None => { + eprintln!("Connection closed."); + break; + } + _ => {} + } + } + _ = ping_interval.tick() => { + if let Ok(msg) = serde_json::to_string(&ClientMessage::Ping) { + let _ = ws_tx.send(Message::Text(msg)).await; + } + } + _ = &mut duration_future => { + eprintln!("Duration reached, stopping..."); + let _ = ws_tx.close().await; + break; + } + _ = &mut shutdown => { + eprintln!("\nDisconnecting..."); + let _ = ws_tx.close().await; + break; + } + } + } + + // Save snapshot if --save was specified + if let (Some(save_path), Some(recorder)) = (&args.save, &state.recorder) { + recorder.save(save_path)?; + } + + // Clear the overwriting count line before post-stream output + if state.count_only { + output::finalize_count(); + } + + if let OutputMode::NoDna = state.output_mode { + // Ensure snapshot_complete is emitted before disconnected if it wasn't already + if !snapshot_complete && received_snapshot { + output::emit_no_dna_event(&mut state.out, + "snapshot_complete", view, + &serde_json::json!({"entity_count": state.entity_count}), + state.update_count, state.entity_count, + )?; + } + output::emit_no_dna_event(&mut state.out, + "disconnected", view, + &serde_json::json!(null), + state.update_count, state.entity_count, + )?; + } + + // Output history/at/diff after stream ends (for non-interactive agent use) + output_history_if_requested(&state, args)?; + + Ok(()) +} + +/// Replay frames from a saved snapshot file through the same processing pipeline. 
+pub async fn replay(player: SnapshotPlayer, view: &str, args: &StreamArgs) -> Result<()> { + let mut state = build_state(args, view, &player.header.url)?; + + // Emit NoDna connected event with replay source indicator + if let OutputMode::NoDna = state.output_mode { + output::emit_no_dna_event(&mut state.out, + "connected", view, + &serde_json::json!({"url": player.header.url, "source": "replay"}), + 0, 0, + )?; + } + + let mut snapshot_complete = false; + let mut received_snapshot = args.no_snapshot; + + for snapshot_frame in &player.frames { + let was_snapshot = snapshot_frame.frame.is_snapshot(); + if was_snapshot { received_snapshot = true; } + maybe_emit_snapshot_complete(&mut state, view, &mut snapshot_complete, received_snapshot, was_snapshot)?; + if process_frame(snapshot_frame.frame.clone(), view, &mut state)? { + break; + } + } + + if state.count_only { + output::finalize_count(); + } + + if let OutputMode::NoDna = state.output_mode { + if !snapshot_complete && received_snapshot { + output::emit_no_dna_event(&mut state.out, + "snapshot_complete", view, + &serde_json::json!({"entity_count": state.entity_count}), + state.update_count, state.entity_count, + )?; + } + output::emit_no_dna_event(&mut state.out, + "disconnected", view, + &serde_json::json!(null), + state.update_count, state.entity_count, + )?; + } + + output_history_if_requested(&state, args)?; + + eprintln!("Replay complete: {} updates processed.", state.update_count); + Ok(()) +} + +/// After the stream ends, output --history / --at / --diff results for the specified --key. 
+fn output_history_if_requested(state: &StreamState, args: &StreamArgs) -> Result<()> { + let store = match &state.store { + Some(s) => s, + None => return Ok(()), + }; + + let key = match &args.key { + Some(k) => k.as_str(), + None => { + if args.history || args.at.is_some() || args.diff { + eprintln!("Warning: --history/--at/--diff require --key to specify which entity"); + } + return Ok(()); + } + }; + + if args.diff && args.history { + eprintln!("Warning: --history is ignored when --diff is specified. Remove --diff to see full history."); + } + + if args.diff { + let index = args.at.unwrap_or(0); + if let Some(diff) = store.diff_at(key, index) { + let line = serde_json::to_string_pretty(&diff)?; + println!("{}", line); + } else { + eprintln!("No history entry at index {} for key '{}'", index, key); + } + } else if let Some(index) = args.at { + if let Some(entry) = store.at(key, index) { + let output = serde_json::json!({ + "key": key, + "index": index, + "op": entry.op, + "seq": entry.seq, + "state": entry.state, + }); + let line = serde_json::to_string_pretty(&output)?; + println!("{}", line); + } else { + eprintln!("No history entry at index {} for key '{}'", index, key); + } + } else if args.history { + if let Some(history) = store.history(key) { + let line = serde_json::to_string_pretty(&history)?; + println!("{}", line); + } else { + eprintln!("No history found for key '{}'", key); + } + } + + Ok(()) +} + +/// Emit snapshot_complete NoDna event if transitioning from snapshot to live frames. 
+fn maybe_emit_snapshot_complete( + state: &mut StreamState, + view: &str, + snapshot_complete: &mut bool, + received_snapshot: bool, + was_snapshot: bool, +) -> Result<()> { + if !was_snapshot && received_snapshot && !*snapshot_complete { + *snapshot_complete = true; + if let OutputMode::NoDna = state.output_mode { + output::emit_no_dna_event(&mut state.out, + "snapshot_complete", view, + &serde_json::json!({"entity_count": state.entity_count}), + state.update_count, state.entity_count, + )?; + } + } + Ok(()) +} + +/// Process a frame. Returns true if the stream should end (--first matched). +fn process_frame( + frame: Frame, + view: &str, + state: &mut StreamState, +) -> Result { + // Record frame if --save is active + if let Some(recorder) = &mut state.recorder { + recorder.record(&frame); + } + + let op = frame.operation(); + let op_str = &frame.op; + + // Check if this op type is allowed by --ops (but always process snapshots + // for entity state — just suppress their output) + let ops_allowed = match &state.allowed_ops { + Some(allowed) => { + // Normalize create → upsert since they're semantically identical + let effective_op = match op { + Operation::Snapshot => "snapshot".to_string(), + Operation::Create => "upsert".to_string(), + _ => op_str.to_lowercase(), + }; + allowed.contains(effective_op.as_str()) + } + None => true, + }; + + if let OutputMode::Raw = state.output_mode { + if !ops_allowed { + return Ok(false); + } + // Note: in raw mode, --where filters against the raw frame.data which is + // an array for snapshot frames. Field-level filters (e.g. --where "info.name=X") + // will not match snapshot batch arrays — use merged mode for field filtering. 
+ if !state.filter.is_empty() && !state.filter.matches(&frame.data) { + return Ok(false); + } + state.update_count += 1; + if state.count_only { + output::print_count(state.update_count)?; + } else { + output::print_raw_frame(&mut state.out, &frame)?; + } + return Ok(state.first); + } + + match op { + Operation::Snapshot => { + let snapshot_entities = parse_snapshot_entities(&frame.data); + for entity in snapshot_entities { + // Always populate entity state (needed for correct patch merging). + // entity_count is a running tally — NoDna entity_update events during + // snapshot delivery report the count at that point, not the final total. + // The final count is available in the snapshot_complete event. + state.entities.insert(entity.key.clone(), entity.data.clone()); + state.entity_count = state.entities.len() as u64; + if let Some(store) = &mut state.store { + store.upsert(&entity.key, entity.data.clone(), "snapshot", None); + } + // --first: exits on the first matching entity (even within a snapshot batch). + // update_count will be 1 in the emitted event, which is correct. + if ops_allowed && emit_entity(state, view, &entity.key, "snapshot", &entity.data)? { + return Ok(true); + } + } + } + Operation::Upsert | Operation::Create => { + state.entities.insert(frame.key.clone(), frame.data.clone()); + if let Some(store) = &mut state.store { + store.upsert(&frame.key, frame.data.clone(), op_str, frame.seq.clone()); + } + state.entity_count = state.entities.len() as u64; + if ops_allowed && emit_entity(state, view, &frame.key, op_str, &frame.data)? 
{ + return Ok(true); + } + } + Operation::Patch => { + if let Some(store) = &mut state.store { + store.patch(&frame.key, &frame.data, &frame.append, frame.seq.clone()); + } + let entry = state.entities + .entry(frame.key.clone()) + .or_insert_with(|| serde_json::json!({})); + deep_merge_with_append(entry, &frame.data, &frame.append, ""); + let merged = entry.clone(); + state.entity_count = state.entities.len() as u64; + if ops_allowed && emit_entity(state, view, &frame.key, "patch", &merged)? { + return Ok(true); + } + } + Operation::Delete => { + // Note: if the entity was never seen (e.g. --no-snapshot), last_state is null + // and field-based --where filters will not match, silently dropping the delete. + let last_state = state.entities.remove(&frame.key).unwrap_or(serde_json::json!(null)); + if let Some(store) = &mut state.store { + store.delete(&frame.key); + } + state.entity_count = state.entities.len() as u64; + + if !ops_allowed { + return Ok(false); + } + if !state.filter.is_empty() && !state.filter.matches(&last_state) { + return Ok(false); + } + + state.update_count += 1; + if state.count_only { + output::print_count(state.update_count)?; + } else { + match state.output_mode { + OutputMode::NoDna => output::emit_no_dna_event(&mut state.out, + "entity_update", view, + &serde_json::json!({"key": frame.key, "op": "delete", "data": null}), + state.update_count, state.entity_count, + )?, + _ => output::print_delete(&mut state.out, view, &frame.key)?, + } + } + if state.first { + return Ok(true); + } + } + Operation::Subscribed => {} + } + + Ok(false) +} + +/// Emit an entity through filter + select + output. Returns true if --first should trigger. 
+fn emit_entity( + state: &mut StreamState, + view: &str, + key: &str, + op: &str, + data: &serde_json::Value, +) -> Result { + if !state.filter.is_empty() && !state.filter.matches(data) { + return Ok(false); + } + + state.update_count += 1; + + let output_data = match &state.select_fields { + Some(fields) => filter::select_fields(data, fields), + None => data.clone(), + }; + + if state.count_only { + output::print_count(state.update_count)?; + } else { + match state.output_mode { + OutputMode::NoDna => output::emit_no_dna_event(&mut state.out, + "entity_update", view, + &serde_json::json!({"key": key, "op": op, "data": output_data}), + state.update_count, state.entity_count, + )?, + _ => output::print_entity_update(&mut state.out, view, key, op, &output_data)?, + } + } + + if state.first { + return Ok(true); + } + + Ok(false) +} diff --git a/cli/src/commands/stream/filter.rs b/cli/src/commands/stream/filter.rs new file mode 100644 index 00000000..0c1c1d55 --- /dev/null +++ b/cli/src/commands/stream/filter.rs @@ -0,0 +1,387 @@ +use anyhow::{bail, Result}; +use regex::Regex; +use serde_json::Value; + +#[derive(Debug, Clone)] +pub struct Filter { + pub predicates: Vec, +} + +#[derive(Debug, Clone)] +pub struct Predicate { + pub path: Vec, + pub op: FilterOp, +} + +#[derive(Debug, Clone)] +pub enum FilterOp { + Eq(String), + NotEq(String), + Gt(f64), + Gte(f64), + Lt(f64), + Lte(f64), + Regex(Regex), + NotRegex(Regex), + Exists, + NotExists, +} + +impl Filter { + pub fn parse(exprs: &[String]) -> Result { + let predicates = exprs + .iter() + .map(|expr| parse_predicate(expr)) + .collect::>>()?; + Ok(Filter { predicates }) + } + + pub fn is_empty(&self) -> bool { + self.predicates.is_empty() + } + + pub fn matches(&self, value: &Value) -> bool { + self.predicates.iter().all(|p| p.matches(value)) + } +} + +impl Predicate { + fn matches(&self, value: &Value) -> bool { + let resolved = resolve_path(value, &self.path); + + match &self.op { + FilterOp::Exists => 
resolved.is_some() && !resolved.unwrap().is_null(), + FilterOp::NotExists => resolved.is_none() || resolved.unwrap().is_null(), + FilterOp::Eq(expected) => match resolved { + Some(v) => value_eq(v, expected), + None => false, + }, + FilterOp::NotEq(expected) => match resolved { + Some(v) => !value_eq(v, expected), + None => true, + }, + FilterOp::Gt(n) => resolved.and_then(as_f64).is_some_and(|v| v > *n), + FilterOp::Gte(n) => resolved.and_then(as_f64).is_some_and(|v| v >= *n), + FilterOp::Lt(n) => resolved.and_then(as_f64).is_some_and(|v| v < *n), + FilterOp::Lte(n) => resolved.and_then(as_f64).is_some_and(|v| v <= *n), + FilterOp::Regex(re) => resolved + .and_then(|v| v.as_str()) + .is_some_and(|s| re.is_match(s)), + FilterOp::NotRegex(re) => match resolved.and_then(|v| v.as_str()) { + Some(s) => !re.is_match(s), + None => true, // absent/non-string: "does not match regex" — consistent with NotEq + }, + } + } +} + +fn resolve_path<'a>(value: &'a Value, path: &[String]) -> Option<&'a Value> { + let mut current = value; + for segment in path { + current = current.get(segment)?; + } + Some(current) +} + +fn value_eq(value: &Value, expected: &str) -> bool { + match value { + Value::String(s) => s == expected, + Value::Number(n) => { + // Try exact string match first (avoids f64 rounding for e.g. 
"1.1") + if n.to_string() == expected { + return true; + } + // Fallback: exact f64 bitwise equality (string match above handles most cases) + if let (Some(lhs), Ok(rhs)) = (n.as_f64(), expected.parse::()) { + lhs == rhs + } else { + false + } + } + Value::Bool(b) => { + (expected == "true" && *b) || (expected == "false" && !b) + } + Value::Null => expected == "null", + _ => { + let s = serde_json::to_string(value).unwrap_or_default(); + s == expected + } + } +} + +fn as_f64(value: &Value) -> Option { + match value { + Value::Number(n) => n.as_f64(), + Value::String(s) => s.parse::().ok(), + _ => None, + } +} + +fn parse_predicate(expr: &str) -> Result { + let expr = expr.trim(); + + if expr.is_empty() { + bail!("Empty filter expression; expected field=value, field>N, field~regex, etc."); + } + + // Existence: field? or field!? + if let Some(field) = expr.strip_suffix("!?") { + return Ok(Predicate { + path: parse_path(field), + op: FilterOp::NotExists, + }); + } + if let Some(field) = expr.strip_suffix('?') { + return Ok(Predicate { + path: parse_path(field), + op: FilterOp::Exists, + }); + } + + // Two-char operators checked first so ">=" isn't misread as ">" with value "=...". + // Operator is matched at its first occurrence, so the value portion (after the operator) + // may contain operator characters (e.g. --where "name=a=b" → field="name", value="a=b"). + // This is intentional: the split is on the first operator found, rest is the value. 
+ for (op_str, make_op) in &[ + ("!=", make_not_eq as fn(&str) -> Result), + (">=", make_gte as fn(&str) -> Result), + ("<=", make_lte as fn(&str) -> Result), + ("!~", make_not_regex as fn(&str) -> Result), + ] { + if let Some(idx) = expr.find(op_str) { + let field = &expr[..idx]; + if field.is_empty() { + bail!("Missing field name before '{}' in: '{}'", op_str, expr); + } + let value = &expr[idx + op_str.len()..]; + return Ok(Predicate { + path: parse_path(field), + op: make_op(value)?, + }); + } + } + + // Single-char operators: =, >, <, ~ + for (op_char, make_op) in &[ + ('>', make_gt as fn(&str) -> Result), + ('<', make_lt as fn(&str) -> Result), + ('~', make_regex as fn(&str) -> Result), + ('=', make_eq as fn(&str) -> Result), + ] { + if let Some(idx) = expr.find(*op_char) { + let field = &expr[..idx]; + if field.is_empty() { + bail!("Missing field name before '{}' in: '{}'", op_char, expr); + } + let value = &expr[idx + 1..]; + return Ok(Predicate { + path: parse_path(field), + op: make_op(value)?, + }); + } + } + + bail!( + "Invalid filter expression: '{}'\n\ + Expected: field=value, field>N, field~regex, field?, etc.", + expr + ) +} + +fn parse_path(field: &str) -> Vec { + field.split('.').map(|s| s.to_string()).collect() +} + +fn make_eq(value: &str) -> Result { + Ok(FilterOp::Eq(value.to_string())) +} + +fn make_not_eq(value: &str) -> Result { + Ok(FilterOp::NotEq(value.to_string())) +} + +fn make_gt(value: &str) -> Result { + let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '>', got '{}'", value))?; + Ok(FilterOp::Gt(n)) +} + +fn make_gte(value: &str) -> Result { + let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '>=', got '{}'", value))?; + Ok(FilterOp::Gte(n)) +} + +fn make_lt(value: &str) -> Result { + let n: f64 = value.parse().map_err(|_| anyhow::anyhow!("Expected number after '<', got '{}'", value))?; + Ok(FilterOp::Lt(n)) +} + +fn make_lte(value: &str) -> Result { + let n: f64 = 
value.parse().map_err(|_| anyhow::anyhow!("Expected number after '<=', got '{}'", value))?; + Ok(FilterOp::Lte(n)) +} + +fn make_regex(value: &str) -> Result { + let re = Regex::new(value).map_err(|e| anyhow::anyhow!("Invalid regex '{}': {}", value, e))?; + Ok(FilterOp::Regex(re)) +} + +fn make_not_regex(value: &str) -> Result { + let re = Regex::new(value).map_err(|e| anyhow::anyhow!("Invalid regex '{}': {}", value, e))?; + Ok(FilterOp::NotRegex(re)) +} + +/// Project specific fields from a JSON value. +/// Returns a new object with only the selected dot-paths. +/// Uses full dot-path as key to avoid collisions (e.g. "a.id" and "b.id" +/// produce {"a.id": ..., "b.id": ...} instead of silently overwriting). +pub fn select_fields(value: &Value, fields: &[Vec]) -> Value { + let mut result = serde_json::Map::new(); + for path in fields { + if let Some(v) = resolve_path(value, path) { + let key = if path.len() == 1 { + path[0].clone() + } else { + path.join(".") + }; + result.insert(key, v.clone()); + } + } + Value::Object(result) +} + +pub fn parse_select(select: &str) -> Vec> { + select + .split(',') + .map(|s| s.trim().split('.').map(|p| p.to_string()).collect()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_eq_string() { + let f = Filter::parse(&["name=alice".to_string()]).unwrap(); + assert!(f.matches(&json!({"name": "alice"}))); + assert!(!f.matches(&json!({"name": "bob"}))); + } + + #[test] + fn test_eq_number() { + let f = Filter::parse(&["age=30".to_string()]).unwrap(); + assert!(f.matches(&json!({"age": 30}))); + assert!(!f.matches(&json!({"age": 31}))); + } + + #[test] + fn test_gt() { + let f = Filter::parse(&["score>100".to_string()]).unwrap(); + assert!(f.matches(&json!({"score": 150}))); + assert!(!f.matches(&json!({"score": 50}))); + assert!(!f.matches(&json!({"score": 100}))); + } + + #[test] + fn test_nested_path() { + let f = Filter::parse(&["user.name=alice".to_string()]).unwrap(); + 
assert!(f.matches(&json!({"user": {"name": "alice"}}))); + assert!(!f.matches(&json!({"user": {"name": "bob"}}))); + } + + #[test] + fn test_exists() { + let f = Filter::parse(&["email?".to_string()]).unwrap(); + assert!(f.matches(&json!({"email": "a@b.com"}))); + assert!(!f.matches(&json!({"name": "alice"}))); + assert!(!f.matches(&json!({"email": null}))); + } + + #[test] + fn test_not_exists() { + let f = Filter::parse(&["email!?".to_string()]).unwrap(); + assert!(!f.matches(&json!({"email": "a@b.com"}))); + assert!(f.matches(&json!({"name": "alice"}))); + } + + #[test] + fn test_regex() { + let f = Filter::parse(&["name~^ali".to_string()]).unwrap(); + assert!(f.matches(&json!({"name": "alice"}))); + assert!(!f.matches(&json!({"name": "bob"}))); + } + + #[test] + fn test_multiple_filters_and() { + let f = Filter::parse(&[ + "age>18".to_string(), + "name=alice".to_string(), + ]).unwrap(); + assert!(f.matches(&json!({"age": 25, "name": "alice"}))); + assert!(!f.matches(&json!({"age": 25, "name": "bob"}))); + assert!(!f.matches(&json!({"age": 15, "name": "alice"}))); + } + + #[test] + fn test_select_fields() { + let v = json!({"name": "alice", "age": 30, "nested": {"x": 1}}); + let fields = parse_select("name,nested.x"); + let result = select_fields(&v, &fields); + assert_eq!(result, json!({"name": "alice", "nested.x": 1})); + } + + #[test] + fn test_select_fields_no_collision() { + let v = json!({"a": {"id": 1}, "b": {"id": 2}}); + let fields = parse_select("a.id,b.id"); + let result = select_fields(&v, &fields); + assert_eq!(result, json!({"a.id": 1, "b.id": 2})); + } + + #[test] + fn test_not_eq() { + let f = Filter::parse(&["name!=alice".to_string()]).unwrap(); + assert!(!f.matches(&json!({"name": "alice"}))); + assert!(f.matches(&json!({"name": "bob"}))); + // Absent field: != should return true + assert!(f.matches(&json!({"age": 30}))); + } + + #[test] + fn test_gte() { + let f = Filter::parse(&["score>=100".to_string()]).unwrap(); + 
assert!(f.matches(&json!({"score": 100}))); + assert!(f.matches(&json!({"score": 150}))); + assert!(!f.matches(&json!({"score": 99}))); + } + + #[test] + fn test_lte() { + let f = Filter::parse(&["score<=100".to_string()]).unwrap(); + assert!(f.matches(&json!({"score": 100}))); + assert!(f.matches(&json!({"score": 50}))); + assert!(!f.matches(&json!({"score": 101}))); + } + + #[test] + fn test_not_regex() { + let f = Filter::parse(&["name!~^ali".to_string()]).unwrap(); + assert!(!f.matches(&json!({"name": "alice"}))); + assert!(f.matches(&json!({"name": "bob"}))); + // Absent field: !~ should return true (consistent with !=) + assert!(f.matches(&json!({"age": 30}))); + } + + #[test] + fn test_two_char_operator_precedence() { + // Ensure >= is not parsed as > with value "=100" + let f = Filter::parse(&["score>=100".to_string()]).unwrap(); + assert!(f.matches(&json!({"score": 100}))); + + // Ensure != is not parsed as ! with something else + let f = Filter::parse(&["name!=x".to_string()]).unwrap(); + assert!(f.matches(&json!({"name": "y"}))); + assert!(!f.matches(&json!({"name": "x"}))); + } +} diff --git a/cli/src/commands/stream/mod.rs b/cli/src/commands/stream/mod.rs new file mode 100644 index 00000000..afdd3257 --- /dev/null +++ b/cli/src/commands/stream/mod.rs @@ -0,0 +1,287 @@ +mod client; +mod filter; +mod output; +mod snapshot; +mod store; +#[cfg(feature = "tui")] +mod tui; + +use anyhow::{bail, Context, Result}; +use clap::Args; +use hyperstack_sdk::Subscription; + +use crate::config::HyperstackConfig; + +#[derive(Args)] +pub struct StreamArgs { + /// View to subscribe to: EntityName/mode (e.g. 
OreRound/latest) + pub view: Option, + + /// Entity key to watch (for state-mode subscriptions) + #[arg(short, long)] + pub key: Option, + + /// WebSocket URL override + #[arg(long)] + pub url: Option, + + /// Stack name (resolves URL from hyperstack.toml) + #[arg(short, long)] + pub stack: Option, + + /// Output raw WebSocket frames instead of merged entities + #[arg(long)] + pub raw: bool, + + /// NO_DNA agent-friendly envelope format + #[arg(long)] + pub no_dna: bool, + + /// Filter expression: field=value, field>N, field~regex (repeatable, ANDed). + /// Note: field? treats null as absent (returns false for null values) + #[arg(long = "where", value_name = "EXPR")] + pub filters: Vec, + + /// Select specific fields to output (comma-separated dot paths). Nested paths are + /// flattened to literal keys, e.g. --select "info.name" outputs {"info.name": "..."} + #[arg(long)] + pub select: Option, + + /// Exit after first entity matches filter criteria + #[arg(long)] + pub first: bool, + + /// Filter by operation type (comma-separated: snapshot,upsert,patch,delete). + /// "upsert" also matches "create". 
Snapshot entities are always tracked for + /// state merging but only emitted when "snapshot" is in the allowed set + #[arg(long)] + pub ops: Option, + + /// Show running count of entities/updates only + #[arg(long)] + pub count: bool, + + /// Max entities in snapshot + #[arg(long)] + pub take: Option, + + /// Skip N entities in snapshot + #[arg(long)] + pub skip: Option, + + /// Disable initial snapshot + #[arg(long)] + pub no_snapshot: bool, + + /// Resume from cursor (seq value) + #[arg(long)] + pub after: Option, + + /// Record frames to a JSON snapshot file + #[arg(long)] + pub save: Option, + + /// Auto-stop the stream after N seconds + #[arg(long)] + pub duration: Option, + + /// Replay a previously saved snapshot file instead of connecting live + #[arg(long, conflicts_with = "url", conflicts_with = "tui", conflicts_with = "duration")] + pub load: Option, + + /// Show update history for the specified --key entity + #[arg(long)] + pub history: bool, + + /// Show entity at a specific history index (0 = latest) + #[arg(long)] + pub at: Option, + + /// Show diff between consecutive updates + #[arg(long)] + pub diff: bool, + + /// Interactive TUI mode + #[arg(long, short = 'i')] + pub tui: bool, +} + +pub fn run(args: StreamArgs, config_path: &str) -> Result<()> { + // --load mode: replay from file, no WebSocket needed + // (--load + --tui conflict is enforced by clap at the arg level) + if let Some(load_path) = &args.load { + let player = snapshot::SnapshotPlayer::load(load_path)?; + let default_view = player.header.view.clone(); + let view = args.view.as_deref().unwrap_or(&default_view); + let rt = tokio::runtime::Runtime::new().context("Failed to create async runtime")?; + return rt.block_on(client::replay(player, view, &args)); + } + + let view = match args.view.as_deref() { + Some(v) => v, + None => bail!(" argument is required (e.g. 
OreRound/latest)"), + }; + + let url = resolve_url(&args, config_path, view)?; + + let rt = tokio::runtime::Runtime::new().context("Failed to create async runtime")?; + + if args.tui { + if args.duration.is_some() { + bail!("--duration has no effect in TUI mode; stop with 'q' or Ctrl+C."); + } + if args.count { + bail!("--count is incompatible with TUI mode."); + } + if args.save.is_some() { + bail!("--save is not yet supported in TUI mode; use 's' inside the TUI to save."); + } + if args.history || args.at.is_some() || args.diff { + bail!("--history/--at/--diff are not supported in TUI mode; use h/l keys to browse history."); + } + if args.raw { + bail!("--raw is incompatible with TUI mode; omit --tui to use raw output."); + } + if args.no_dna { + bail!("--no-dna is incompatible with TUI mode; omit --tui to use NO_DNA output."); + } + if !args.filters.is_empty() { + bail!("--where is not supported in TUI mode; use '/' inside the TUI to filter."); + } + if args.select.is_some() { + bail!("--select is not supported in TUI mode."); + } + if args.ops.is_some() { + bail!("--ops is not supported in TUI mode."); + } + if args.first { + bail!("--first is not supported in TUI mode."); + } + #[cfg(feature = "tui")] + { + return rt.block_on(tui::run_tui(url, view, &args)); + } + #[cfg(not(feature = "tui"))] + { + bail!( + "TUI mode requires the 'tui' feature.\n\ + Install with: cargo install hyperstack-cli --features tui" + ); + } + } + + eprintln!("Connecting to {} ...", url); + + rt.block_on(client::stream(url, view, &args)) +} + +pub fn build_subscription(view: &str, args: &StreamArgs) -> Subscription { + let mut sub = Subscription::new(view); + if let Some(key) = &args.key { + sub = sub.with_key(key.clone()); + } + if let Some(take) = args.take { + sub = sub.with_take(take); + } + if let Some(skip) = args.skip { + sub = sub.with_skip(skip); + } + if args.no_snapshot { + sub = sub.with_snapshot(false); + } + if let Some(after) = &args.after { + sub = 
sub.after(after.clone()); + } + sub +} + +fn validate_ws_url(url: &str) -> Result<()> { + if !url.starts_with("ws://") && !url.starts_with("wss://") { + bail!( + "Invalid URL scheme. Expected ws:// or wss://, got: {}", + url + ); + } + Ok(()) +} + +fn resolve_url(args: &StreamArgs, config_path: &str, view: &str) -> Result { + // 1. Explicit --url + if let Some(url) = &args.url { + validate_ws_url(url)?; + return Ok(url.clone()); + } + + let config = HyperstackConfig::load_optional(config_path)?; + + // 2. Explicit --stack name + if let Some(stack_name) = &args.stack { + if let Some(config) = &config { + if let Some(stack) = config.find_stack(stack_name) { + if let Some(url) = &stack.url { + validate_ws_url(url)?; + return Ok(url.clone()); + } + bail!( + "Stack '{}' found in config but has no url set.\n\ + Set it in hyperstack.toml or use --url to specify the WebSocket URL.", + stack_name + ); + } + } + bail!( + "Stack '{}' not found in {}.\n\ + Available stacks: {}", + stack_name, + config_path, + list_stacks(config.as_ref()), + ); + } + + // 3. 
Auto-match entity name from view + let entity_name = view.split('/').next().unwrap_or(view); + if let Some(config) = &config { + if let Some(stack) = config.find_stack(entity_name) { + if let Some(url) = &stack.url { + validate_ws_url(url)?; + return Ok(url.clone()); + } + } + // Only auto-select if there's exactly one stack with a URL (unambiguous) + let stacks_with_urls: Vec<_> = config.stacks.iter().filter(|s| s.url.is_some()).collect(); + if stacks_with_urls.len() == 1 { + let stack = stacks_with_urls[0]; + let name = stack.name.as_deref().unwrap_or(&stack.stack); + let url = stack.url.clone().unwrap(); + validate_ws_url(&url)?; + eprintln!("Using stack '{}' (only stack with a URL)", name); + return Ok(url); + } + } + + bail!( + "Could not determine WebSocket URL.\n\n\ + Specify one of:\n \ + --url wss://your-stack.stack.usehyperstack.com\n \ + --stack (resolves from hyperstack.toml)\n\n\ + Available stacks: {}", + list_stacks(config.as_ref()), + ) +} + +fn list_stacks(config: Option<&HyperstackConfig>) -> String { + match config { + Some(config) if !config.stacks.is_empty() => config + .stacks + .iter() + .map(|s| { + s.name + .as_deref() + .unwrap_or(&s.stack) + .to_string() + }) + .collect::>() + .join(", "), + _ => "(none — create hyperstack.toml with [[stacks]] entries)".to_string(), + } +} diff --git a/cli/src/commands/stream/output.rs b/cli/src/commands/stream/output.rs new file mode 100644 index 00000000..025322a0 --- /dev/null +++ b/cli/src/commands/stream/output.rs @@ -0,0 +1,110 @@ +use anyhow::Result; +use hyperstack_sdk::Frame; +use std::io::{self, BufWriter, Write}; + +pub enum OutputMode { + Raw, + Merged, + NoDna, +} + +/// Buffered stdout writer. Holds a single lock for the lifetime of the stream. +/// Flushes on drop. 
+pub struct StdoutWriter { + inner: BufWriter, +} + +impl StdoutWriter { + pub fn new() -> Self { + Self { + inner: BufWriter::new(io::stdout()), + } + } + + pub fn writeln(&mut self, line: &str) -> Result<()> { + writeln!(self.inner, "{}", line)?; + self.inner.flush()?; + Ok(()) + } +} + +impl Drop for StdoutWriter { + fn drop(&mut self) { + let _ = self.inner.flush(); + } +} + +/// Print a raw WebSocket frame as a single JSON line to stdout. +pub fn print_raw_frame(out: &mut StdoutWriter, frame: &Frame) -> Result<()> { + let line = serde_json::to_string(frame)?; + out.writeln(&line) +} + +/// Print a merged entity update as a single JSON line to stdout. +pub fn print_entity_update( + out: &mut StdoutWriter, + view: &str, + key: &str, + op: &str, + data: &serde_json::Value, +) -> Result<()> { + let output = serde_json::json!({ + "view": view, + "key": key, + "op": op, + "data": data, + }); + let line = serde_json::to_string(&output)?; + out.writeln(&line) +} + +/// Print an entity deletion as a single JSON line to stdout. +pub fn print_delete(out: &mut StdoutWriter, view: &str, key: &str) -> Result<()> { + let output = serde_json::json!({ + "view": view, + "key": key, + "op": "delete", + "data": null, + }); + let line = serde_json::to_string(&output)?; + out.writeln(&line) +} + +/// Print a running update count to stderr (overwrites line). +pub fn print_count(count: u64) -> Result<()> { + eprint!("\rUpdates: {} ", count); // trailing spaces clear leftover chars + std::io::stderr().flush()?; + Ok(()) +} + +/// Move to a new line after overwriting count display. +pub fn finalize_count() { + eprintln!(); +} + +/// Emit a NO_DNA envelope event as a single JSON line to stdout. 
+pub fn emit_no_dna_event( + out: &mut StdoutWriter, + action: &str, + view: &str, + data: &serde_json::Value, + update_count: u64, + entity_count: u64, +) -> Result<()> { + let output = serde_json::json!({ + "schema": "no-dna/v1", + "tool": "hs-stream", + "action": action, + "status": if action == "disconnected" || action == "error" { "done" } else { "streaming" }, + "data": { + "view": view, + "payload": data, + }, + "meta": { + "update_count": update_count, + "entities_tracked": entity_count, + }, + }); + let line = serde_json::to_string(&output)?; + out.writeln(&line) +} diff --git a/cli/src/commands/stream/snapshot.rs b/cli/src/commands/stream/snapshot.rs new file mode 100644 index 00000000..585504a9 --- /dev/null +++ b/cli/src/commands/stream/snapshot.rs @@ -0,0 +1,193 @@ +use anyhow::{Context, Result}; +use hyperstack_sdk::Frame; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::io::{self, Write}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SnapshotHeader { + pub version: u32, + pub view: String, + pub url: String, + pub captured_at: String, + pub duration_ms: u64, + pub frame_count: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SnapshotFrame { + pub ts: u64, + pub frame: Frame, +} + +pub struct SnapshotRecorder { + frames: Vec, + view: String, + url: String, + start_time: std::time::Instant, + start_timestamp: chrono::DateTime, + limit_warned: bool, +} + +impl SnapshotRecorder { + pub fn new(view: &str, url: &str) -> Self { + Self { + frames: Vec::new(), + view: view.to_string(), + url: url.to_string(), + start_time: std::time::Instant::now(), + start_timestamp: chrono::Utc::now(), + limit_warned: false, + } + } + + const MAX_FRAMES: usize = 100_000; + + pub fn record(&mut self, frame: &Frame) { + if self.frames.len() >= Self::MAX_FRAMES { + if !self.limit_warned { + eprintln!( + "Warning: snapshot recorder reached {} frames limit. Further frames will be dropped. 
\ + Use --duration to limit recording time.", + Self::MAX_FRAMES + ); + self.limit_warned = true; + } + return; + } + let ts = self.start_time.elapsed().as_millis() as u64; + self.frames.push(SnapshotFrame { + ts, + frame: frame.clone(), + }); + } + + #[cfg(feature = "tui")] + pub fn record_with_ts(&mut self, frame: &Frame, ts_ms: u64) { + if self.frames.len() >= Self::MAX_FRAMES { + return; + } + self.frames.push(SnapshotFrame { + ts: ts_ms, + frame: frame.clone(), + }); + } + + pub fn save(&self, path: &str) -> Result<()> { + // Compute duration from frame timestamps (first to last), falling back to elapsed + let duration_ms = if self.frames.len() >= 2 { + self.frames.last().unwrap().ts - self.frames.first().unwrap().ts + } else { + self.start_time.elapsed().as_millis() as u64 + }; + let header = SnapshotHeader { + version: 1, + view: self.view.clone(), + url: self.url.clone(), + captured_at: self.start_timestamp.to_rfc3339(), + duration_ms, + frame_count: self.frames.len() as u64, + }; + + // Stream-serialize to tmp file to avoid holding the entire JSON in memory. 
+ let dest = std::path::Path::new(path); + let parent = dest.parent().unwrap_or_else(|| std::path::Path::new(".")); + let file_name = dest.file_name().unwrap_or_default(); + let tmp_path = parent.join(format!("{}.tmp", file_name.to_string_lossy())).to_string_lossy().into_owned(); + { + let file = fs::File::create(&tmp_path) + .with_context(|| format!("Failed to create snapshot file: {}", tmp_path))?; + let mut writer = io::BufWriter::new(file); + + // Write header fields + writeln!(writer, "{{")?; + writeln!(writer, " \"version\": {},", header.version)?; + writeln!(writer, " \"view\": {},", serde_json::to_string(&header.view)?)?; + writeln!(writer, " \"url\": {},", serde_json::to_string(&header.url)?)?; + writeln!(writer, " \"captured_at\": {},", serde_json::to_string(&header.captured_at)?)?; + writeln!(writer, " \"duration_ms\": {},", header.duration_ms)?; + writeln!(writer, " \"frame_count\": {},", header.frame_count)?; + + // Stream frames array one entry at a time + writeln!(writer, " \"frames\": [")?; + for (i, frame) in self.frames.iter().enumerate() { + let frame_json = serde_json::to_string(frame)?; + if i > 0 { + writeln!(writer, ",")?; + } + write!(writer, " {}", frame_json)?; + } + writeln!(writer, "\n ]")?; + writeln!(writer, "}}")?; + writer.flush()?; + } + // Attempt remove; if it fails, let rename itself fail with a clear error + // (don't silently swallow remove errors that may mask the true state). 
+ #[cfg(windows)] + if dest.exists() { + fs::remove_file(path) + .with_context(|| format!("Failed to remove existing snapshot at {}", path))?; + } + fs::rename(&tmp_path, path) + .map_err(|e| { + // Best-effort cleanup of the tmp file before propagating + let _ = fs::remove_file(&tmp_path); + anyhow::anyhow!("Failed to rename snapshot to {}: {}", path, e) + })?; + + eprintln!( + "Saved {} frames ({:.1}s) to {}", + self.frames.len(), + duration_ms as f64 / 1000.0, + path + ); + Ok(()) + } +} + +pub struct SnapshotPlayer { + pub header: SnapshotHeader, + pub frames: Vec, +} + +/// Combined struct for single-pass deserialization (avoids cloning the entire JSON) +#[derive(Deserialize)] +struct SnapshotFile { + #[serde(flatten)] + header: SnapshotHeader, + #[serde(default)] + frames: Vec, +} + +impl SnapshotPlayer { + pub fn load(path: &str) -> Result { + let contents = fs::read_to_string(path) + .with_context(|| format!("Failed to read snapshot file: {}", path))?; + + let file: SnapshotFile = serde_json::from_str(&contents) + .with_context(|| format!("Failed to parse snapshot file: {}", path))?; + + if file.header.version != 1 { + anyhow::bail!( + "Unsupported snapshot version {} in {}. 
This CLI supports version 1.", + file.header.version, + path + ); + } + + if file.frames.is_empty() { + eprintln!("Warning: snapshot file {} has no 'frames' key — replaying 0 frames.", path); + } + let frames = file.frames; + + eprintln!( + "Loaded snapshot: {} frames, {:.1}s, view={}, captured={}", + frames.len(), + file.header.duration_ms as f64 / 1000.0, + file.header.view, + file.header.captured_at, + ); + + Ok(Self { header: file.header, frames }) + } +} diff --git a/cli/src/commands/stream/store.rs b/cli/src/commands/stream/store.rs new file mode 100644 index 00000000..5350003b --- /dev/null +++ b/cli/src/commands/stream/store.rs @@ -0,0 +1,318 @@ +use hyperstack_sdk::deep_merge_with_append; +use serde_json::Value; +use std::collections::{HashMap, VecDeque}; + +const DEFAULT_MAX_HISTORY: usize = 1000; + +pub struct EntityStore { + entities: HashMap, + max_history: usize, +} + +pub struct EntityRecord { + pub current: Value, + pub history: VecDeque, +} + +#[derive(Clone)] +pub struct HistoryEntry { + pub seq: Option, + pub op: String, + pub state: Value, + pub patch: Option, +} + +#[allow(dead_code)] +impl EntityStore { + pub fn new() -> Self { + Self { + entities: HashMap::new(), + max_history: DEFAULT_MAX_HISTORY, + } + } + + pub fn entity_count(&self) -> usize { + self.entities.len() + } + + pub fn get(&self, key: &str) -> Option<&EntityRecord> { + self.entities.get(key) + } + + /// Apply an upsert/create operation. Returns the full entity state. + pub fn upsert(&mut self, key: &str, data: Value, op: &str, seq: Option) -> &Value { + let record = self.entities.entry(key.to_string()).or_insert_with(|| { + EntityRecord { + current: Value::Null, + history: VecDeque::new(), + } + }); + + record.current = data.clone(); + record.history.push_back(HistoryEntry { + seq, + op: op.to_string(), + state: data, + patch: None, + }); + + if record.history.len() > self.max_history { + record.history.pop_front(); + } + + &record.current + } + + /// Apply a patch operation. 
Returns the merged entity state. + pub fn patch( + &mut self, + key: &str, + patch_data: &Value, + append_paths: &[String], + seq: Option, + ) -> &Value { + let record = self.entities.entry(key.to_string()).or_insert_with(|| { + EntityRecord { + current: serde_json::json!({}), + history: VecDeque::new(), + } + }); + + let raw_patch = patch_data.clone(); + deep_merge_with_append(&mut record.current, patch_data, append_paths, ""); + + record.history.push_back(HistoryEntry { + seq, + op: "patch".to_string(), + state: record.current.clone(), + patch: Some(raw_patch), + }); + + if record.history.len() > self.max_history { + record.history.pop_front(); + } + + &record.current + } + + /// Mark an entity as deleted, retaining its history for post-stream analysis. + pub fn delete(&mut self, key: &str) { + if let Some(record) = self.entities.get_mut(key) { + let deleted_state = serde_json::json!({"_deleted": true}); + record.history.push_back(HistoryEntry { + seq: None, + op: "delete".to_string(), + state: deleted_state.clone(), + patch: None, + }); + record.current = deleted_state; + if record.history.len() > self.max_history { + record.history.pop_front(); + } + } + } + + /// Get entity state at a specific history index (0 = latest). + pub fn at(&self, key: &str, index: usize) -> Option<&HistoryEntry> { + let record = self.entities.get(key)?; + if index >= record.history.len() { + return None; + } + let actual_idx = record.history.len().checked_sub(index.checked_add(1)?)?; + record.history.get(actual_idx) + } + + /// Get entity state at an absolute VecDeque index. + pub fn at_absolute(&self, key: &str, abs_idx: usize) -> Option<&HistoryEntry> { + let record = self.entities.get(key)?; + record.history.get(abs_idx) + } + + /// Get the history length for a key. + pub fn history_len(&self, key: &str) -> usize { + self.entities.get(key).map(|r| r.history.len()).unwrap_or(0) + } + + /// Get the diff between two consecutive history entries. 
+ /// Returns (added/changed fields, removed fields). + pub fn diff_at(&self, key: &str, index: usize) -> Option { + let record = self.entities.get(key)?; + if record.history.is_empty() { + return None; + } + + let actual_idx = record.history.len().checked_sub(index.checked_add(1)?)?; + let entry = record.history.get(actual_idx)?; + + // If this entry has a raw patch, use it directly + if let Some(patch) = &entry.patch { + return Some(serde_json::json!({ + "op": entry.op, + "index": index, + "total": record.history.len(), + "patch": patch, + "state": entry.state, + })); + } + + // Otherwise diff against previous state + let previous = if actual_idx > 0 { + &record.history.get(actual_idx - 1)?.state + } else { + &Value::Null + }; + + let changes = compute_diff(previous, &entry.state); + Some(serde_json::json!({ + "op": entry.op, + "index": index, + "total": record.history.len(), + "changes": changes, + "state": entry.state, + })) + } + + /// Get the full history for an entity as a JSON array. + /// Entries are ordered newest-first. The `index` field matches `at(key, index)`. + pub fn history(&self, key: &str) -> Option { + let record = self.entities.get(key)?; + let entries: Vec = record + .history + .iter() + .enumerate() + .rev() + .map(|(i, entry)| { + let rev_idx = record.history.len() - 1 - i; + serde_json::json!({ + "index": rev_idx, + "op": entry.op, + "seq": entry.seq, + "state": entry.state, + }) + }) + .collect(); + Some(Value::Array(entries)) + } +} + +/// Compute a shallow (top-level only) diff between two JSON values. +/// For nested objects, reports the entire sub-object as changed. Patch operations +/// use the raw patch instead of this diff, so this only affects upsert/snapshot history. 
+fn compute_diff(old: &Value, new: &Value) -> Value { + match (old, new) { + (Value::Object(old_map), Value::Object(new_map)) => { + let mut diff = serde_json::Map::new(); + + for (key, new_val) in new_map { + match old_map.get(key) { + Some(old_val) if old_val != new_val => { + diff.insert( + key.clone(), + serde_json::json!({ + "from": old_val, + "to": new_val, + }), + ); + } + None => { + diff.insert( + key.clone(), + serde_json::json!({ + "added": new_val, + }), + ); + } + _ => {} + } + } + + for key in old_map.keys() { + if !new_map.contains_key(key) { + diff.insert( + key.clone(), + serde_json::json!({ + "removed": old_map.get(key), + }), + ); + } + } + + Value::Object(diff) + } + _ if old != new => { + serde_json::json!({ + "from": old, + "to": new, + }) + } + _ => Value::Null, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_upsert_and_history() { + let mut store = EntityStore::new(); + store.upsert("k1", json!({"a": 1}), "upsert", None); + store.upsert("k1", json!({"a": 2}), "upsert", None); + + assert_eq!(store.get("k1").unwrap().current, json!({"a": 2})); + assert_eq!(store.get("k1").unwrap().history.len(), 2); + + let at0 = store.at("k1", 0).unwrap(); + assert_eq!(at0.state, json!({"a": 2})); + + let at1 = store.at("k1", 1).unwrap(); + assert_eq!(at1.state, json!({"a": 1})); + } + + #[test] + fn test_patch() { + let mut store = EntityStore::new(); + store.upsert("k1", json!({"a": 1, "b": 2}), "upsert", None); + store.patch("k1", &json!({"a": 10}), &[], None); + + assert_eq!(store.get("k1").unwrap().current, json!({"a": 10, "b": 2})); + assert_eq!(store.get("k1").unwrap().history.len(), 2); + } + + #[test] + fn test_diff() { + let mut store = EntityStore::new(); + store.upsert("k1", json!({"a": 1, "b": 2}), "upsert", None); + store.patch("k1", &json!({"a": 10}), &[], None); + + let diff = store.diff_at("k1", 0).unwrap(); + // Latest entry is a patch, so it should include the raw patch + 
assert_eq!(diff["patch"], json!({"a": 10})); + } + + #[test] + fn test_delete() { + let mut store = EntityStore::new(); + store.upsert("k1", json!({"a": 1}), "upsert", None); + store.delete("k1"); + // Entity is retained with tombstone for history access + let record = store.get("k1").expect("deleted entity should be retained"); + assert_eq!(record.current, json!({"_deleted": true})); + assert_eq!(record.history.len(), 2); // upsert + delete + assert_eq!(record.history.back().unwrap().op, "delete"); + } + + #[test] + fn test_compute_diff() { + let old = json!({"a": 1, "b": 2, "c": 3}); + let new = json!({"a": 1, "b": 5, "d": 4}); + let diff = compute_diff(&old, &new); + + assert!(diff.get("a").is_none()); // unchanged + assert_eq!(diff["b"]["from"], json!(2)); + assert_eq!(diff["b"]["to"], json!(5)); + assert_eq!(diff["c"]["removed"], json!(3)); + assert_eq!(diff["d"]["added"], json!(4)); + } +} diff --git a/cli/src/commands/stream/tui/app.rs b/cli/src/commands/stream/tui/app.rs new file mode 100644 index 00000000..376ca684 --- /dev/null +++ b/cli/src/commands/stream/tui/app.rs @@ -0,0 +1,806 @@ +use hyperstack_sdk::{parse_snapshot_entities, Frame, Operation}; +use ratatui::widgets::ListState; +use serde_json::Value; +use std::collections::{HashSet, VecDeque}; +use std::fmt::Write as FmtWrite; + +use crate::commands::stream::snapshot::SnapshotRecorder; +use crate::commands::stream::store::EntityStore; + +const MAX_STATUS_AGE_MS: u128 = 3000; + +/// Pretty-print JSON with compact inline arrays when they fit within max_width. 
+pub fn compact_pretty(value: &Value, max_width: usize) -> String { + let mut out = String::new(); + write_value(&mut out, value, 0, max_width); + out +} + +fn write_value(out: &mut String, value: &Value, indent: usize, max_width: usize) { + match value { + Value::Object(map) => { + if map.is_empty() { + out.push_str("{}"); + return; + } + out.push_str("{\n"); + let inner = indent + 2; + for (i, (k, v)) in map.iter().enumerate() { + write_indent(out, inner); + let _ = write!(out, "\"{}\": ", k); + write_value(out, v, inner, max_width); + if i + 1 < map.len() { + out.push(','); + } + out.push('\n'); + } + write_indent(out, indent); + out.push('}'); + } + Value::Array(arr) => { + if arr.is_empty() { + out.push_str("[]"); + return; + } + // Try compact form: [elem1, elem2, ...] + let compact = serde_json::to_string(value).unwrap_or_default(); + if indent + compact.len() <= max_width { + out.push_str(&compact); + return; + } + // Fall back to expanded form + out.push_str("[\n"); + let inner = indent + 2; + for (i, v) in arr.iter().enumerate() { + write_indent(out, inner); + write_value(out, v, inner, max_width); + if i + 1 < arr.len() { + out.push(','); + } + out.push('\n'); + } + write_indent(out, indent); + out.push(']'); + } + _ => { + let s = serde_json::to_string(value).unwrap_or_default(); + out.push_str(&s); + } + } +} + +fn write_indent(out: &mut String, n: usize) { + for _ in 0..n { + out.push(' '); + } +} + +pub enum TuiAction { + Quit, + NextEntity, + PrevEntity, + FocusDetail, + BackToList, + HistoryForward, + HistoryBack, + HistoryOldest, + HistoryNewest, + ToggleDiff, + ToggleRaw, + TogglePause, + StartFilter, + SaveSnapshot, + FilterChar(char), + FilterBackspace, + FilterClear, + FilterDeleteWord, + // Detail pane scroll + ScrollDetailDown, + ScrollDetailUp, + ScrollDetailTop, + ScrollDetailBottom, + ScrollDetailHalfDown, + ScrollDetailHalfUp, + // Sorting + CycleSortMode, + ToggleSortDirection, + // Vim motions + GotoTop, + GotoBottom, + HalfPageDown, + 
HalfPageUp, + NextMatch, +} + +#[derive(Clone, Copy, PartialEq)] +pub enum ViewMode { + List, + Detail, +} + +#[derive(Clone, PartialEq)] +pub enum SortMode { + Insertion, + Field(String), +} + +#[derive(Clone, Copy, PartialEq)] +pub enum SortDirection { + Ascending, + Descending, +} + +#[allow(dead_code)] +pub struct App { + pub view: String, + pub url: String, + pub view_mode: ViewMode, + pub entity_keys: Vec, + entity_key_set: HashSet, + pub selected_index: usize, + pub history_position: usize, + /// Absolute VecDeque index when browsing history (position > 0). + /// Stays stable as new frames arrive. None when viewing latest. + history_anchor: Option, + pub show_diff: bool, + pub show_raw: bool, + pub paused: bool, + pub disconnected: bool, + pub filter_input_active: bool, + pub filter_text: String, + pub status_message: String, + pub status_time: std::time::Instant, + pub update_count: u64, + pub scroll_offset: u16, + pub visible_rows: usize, + pub terminal_width: u16, + pub sort_mode: SortMode, + pub sort_direction: SortDirection, + pub pending_count: Option, + pub pending_g: bool, + pub list_state: ListState, + store: EntityStore, + raw_frames: VecDeque<(std::time::Instant, Frame)>, + stream_start: std::time::Instant, + pub dropped_frames: std::sync::Arc, + filtered_cache: Option>, +} + +impl App { + pub fn new(view: String, url: String, dropped_frames: std::sync::Arc) -> Self { + Self { + view: view.clone(), + url: url.clone(), + view_mode: ViewMode::List, + entity_keys: Vec::new(), + entity_key_set: HashSet::new(), + selected_index: 0, + history_position: 0, + history_anchor: None, + show_diff: false, + show_raw: false, + paused: false, + disconnected: false, + filter_input_active: false, + filter_text: String::new(), + status_message: "Connected".to_string(), + status_time: std::time::Instant::now(), + update_count: 0, + scroll_offset: 0, + visible_rows: 30, + terminal_width: 120, + sort_mode: SortMode::Insertion, + sort_direction: 
SortDirection::Descending, + pending_count: None, + pending_g: false, + list_state: ListState::default().with_selected(Some(0)), + store: EntityStore::new(), + raw_frames: VecDeque::new(), + stream_start: std::time::Instant::now(), + dropped_frames, + filtered_cache: None, + } + } + + fn invalidate_filter_cache(&mut self) { + self.filtered_cache = None; + } + + /// Compensate history_anchor when the selected entity's history grows. + /// If a pop_front happened (len didn't grow despite a push), decrement anchor. + fn compensate_history_anchor(&mut self, updated_key: &str, len_before: usize) { + if let Some(anchor) = self.history_anchor { + if let Some(selected) = self.selected_key() { + if selected == updated_key { + let len_after = self.store.history_len(updated_key); + // pop_front happened if length didn't increase + if len_after == len_before { + if anchor == 0 { + // The entry we were viewing was evicted + self.set_status("History entry evicted"); + // Stay at oldest available + } else { + self.history_anchor = Some(anchor - 1); + } + } + // No pop: anchor stays valid (new entry appended to back) + } + } + } + } + + pub fn apply_frame(&mut self, frame: Frame) { + // Invalidation is cheap (sets to None). The cache is only rebuilt once per + // render tick in ensure_filtered_cache(), not per-frame, since we drain all + // frames before drawing. 
+ self.invalidate_filter_cache(); + + // Always collect raw frames so toggling on shows recent data + let raw_frame = frame.clone(); + let op = frame.operation(); + + match op { + Operation::Snapshot => { + let entities = parse_snapshot_entities(&frame.data); + let count = entities.len() as u64; + for entity in entities { + self.store.upsert(&entity.key, entity.data, "snapshot", None); + if self.entity_key_set.insert(entity.key.clone()) { + self.entity_keys.push(entity.key); + } + } + self.update_count += count; + } + Operation::Upsert | Operation::Create => { + let key = frame.key.clone(); + let seq = frame.seq.clone(); + let len_before = self.store.history_len(&key); + self.store + .upsert(&key, frame.data, &frame.op, seq); + self.compensate_history_anchor(&key, len_before); + if self.entity_key_set.insert(key.clone()) { + self.entity_keys.push(key); + } + self.update_count += 1; + } + Operation::Patch => { + let key = frame.key.clone(); + let seq = frame.seq.clone(); + let len_before = self.store.history_len(&key); + self.store + .patch(&key, &frame.data, &frame.append, seq); + self.compensate_history_anchor(&key, len_before); + if self.entity_key_set.insert(key.clone()) { + self.entity_keys.push(key); + } + self.update_count += 1; + } + Operation::Delete => { + let deleted_pos = self.entity_keys.iter().position(|k| k == &frame.key); + self.store.delete(&frame.key); + self.entity_key_set.remove(&frame.key); + self.entity_keys.retain(|k| k != &frame.key); + self.update_count += 1; + // If deleted entity was before cursor, shift cursor back to preserve selection + if let Some(pos) = deleted_pos { + if pos < self.selected_index && self.selected_index > 0 { + self.selected_index -= 1; + } + } + if self.selected_index >= self.entity_keys.len() && !self.entity_keys.is_empty() { + self.selected_index = self.entity_keys.len() - 1; + } + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + Operation::Subscribed => { + 
self.set_status("Subscribed"); + } + } + + self.raw_frames.push_back((std::time::Instant::now(), raw_frame)); + while self.raw_frames.len() > 1000 { + self.raw_frames.pop_front(); + } + } + + /// Take and reset the pending count prefix (e.g. "10j" → 10). Returns 1 if no count. + fn take_count(&mut self) -> usize { + let n = self.pending_count.unwrap_or(1); + self.pending_count = None; + self.pending_g = false; + n + } + + pub fn handle_action(&mut self, action: TuiAction) { + self.ensure_filtered_cache(); + // Reset pending_g after every action (including GotoTop) + self.pending_g = false; + + match action { + TuiAction::Quit => {} + TuiAction::ScrollDetailDown => { + let n = self.take_count(); + self.scroll_offset = self.scroll_offset.saturating_add(n as u16) + .min(self.max_scroll_offset()); + } + TuiAction::ScrollDetailUp => { + let n = self.take_count(); + self.scroll_offset = self.scroll_offset.saturating_sub(n as u16); + } + TuiAction::ScrollDetailTop => { + self.pending_count = None; + self.scroll_offset = 0; + } + TuiAction::ScrollDetailBottom => { + self.pending_count = None; + self.scroll_offset = self.max_scroll_offset(); + } + TuiAction::ScrollDetailHalfDown => { + let half = (self.visible_rows / 2).max(1); + self.scroll_offset = self.scroll_offset.saturating_add(half as u16) + .min(self.max_scroll_offset()); + } + TuiAction::ScrollDetailHalfUp => { + let half = (self.visible_rows / 2).max(1); + self.scroll_offset = self.scroll_offset.saturating_sub(half as u16); + } + TuiAction::NextEntity => { + let n = self.take_count(); + let count = self.filtered_keys().len(); + if count > 0 { + self.selected_index = (self.selected_index + n).min(count - 1); + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + } + TuiAction::PrevEntity => { + let n = self.take_count(); + self.selected_index = self.selected_index.saturating_sub(n); + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + 
TuiAction::FocusDetail => { + self.view_mode = ViewMode::Detail; + self.scroll_offset = 0; + } + TuiAction::BackToList => { + if self.filter_input_active { + self.filter_input_active = false; + } else { + self.view_mode = ViewMode::List; + self.scroll_offset = 0; + } + } + TuiAction::HistoryBack => { + if let Some(key) = self.selected_key() { + let hist_len = self.store.history_len(&key); + if hist_len == 0 { /* no-op */ } + else if let Some(anchor) = self.history_anchor { + // Already browsing — move anchor backward (toward older) + if anchor > 0 { + self.history_anchor = Some(anchor - 1); + self.history_position += 1; + } + } else if hist_len >= 2 { + // Start browsing — anchor to second-to-last entry + self.history_anchor = Some(hist_len - 2); + self.history_position = 1; + } + } + self.scroll_offset = 0; + } + TuiAction::HistoryForward => { + if let Some(key) = self.selected_key() { + let hist_len = self.store.history_len(&key); + if let Some(anchor) = self.history_anchor { + if anchor + 1 >= hist_len { + // Reached latest — clear anchor + self.history_anchor = None; + self.history_position = 0; + self.history_anchor = None; + } else { + self.history_anchor = Some(anchor + 1); + self.history_position = self.history_position.saturating_sub(1); + } + } + } + self.scroll_offset = 0; + } + TuiAction::HistoryOldest => { + if let Some(key) = self.selected_key() { + let hist_len = self.store.history_len(&key); + if hist_len > 0 { + self.history_anchor = Some(0); + self.history_position = hist_len.saturating_sub(1); + } + } + self.scroll_offset = 0; + } + TuiAction::HistoryNewest => { + self.history_position = 0; + self.history_anchor = None; + self.history_anchor = None; + self.scroll_offset = 0; + } + TuiAction::ToggleDiff => { + self.show_diff = !self.show_diff; + self.set_status(if self.show_diff { "Diff view ON" } else { "Diff view OFF" }); + } + TuiAction::ToggleRaw => { + self.show_raw = !self.show_raw; + self.set_status(if self.show_raw { "Raw frames ON" } else 
{ "Raw frames OFF" }); + } + TuiAction::CycleSortMode => { + self.sort_mode = match &self.sort_mode { + SortMode::Insertion => SortMode::Field("_seq".to_string()), + SortMode::Field(_) => SortMode::Insertion, + }; + self.invalidate_filter_cache(); + let label = match &self.sort_mode { + SortMode::Insertion => "Sort: insertion order".to_string(), + SortMode::Field(f) => format!("Sort: {} {}", f, match self.sort_direction { + SortDirection::Ascending => "asc", + SortDirection::Descending => "desc", + }), + }; + self.set_status(&label); + } + TuiAction::ToggleSortDirection => { + self.sort_direction = match self.sort_direction { + SortDirection::Ascending => SortDirection::Descending, + SortDirection::Descending => SortDirection::Ascending, + }; + self.invalidate_filter_cache(); + let label = match &self.sort_mode { + SortMode::Insertion => "Sort direction toggled (no effect in insertion order)".to_string(), + SortMode::Field(f) => format!("Sort: {} {}", f, match self.sort_direction { + SortDirection::Ascending => "asc", + SortDirection::Descending => "desc", + }), + }; + self.set_status(&label); + } + TuiAction::TogglePause => { + self.paused = !self.paused; + self.set_status(if self.paused { "PAUSED" } else { "Resumed" }); + } + TuiAction::StartFilter => { + self.filter_input_active = true; + self.filter_text.clear(); + } + TuiAction::SaveSnapshot => { + // Note: this does synchronous file I/O on the runtime thread. Acceptable + // because raw_frames is capped at 1000 entries. For larger caps, consider + // spawning onto a blocking thread. 
+ let mut recorder = SnapshotRecorder::new(&self.view, &self.url); + for (arrival_time, frame) in &self.raw_frames { + let ts_ms = arrival_time.duration_since(self.stream_start).as_millis() as u64; + recorder.record_with_ts(frame, ts_ms); + } + let filename = format!("hs-stream-{}.json", chrono::Utc::now().format("%Y%m%d-%H%M%S%.3f")); + match recorder.save(&filename) { + Ok(_) => self.set_status(&format!("Saved to {}", filename)), + Err(e) => self.set_status(&format!("Save failed: {}", e)), + } + } + TuiAction::FilterChar(c) => { + self.filter_text.push(c); + self.invalidate_filter_cache(); + self.clamp_selection(); + } + TuiAction::FilterBackspace => { + self.filter_text.pop(); + self.invalidate_filter_cache(); + self.clamp_selection(); + } + TuiAction::FilterClear => { + self.filter_text.clear(); + self.invalidate_filter_cache(); + self.clamp_selection(); + } + TuiAction::FilterDeleteWord => { + // Delete back to previous word boundary (or start) + let trimmed = self.filter_text.trim_end(); + if let Some(pos) = trimmed.rfind(|c: char| c.is_whitespace()) { + self.filter_text.truncate(pos + 1); + } else { + self.filter_text.clear(); + } + self.invalidate_filter_cache(); + self.clamp_selection(); + } + TuiAction::GotoTop => { + self.pending_count = None; + self.selected_index = 0; + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + TuiAction::GotoBottom => { + self.pending_count = None; + let count = self.filtered_keys().len(); + if count > 0 { + self.selected_index = count - 1; + } + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + TuiAction::HalfPageDown => { + let n = self.take_count(); + let half = self.visible_rows / 2; + let count = self.filtered_keys().len(); + if count > 0 { + self.selected_index = (self.selected_index + half * n).min(count - 1); + } + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + TuiAction::HalfPageUp => { + let n = 
self.take_count(); + let half = self.visible_rows / 2; + self.selected_index = self.selected_index.saturating_sub(half * n); + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + TuiAction::NextMatch => { + if self.filter_text.is_empty() { + return; + } + let n = self.take_count(); + let keys = self.filtered_keys(); + let count = keys.len(); + if count > 0 { + self.selected_index = (self.selected_index + n) % count; + } + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + } + } + self.list_state.select(Some(self.selected_index)); + } + + fn clamp_selection(&mut self) { + self.ensure_filtered_cache(); + let count = self.filtered_keys().len(); + if count == 0 { + self.selected_index = 0; + } else if self.selected_index >= count { + self.selected_index = count - 1; + } + self.history_position = 0; + self.history_anchor = None; + self.scroll_offset = 0; + self.list_state.select(Some(self.selected_index)); + } + + /// Maximum scroll offset for the detail pane (total lines - visible height). + fn max_scroll_offset(&self) -> u16 { + let total_lines = self.selected_entity_data() + .map(|s| s.lines().count()) + .unwrap_or(0); + // visible_rows approximates the detail pane height (minus borders) + let visible = self.visible_rows.saturating_sub(2); + if total_lines > visible { + (total_lines - visible) as u16 + } else { + 0 + } + } + + pub fn selected_key(&self) -> Option { + let keys = self.filtered_keys(); + keys.get(self.selected_index).map(|s| s.to_string()) + } + + pub fn selected_entity_data(&self) -> Option { + let key = self.selected_key()?; + + // Raw mode: show the most recent raw frame for this entity key. + // Snapshot frames have key="" (entities are in data array), so fall back + // to showing the merged state with a note for snapshot-only entities. 
+ if self.show_raw { + if let Some((_, raw)) = self.raw_frames.iter().rev().find(|(_, f)| f.key == key) { + return Some(serde_json::to_string_pretty(raw).unwrap_or_default()); + } + // Entity was ingested via snapshot batch — no individual raw frame exists + let record = self.store.get(&key)?; + let fallback = serde_json::json!({ + "_note": "Received via snapshot batch (no individual raw frame)", + "key": key, + "data": record.current, + }); + return Some(serde_json::to_string_pretty(&fallback).unwrap_or_default()); + } + + if self.show_diff { + // Use anchor-based index if available for stable diff view + if let Some(anchor) = self.history_anchor { + let entry = self.store.at_absolute(&key, anchor)?; + // Compute diff manually against previous entry + if anchor > 0 { + if let Some(prev) = self.store.at_absolute(&key, anchor - 1) { + let diff = serde_json::json!({ + "op": entry.op, + "state": entry.state, + "patch": entry.patch, + "previous_state": prev.state, + }); + return Some(serde_json::to_string_pretty(&diff).unwrap_or_default()); + } + } + return Some(serde_json::to_string_pretty(&serde_json::json!({ + "op": entry.op, + "state": entry.state, + "patch": entry.patch, + })).unwrap_or_default()); + } + let diff = self.store.diff_at(&key, self.history_position)?; + return Some(serde_json::to_string_pretty(&diff).unwrap_or_default()); + } + + let w = self.terminal_width as usize; + + // Use anchor for stable history browsing during streaming + if let Some(anchor) = self.history_anchor { + let entry = self.store.at_absolute(&key, anchor)?; + return Some(compact_pretty(&entry.state, w)); + } + + if self.history_position > 0 { + let entry = self.store.at(&key, self.history_position)?; + return Some(compact_pretty(&entry.state, w)); + } + + let record = self.store.get(&key)?; + Some(compact_pretty(&record.current, w)) + } + + pub fn selected_history_len(&self) -> usize { + self.selected_key() + .and_then(|k| self.store.get(&k)) + .map(|r| r.history.len()) + 
.unwrap_or(0) + } + + pub fn status(&self) -> &str { + if self.status_time.elapsed().as_millis() < MAX_STATUS_AGE_MS { + &self.status_message + } else if self.paused { + "PAUSED" + } else { + "Streaming" + } + } + + fn set_status(&mut self, msg: &str) { + self.status_message = msg.to_string(); + self.status_time = std::time::Instant::now(); + } + + pub fn set_disconnected(&mut self) { + self.disconnected = true; + self.set_status("Disconnected"); + } + + /// Returns cached filtered keys. + /// Panics in debug builds if `ensure_filtered_cache()` was not called first. + pub fn filtered_keys(&self) -> &[String] { + debug_assert!(self.filtered_cache.is_some(), "filtered_keys() called without ensure_filtered_cache()"); + self.filtered_cache.as_deref().unwrap_or(&[]) + } + + /// Rebuild the filter cache if invalidated. + pub fn ensure_filtered_cache(&mut self) { + if self.filtered_cache.is_some() { + return; + } + let mut result = if self.filter_text.is_empty() { + self.entity_keys.clone() + } else { + let lower = self.filter_text.to_lowercase(); + self.entity_keys + .iter() + .filter(|k| { + if k.to_lowercase().contains(&lower) { + return true; + } + if let Some(record) = self.store.get(k) { + return value_contains_str(&record.current, &lower); + } + false + }) + .cloned() + .collect() + }; + // Apply sort if not insertion order + if let SortMode::Field(ref path) = self.sort_mode { + let path = path.clone(); + let dir = self.sort_direction; + let store = &self.store; + result.sort_by(|a, b| { + let va = store.get(a).and_then(|r| resolve_dot_path(&r.current, &path)); + let vb = store.get(b).and_then(|r| resolve_dot_path(&r.current, &path)); + let cmp = compare_json_values(va, vb); + match dir { + SortDirection::Ascending => cmp, + SortDirection::Descending => cmp.reverse(), + } + }); + } + + self.filtered_cache = Some(result); + } +} + +/// Resolve a dot-path like "_seq" or "info.name" into a JSON value. 
+fn resolve_dot_path<'a>(value: &'a Value, path: &str) -> Option<&'a Value> { + let mut current = value; + for segment in path.split('.') { + current = current.get(segment)?; + } + if current.is_null() { None } else { Some(current) } +} + +/// Compare two optional JSON values. Numbers compare numerically, strings +/// lexicographically, null/missing sorts last. +fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering { + match (a, b) { + (None, None) => std::cmp::Ordering::Equal, + (None, Some(_)) => std::cmp::Ordering::Greater, // missing sorts last + (Some(_), None) => std::cmp::Ordering::Less, + (Some(va), Some(vb)) => { + // Try numeric comparison first + if let (Some(na), Some(nb)) = (as_f64(va), as_f64(vb)) { + return na.partial_cmp(&nb).unwrap_or(std::cmp::Ordering::Equal); + } + // Fall back to string comparison + let sa = value_to_sort_string(va); + let sb = value_to_sort_string(vb); + sa.cmp(&sb) + } + } +} + +fn as_f64(v: &Value) -> Option { + match v { + Value::Number(n) => n.as_f64(), + Value::String(s) => s.parse::().ok(), + _ => None, + } +} + +fn value_to_sort_string(v: &Value) -> String { + match v { + Value::String(s) => s.clone(), + _ => serde_json::to_string(v).unwrap_or_default(), + } +} + +/// Recursively search all values in a JSON tree for a substring match. 
+fn value_contains_str(value: &Value, needle: &str) -> bool { + match value { + Value::String(s) => s.to_lowercase().contains(needle), + Value::Number(n) => n.to_string().contains(needle), + Value::Bool(b) => { + let s = if *b { "true" } else { "false" }; + s.contains(needle) + } + Value::Object(map) => { + map.iter().any(|(k, v)| { + k.to_lowercase().contains(needle) || value_contains_str(v, needle) + }) + } + Value::Array(arr) => { + arr.iter().any(|v| value_contains_str(v, needle)) + } + Value::Null => false, + } +} diff --git a/cli/src/commands/stream/tui/mod.rs b/cli/src/commands/stream/tui/mod.rs new file mode 100644 index 00000000..b4e4e4bc --- /dev/null +++ b/cli/src/commands/stream/tui/mod.rs @@ -0,0 +1,343 @@ +mod app; +mod ui; + +use anyhow::{Context, Result}; +use crossterm::{ + event::{self, Event, KeyCode, KeyModifiers}, + execute, + terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, +}; +use futures_util::{SinkExt, StreamExt}; +use hyperstack_sdk::{parse_frame, try_parse_subscribed_frame, ClientMessage, Frame}; +use ratatui::{backend::CrosstermBackend, Terminal}; +use std::io; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio_tungstenite::{connect_async, tungstenite::Message}; + +use self::app::{App, TuiAction, ViewMode}; +use super::StreamArgs; + +pub async fn run_tui(url: String, view: &str, args: &StreamArgs) -> Result<()> { + // Connect WebSocket + let (ws, _) = connect_async(&url) + .await + .with_context(|| format!("Failed to connect to {}", url))?; + + let (mut ws_tx, mut ws_rx) = ws.split(); + + // Subscribe + let sub = crate::commands::stream::build_subscription(view, args); + let msg = serde_json::to_string(&ClientMessage::Subscribe(sub))?; + ws_tx.send(Message::Text(msg)).await?; + + // Channel for frames from WS task + // 10k buffer accommodates large snapshot batches during pause. 
Overflow + // frames are dropped and counted in the "Dropped: N" header indicator. + let (frame_tx, mut frame_rx) = mpsc::channel::(10_000); + + // Shutdown signal for graceful WebSocket close + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); + + // Dropped frame counter (shared with WS task) + let dropped_frames = Arc::new(AtomicU64::new(0)); + let dropped_frames_ws = Arc::clone(&dropped_frames); + + // Spawn WS reader task + let ws_handle = tokio::spawn(async move { + let ping_period = std::time::Duration::from_secs(30); + let mut ping_interval = tokio::time::interval_at(tokio::time::Instant::now() + ping_period, ping_period); + loop { + tokio::select! { + _ = &mut shutdown_rx => { + let _ = ws_tx.close().await; + break; + } + msg = ws_rx.next() => { + match msg { + Some(Ok(Message::Binary(bytes))) => { + match parse_frame(&bytes) { + Ok(frame) => { + if frame_tx.try_send(frame).is_err() { + dropped_frames_ws.fetch_add(1, Ordering::Relaxed); + } + } + Err(_) => { + // Subscribed frames have a different shape (no `entity` field) + if try_parse_subscribed_frame(&bytes).is_some() { + let subscribed = Frame { + mode: hyperstack_sdk::Mode::List, + entity: String::new(), + op: "subscribed".to_string(), + key: String::new(), + data: serde_json::Value::Null, + append: Vec::new(), + seq: None, + }; + let _ = frame_tx.try_send(subscribed); + } + } + } + } + Some(Ok(Message::Text(text))) => { + if let Ok(frame) = serde_json::from_str::(&text) { + if frame_tx.try_send(frame).is_err() { + dropped_frames_ws.fetch_add(1, Ordering::Relaxed); + } + } + } + Some(Ok(Message::Ping(payload))) => { + let _ = ws_tx.send(Message::Pong(payload)).await; + } + Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break, + _ => {} + } + } + _ = ping_interval.tick() => { + if let Ok(msg) = serde_json::to_string(&ClientMessage::Ping) { + let _ = ws_tx.send(Message::Text(msg)).await; + } + } + } + } + }); + + // Setup terminal with panic hook to restore on crash. 
+ // We store the original hook in a Mutex so we can reclaim it on normal exit. + let original_hook = Arc::new(std::sync::Mutex::new(Some(std::panic::take_hook()))); + let hook_clone = Arc::clone(&original_hook); + std::panic::set_hook(Box::new(move |panic_info| { + let _ = disable_raw_mode(); + let _ = execute!(io::stdout(), LeaveAlternateScreen); + if let Ok(guard) = hook_clone.lock() { + if let Some(ref orig) = *guard { + orig(panic_info); + } + } + })); + + enable_raw_mode()?; + let terminal_setup = || -> Result>> { + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen)?; + let backend = CrosstermBackend::new(stdout); + Ok(Terminal::new(backend)?) + }; + let mut terminal = match terminal_setup() { + Ok(t) => t, + Err(e) => { + let _ = disable_raw_mode(); + let _ = execute!(io::stdout(), LeaveAlternateScreen); + return Err(e); + } + }; + + let mut app = App::new(view.to_string(), url.clone(), Arc::clone(&dropped_frames)); + + // Main loop: poll terminal events + receive frames + let tick_rate = std::time::Duration::from_millis(50); + let result = run_loop(&mut terminal, &mut app, &mut frame_rx, tick_rate).await; + + // Restore terminal (always attempt all steps) + let _ = disable_raw_mode(); + let _ = execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + ); + let _ = terminal.show_cursor(); + + // Signal graceful shutdown, then wait briefly for the task to close + let _ = shutdown_tx.send(()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), ws_handle).await; + + // Restore original panic hook (ours is only needed while TUI is active). + // Note: if run_loop panics, this block is unreachable and the TUI hook stays + // installed. This is acceptable since the process terminates on panic anyway. 
+ let _ = std::panic::take_hook(); // drop our TUI hook + if let Ok(mut guard) = original_hook.lock() { + if let Some(hook) = guard.take() { + std::panic::set_hook(hook); + } + } + + result +} + +async fn run_loop( + terminal: &mut Terminal>, + app: &mut App, + frame_rx: &mut mpsc::Receiver, + tick_rate: std::time::Duration, +) -> Result<()> { + loop { + // Update visible rows from terminal size (minus header/timeline/status/borders) + let term_size = terminal.size()?; + // 3 fixed rows (header + timeline + status) + 2 border rows = 5 + app.visible_rows = term_size.height.saturating_sub(5) as usize; + app.terminal_width = term_size.width; + + terminal.draw(|f| ui::draw(f, app))?; + + // Drain available frames (non-blocking). When paused, leave + // frames in the channel so they're applied on resume. + if !app.paused { + loop { + match frame_rx.try_recv() { + Ok(frame) => app.apply_frame(frame), + Err(mpsc::error::TryRecvError::Disconnected) => { + app.set_disconnected(); + break; + } + Err(mpsc::error::TryRecvError::Empty) => break, + } + } + } + + // Poll for terminal events with timeout + if event::poll(tick_rate)? { + if let Event::Key(key) = event::read()? 
{ + // When filter input is active, capture all keys for typing + let action = if app.filter_input_active { + match key.code { + KeyCode::Esc => TuiAction::BackToList, + KeyCode::Enter => TuiAction::BackToList, + KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::Quit + } + KeyCode::Char('u') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::FilterClear + } + KeyCode::Char('w') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::FilterDeleteWord + } + // Ignore other control/alt combos — don't insert them as text + KeyCode::Char(_) if key.modifiers.intersects(KeyModifiers::CONTROL | KeyModifiers::ALT) => { + continue + } + KeyCode::Char(c) => TuiAction::FilterChar(c), + KeyCode::Backspace => TuiAction::FilterBackspace, + _ => continue, + } + } else { + // Number prefix accumulation (vim count) + if let KeyCode::Char(c @ '0'..='9') = key.code { + // Don't treat '0' as count start (could be "go to beginning" in future) + if c != '0' || app.pending_count.is_some() { + let digit = c as usize - '0' as usize; + let current = app.pending_count.unwrap_or(0); + app.pending_count = Some((current.saturating_mul(10).saturating_add(digit)).min(99_999)); + app.pending_g = false; + continue; + } + } + + match key.code { + KeyCode::Char('q') => TuiAction::Quit, + KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::Quit + } + // In Detail mode: j/k scroll the JSON pane; arrows still navigate entities + KeyCode::Char('j') => { + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailDown + } else { + TuiAction::NextEntity + } + } + KeyCode::Char('k') => { + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailUp + } else { + TuiAction::PrevEntity + } + } + KeyCode::Down => TuiAction::NextEntity, + KeyCode::Up => TuiAction::PrevEntity, + KeyCode::Char('G') => { + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailBottom + } else { + 
TuiAction::GotoBottom + } + } + KeyCode::Char('g') => { + if app.pending_g { + // gg = go to top (of list or detail pane) + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailTop + } else { + TuiAction::GotoTop + } + } else { + app.pending_g = true; + app.pending_count = None; + continue; + } + } + KeyCode::Char('d') if key.modifiers.contains(KeyModifiers::CONTROL) => { + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailHalfDown + } else { + TuiAction::HalfPageDown + } + } + KeyCode::Char('u') if key.modifiers.contains(KeyModifiers::CONTROL) => { + if app.view_mode == ViewMode::Detail { + TuiAction::ScrollDetailHalfUp + } else { + TuiAction::HalfPageUp + } + } + KeyCode::Char('e') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::ScrollDetailDown + } + KeyCode::Char('y') if key.modifiers.contains(KeyModifiers::CONTROL) => { + TuiAction::ScrollDetailUp + } + KeyCode::PageDown => TuiAction::ScrollDetailDown, + KeyCode::PageUp => TuiAction::ScrollDetailUp, + KeyCode::Char('n') => TuiAction::NextMatch, + KeyCode::Enter => TuiAction::FocusDetail, + KeyCode::Esc => { + app.pending_count = None; + app.pending_g = false; + TuiAction::BackToList + } + KeyCode::Right | KeyCode::Char('l') => TuiAction::HistoryForward, + KeyCode::Left | KeyCode::Char('h') => { + if app.pending_g { + app.pending_g = false; + continue; + } + TuiAction::HistoryBack + } + KeyCode::Home => TuiAction::HistoryOldest, + KeyCode::End => TuiAction::HistoryNewest, + KeyCode::Char('d') => TuiAction::ToggleDiff, + KeyCode::Char('r') => TuiAction::ToggleRaw, + KeyCode::Char('p') => TuiAction::TogglePause, + KeyCode::Char('/') => TuiAction::StartFilter, + KeyCode::Char('s') => TuiAction::CycleSortMode, + KeyCode::Char('o') => TuiAction::ToggleSortDirection, + KeyCode::Char('S') => TuiAction::SaveSnapshot, + _ => { + app.pending_count = None; + app.pending_g = false; + continue; + } + } + }; + + if let TuiAction::Quit = action { + break; + } + 
app.handle_action(action); + } + // Resize and other events are handled implicitly: + // layout is recalculated from terminal.size() at the top of each loop iteration + } + } + + Ok(()) +} diff --git a/cli/src/commands/stream/tui/ui.rs b/cli/src/commands/stream/tui/ui.rs new file mode 100644 index 00000000..2fdf0417 --- /dev/null +++ b/cli/src/commands/stream/tui/ui.rs @@ -0,0 +1,333 @@ +use ratatui::{ + layout::{Constraint, Direction, Layout, Rect}, + style::{Color, Modifier, Style}, + text::{Line, Span}, + widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}, + Frame, +}; + +use super::app::{App, SortDirection, SortMode, ViewMode}; + +pub fn draw(f: &mut Frame, app: &mut App) { + app.ensure_filtered_cache(); + let chunks = Layout::default() + .direction(Direction::Vertical) + .constraints([ + Constraint::Length(1), // Header + Constraint::Min(0), // Main content + Constraint::Length(1), // Timeline + Constraint::Length(1), // Status bar + ]) + .split(f.area()); + + draw_header(f, app, chunks[0]); + + match app.view_mode { + ViewMode::List => draw_split_view(f, app, chunks[1]), + ViewMode::Detail => draw_detail_view(f, app, chunks[1]), + } + + draw_timeline(f, app, chunks[2]); + draw_status_bar(f, app, chunks[3]); +} + +fn draw_header(f: &mut Frame, app: &App, area: Rect) { + let status = if app.disconnected { + Span::styled(" DISCONNECTED ", Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)) + } else if app.paused { + Span::styled(" PAUSED ", Style::default().fg(Color::Red).add_modifier(Modifier::BOLD)) + } else { + Span::styled(" LIVE ", Style::default().fg(Color::Green).add_modifier(Modifier::BOLD)) + }; + + let dropped = app.dropped_frames.load(std::sync::atomic::Ordering::Relaxed); + let mut spans = vec![ + Span::styled("hs stream ", Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)), + Span::styled(&app.view, Style::default().fg(Color::White).add_modifier(Modifier::BOLD)), + Span::raw(" "), + status, + Span::raw(" "), + 
Span::styled( + format!("Updates: {}", app.update_count), + Style::default().fg(Color::DarkGray), + ), + ]; + if dropped > 0 { + spans.push(Span::raw(" ")); + spans.push(Span::styled( + format!("Dropped: {}", dropped), + Style::default().fg(Color::Red), + )); + } + let header = Line::from(spans); + + f.render_widget(Paragraph::new(header), area); +} + +fn draw_split_view(f: &mut Frame, app: &mut App, area: Rect) { + let chunks = Layout::default() + .direction(Direction::Horizontal) + .constraints([Constraint::Percentage(30), Constraint::Percentage(70)]) + .split(area); + + draw_entity_list(f, app, chunks[0]); + draw_entity_detail(f, app, chunks[1]); +} + +fn draw_entity_list(f: &mut Frame, app: &mut App, area: Rect) { + let keys = app.filtered_keys(); + let items: Vec = keys + .iter() + .enumerate() + .map(|(i, key)| { + let style = if i == app.selected_index { + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD) + } else { + Style::default() + }; + let prefix = if i == app.selected_index { "> " } else { " " }; + ListItem::new(format!("{}{}", prefix, truncate_key(key, area.width as usize - 3))) + .style(style) + }) + .collect(); + + let title = if app.filter_input_active { + format!("Entities [/{}]", app.filter_text) + } else if !app.filter_text.is_empty() { + format!("Entities ({}/{}) [/{}]", keys.len(), app.entity_keys.len(), app.filter_text) + } else { + format!("Entities ({})", keys.len()) + }; + + let list = List::new(items) + .block( + Block::default() + .title(title) + .borders(Borders::ALL) + .border_style(Style::default().fg(Color::DarkGray)), + ) + .highlight_style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD)); + + f.render_stateful_widget(list, area, &mut app.list_state); +} + +fn draw_entity_detail(f: &mut Frame, app: &App, area: Rect) { + let content = app.selected_entity_data().unwrap_or_else(|| { + if app.entity_keys.is_empty() { + "Waiting for data...".to_string() + } else { + "Select an entity".to_string() + } + 
}); + + let title = match app.selected_key() { + Some(key) => { + let mode = if app.show_diff { + " [diff]" + } else if app.history_position > 0 { + " [history]" + } else { + "" + }; + format!("{}{}", truncate_key(&key, area.width as usize - 10), mode) + } + None => "Detail".to_string(), + }; + + // Apply simple JSON syntax coloring + let lines: Vec = content + .lines() + .skip(app.scroll_offset as usize) + .map(|line| colorize_json_line(line)) + .collect(); + + // Count total lines for scroll indicator + let total_lines = content.lines().count(); + let visible_height = area.height.saturating_sub(2) as usize; // minus borders + let current_line = app.scroll_offset as usize + 1; + let scroll_info = if total_lines > visible_height { + format!(" [line {}/{}]", current_line, total_lines) + } else { + String::new() + }; + + let block_title = format!("{}{}", title, scroll_info); + let border_color = if app.view_mode == ViewMode::Detail { + Color::Yellow // highlight border in detail mode + } else { + Color::Cyan + }; + + let detail = Paragraph::new(lines) + .block( + Block::default() + .title(block_title) + .borders(Borders::ALL) + .border_style(Style::default().fg(border_color)), + ) + .wrap(Wrap { trim: false }); + + f.render_widget(detail, area); +} + +fn draw_detail_view(f: &mut Frame, app: &App, area: Rect) { + draw_entity_detail(f, app, area); +} + +fn draw_timeline(f: &mut Frame, app: &App, area: Rect) { + let history_len = app.selected_history_len(); + let pos = app.history_position; + let list_len = app.filtered_keys().len(); + let list_pos = if list_len > 0 { app.selected_index + 1 } else { 0 }; + + let mut spans = vec![ + Span::styled( + format!(" Row {}/{}", list_pos, list_len), + Style::default().fg(Color::DarkGray), + ), + Span::styled(" │ ", Style::default().fg(Color::DarkGray)), + ]; + + if history_len == 0 { + spans.push(Span::styled("Entity history: no data", Style::default().fg(Color::DarkGray))); + } else { + spans.push(Span::styled("[|<] ", 
            // (tail of draw_timeline) — history navigation indicators.
            // `history_len >= 1` here (the `history_len == 0` case took the other
            // branch), so `history_len - 1` cannot underflow.
            // [|<] and [<] light up (white) only when there are older versions to
            // step back to; otherwise they render dimmed.
            Style::default().fg(if pos < history_len - 1 { Color::White } else { Color::DarkGray })));
        spans.push(Span::styled("[<] ", Style::default().fg(if pos < history_len - 1 { Color::White } else { Color::DarkGray })));
        // Current position counter; assumes pos <= history_len - 1 so the displayed
        // version number stays in 1..=history_len — TODO confirm against App.
        spans.push(Span::styled(
            format!("version {}/{} ", history_len - pos, history_len),
            Style::default().fg(Color::Yellow).add_modifier(Modifier::BOLD),
        ));
        // [>] and [>|] light up only when not already at the newest version (pos > 0).
        spans.push(Span::styled("[>] ", Style::default().fg(if pos > 0 { Color::White } else { Color::DarkGray })));
        spans.push(Span::styled("[>|]", Style::default().fg(if pos > 0 { Color::White } else { Color::DarkGray })));
        spans.push(Span::raw(" "));
        // Diff-toggle hint reflects the current show_diff state.
        spans.push(if app.show_diff {
            Span::styled("[d]iff ON", Style::default().fg(Color::Green))
        } else {
            Span::styled("[d]iff", Style::default().fg(Color::DarkGray))
        });
    }

    f.render_widget(Paragraph::new(Line::from(spans)), area);
}

// Bottom status bar: app status text followed by single-key hints, with the
// shortcut letter highlighted in yellow and the rest of each word dimmed.
// (Body continues in the next chunk.)
fn draw_status_bar(f: &mut Frame, app: &App, area: Rect) {
    let status = Line::from(vec![
        Span::styled(
            format!(" {} ", app.status()),
            Style::default().fg(Color::DarkGray),
        ),
        Span::raw(" | "),
        Span::styled("q", Style::default().fg(Color::Yellow)),
        Span::styled("uit ", Style::default().fg(Color::DarkGray)),
        Span::styled("p", Style::default().fg(Color::Yellow)),
        Span::styled("ause ", Style::default().fg(Color::DarkGray)),
        Span::styled("d", Style::default().fg(Color::Yellow)),
        Span::styled("iff ", Style::default().fg(Color::DarkGray)),
        Span::styled("r", Style::default().fg(Color::Yellow)),
        Span::styled("aw ", Style::default().fg(Color::DarkGray)),
        Span::styled("/", Style::default().fg(Color::Yellow)),
        Span::styled("filter ", Style::default().fg(Color::DarkGray)),
        Span::styled("s", Style::default().fg(Color::Yellow)),
        Span::styled("ort ", Style::default().fg(Color::DarkGray)),
        Span::styled("o", Style::default().fg(Color::Yellow)),
        Span::styled("rder ", Style::default().fg(Color::DarkGray)),
        Span::styled("S", Style::default().fg(Color::Yellow)),
        Span::styled("ave ",
            // (tail of draw_status_bar) — remaining hints and sort indicator.
            Style::default().fg(Color::DarkGray)),
        Span::styled("h/l", Style::default().fg(Color::Yellow)),
        Span::styled(" history ", Style::default().fg(Color::DarkGray)),
        // Trailing sort indicator: empty in insertion order, " [field↑]"/" [field↓]"
        // when sorting by a field. Note: the pattern binding `f` shadows the
        // `f: &mut Frame` parameter inside this arm only.
        match &app.sort_mode {
            SortMode::Insertion => Span::raw(""),
            SortMode::Field(f) => Span::styled(
                format!(" [{}{}]",
                    f,
                    match app.sort_direction {
                        SortDirection::Ascending => "↑",
                        SortDirection::Descending => "↓",
                    }
                ),
                Style::default().fg(Color::Cyan),
            ),
        },
    ]);

    f.render_widget(Paragraph::new(status), area);
}

/// Truncate `key` to at most `max_len` characters, counting chars (not bytes)
/// via `char_indices` so multi-byte keys are never split mid-character.
/// When there is room (`max_len > 3`) the cut is marked with a "..." suffix;
/// otherwise the key is hard-cut to `max_len` chars.
fn truncate_key(key: &str, max_len: usize) -> String {
    if key.chars().count() <= max_len {
        // Fits as-is.
        key.to_string()
    } else if max_len > 3 {
        // Reserve 3 cells for the "..." suffix.
        let end = key
            .char_indices()
            .nth(max_len - 3)
            .map(|(i, _)| i)
            .unwrap_or(key.len());
        format!("{}...", &key[..end])
    } else {
        // Too narrow for an ellipsis: hard-cut at max_len chars.
        let end = key
            .char_indices()
            .nth(max_len)
            .map(|(i, _)| i)
            .unwrap_or(key.len());
        key[..end].to_string()
    }
}

/// Simple heuristic JSON syntax coloring for serde_json::to_string_pretty output.
/// Assumes keys are always properly quoted/escaped (guaranteed by serde_json).
fn colorize_json_line(line: &str) -> Line<'_> {
    let trimmed = line.trim();

    // Key-value lines
    if trimmed.starts_with('"') {
        // Use "\": " (with trailing space) to avoid matching colons inside key names.
        // serde_json pretty-print always uses ": " as the key-value separator.
+ if let Some(colon_pos) = trimmed.find("\": ") { + let key_end = colon_pos + 1; + let indent = &line[..line.len() - trimmed.len()]; + let key = &trimmed[..key_end]; + let rest = &trimmed[key_end..]; + return Line::from(vec![ + Span::raw(indent), + Span::styled(key, Style::default().fg(Color::Cyan)), + colorize_value(rest), + ]); + } + } + + // String values (in arrays) + if (trimmed.starts_with('"') && trimmed.ends_with('"')) + || (trimmed.starts_with('"') && trimmed.ends_with("\",")) + { + return Line::from(Span::styled(line, Style::default().fg(Color::Green))); + } + + // Braces and brackets + if trimmed == "{" || trimmed == "}" || trimmed == "{}" || trimmed == "}," + || trimmed == "[" || trimmed == "]" || trimmed == "[]" || trimmed == "]," + { + return Line::from(Span::styled(line, Style::default().fg(Color::DarkGray))); + } + + Line::from(Span::raw(line)) +} + +fn colorize_value(rest: &str) -> Span<'_> { + let trimmed = rest.trim().trim_end_matches(','); + if trimmed.starts_with('"') { + Span::styled(rest, Style::default().fg(Color::Green)) + } else if trimmed == "true" || trimmed == "false" { + Span::styled(rest, Style::default().fg(Color::Yellow)) + } else if trimmed == "null" { + Span::styled(rest, Style::default().fg(Color::DarkGray)) + } else if trimmed.parse::().is_ok() { + Span::styled(rest, Style::default().fg(Color::Magenta)) + } else if trimmed.starts_with('[') || trimmed.starts_with("[]") { + // Inline compact array — render in default color (contains mixed types) + Span::styled(rest, Style::default().fg(Color::White)) + } else { + Span::raw(rest) + } +} diff --git a/cli/src/main.rs b/cli/src/main.rs index 28594bf4..b8e6ceb0 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -146,6 +146,9 @@ enum Commands { /// Inspect and analyze Anchor/Shank IDL files Idl(commands::idl::IdlArgs), + + /// Stream live entity data from a deployed stack via WebSocket + Stream(commands::stream::StreamArgs), } #[derive(Subcommand)] @@ -418,6 +421,7 @@ fn 
command_name(cmd: &Commands) -> &'static str { Commands::Build(_) => "build", Commands::Telemetry(_) => "telemetry", Commands::Idl(_) => "idl", + Commands::Stream(_) => "stream", } } @@ -540,6 +544,7 @@ fn run(cli: Cli) -> anyhow::Result<()> { } => commands::build::status(build_id, watch, json || cli.json), }, Commands::Idl(args) => commands::idl::run(args), + Commands::Stream(args) => commands::stream::run(args, &cli.config), Commands::Telemetry(telemetry_cmd) => match telemetry_cmd { TelemetryCommands::Status => commands::telemetry::status(), TelemetryCommands::Enable => commands::telemetry::enable(), diff --git a/rust/hyperstack-sdk/src/lib.rs b/rust/hyperstack-sdk/src/lib.rs index a5cafdca..b6ebfbce 100644 --- a/rust/hyperstack-sdk/src/lib.rs +++ b/rust/hyperstack-sdk/src/lib.rs @@ -33,13 +33,16 @@ pub use client::{HyperStack, HyperStackBuilder}; pub use connection::ConnectionState; pub use entity::Stack; pub use error::HyperStackError; -pub use frame::{Frame, Mode, Operation}; -pub use store::{SharedStore, StoreUpdate}; +pub use frame::{ + parse_frame, parse_snapshot_entities, try_parse_subscribed_frame, Frame, Mode, Operation, + SnapshotEntity, +}; +pub use store::{deep_merge_with_append, SharedStore, StoreUpdate}; pub use stream::{ EntityStream, FilterMapStream, FilteredStream, KeyFilter, MapStream, RichEntityStream, RichUpdate, Update, UseStream, }; -pub use subscription::Subscription; -pub use view::{ - RichWatchBuilder, StateView, UseBuilder, ViewBuilder, ViewHandle, Views, WatchBuilder, -}; + +pub use subscription::{ClientMessage, Subscription}; +pub use view::{RichWatchBuilder, StateView, UseBuilder, ViewBuilder, ViewHandle, Views, WatchBuilder}; + diff --git a/rust/hyperstack-sdk/src/store.rs b/rust/hyperstack-sdk/src/store.rs index a2b4fcb0..45a4f1b0 100644 --- a/rust/hyperstack-sdk/src/store.rs +++ b/rust/hyperstack-sdk/src/store.rs @@ -114,7 +114,7 @@ struct ViewData { sorted_keys: BTreeMap, } -fn deep_merge_with_append( +pub fn 
deep_merge_with_append( target: &mut Value, patch: &Value, append_paths: &[String],