From eb9783249b57520e0fcbfb484685a17c31c6834a Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Tue, 6 Jan 2026 00:28:21 +0100 Subject: [PATCH] receive payjoin payments --- Cargo.toml | 2 + src/builder.rs | 64 ++- src/chain/bitcoind.rs | 91 ++++ src/chain/electrum.rs | 136 ++++- src/chain/esplora.rs | 7 + src/chain/mod.rs | 34 +- src/config.rs | 25 +- src/error.rs | 3 + src/io/mod.rs | 4 + src/io/utils.rs | 78 +++ src/lib.rs | 75 ++- src/payment/mod.rs | 2 + src/payment/payjoin/manager.rs | 658 +++++++++++++++++++++++++ src/payment/payjoin/mod.rs | 49 ++ src/payment/payjoin/payjoin_session.rs | 245 +++++++++ src/payment/payjoin/persist.rs | 231 +++++++++ src/payment/store.rs | 25 +- src/types.rs | 5 + src/wallet/mod.rs | 93 ++++ 19 files changed, 1811 insertions(+), 16 deletions(-) create mode 100644 src/payment/payjoin/manager.rs create mode 100644 src/payment/payjoin/mod.rs create mode 100644 src/payment/payjoin/payjoin_session.rs create mode 100644 src/payment/payjoin/persist.rs diff --git a/Cargo.toml b/Cargo.toml index 5c82d7d65..eb405bfca 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,8 @@ prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ea50a9d2a8da524b69a2af43233706666cf2ffa5" } +payjoin = { git = "https://github.com/payjoin/rust-payjoin.git", package = "payjoin", default-features = false, features = ["v2", "io"] } + [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } diff --git a/src/builder.rs b/src/builder.rs index 7a285876f..2ddf45216 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -45,7 +45,7 @@ use vss_client::headers::VssHeaderProvider; use crate::chain::ChainSource; use crate::config::{ default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole, - BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, PayjoinConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, }; use crate::connection::ConnectionManager; @@ -56,12 +56,13 @@ use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, - read_scorer, write_node_metrics, + read_node_metrics, read_output_sweeper, read_payjoin_sessions, read_payments, read_peer_info, + read_pending_payments, read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ - self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + self, PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; @@ -71,13 +72,14 @@ use crate::liquidity::{ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; +use crate::payment::payjoin::manager::PayjoinManager; use crate::peer_store::PeerStore; use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, - KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, - Persister, SyncAndAsyncKVStore, + KeysManager, MessageRouter, OnionMessenger, PayjoinSessionStore, PaymentStore, PeerManager, + PendingPaymentStore, Persister, SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -547,6 +549,15 @@ impl NodeBuilder { Ok(self) } + /// Configures the [`Node`] instance to enable payjoin payments. + /// + /// The `payjoin_config` specifies the PayJoin directory and OHTTP relay URLs required + /// for payjoin V2 protocol. + pub fn set_payjoin_config(&mut self, payjoin_config: PayjoinConfig) -> &mut Self { + self.config.payjoin_config = Some(payjoin_config); + self + } + /// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any /// historical wallet funds. /// @@ -933,6 +944,14 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_async_payments_role(role).map(|_| ()) } + /// Configures the [`Node`] instance to enable payjoin payments. + /// + /// The `payjoin_config` specifies the PayJoin directory and OHTTP relay URLs required + /// for payjoin V2 protocol. + pub fn set_payjoin_config(&self, payjoin_config: PayjoinConfig) { + self.inner.write().unwrap().set_payjoin_config(payjoin_config); + } + /// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any /// historical wallet funds. /// @@ -1083,12 +1102,13 @@ fn build_with_store_internal( let kv_store_ref = Arc::clone(&kv_store); let logger_ref = Arc::clone(&logger); - let (payment_store_res, node_metris_res, pending_payment_store_res) = + let (payment_store_res, node_metris_res, pending_payment_store_res, payjoin_session_store_res) = runtime.block_on(async move { tokio::join!( read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), - read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)) + read_pending_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_payjoin_sessions(&*kv_store_ref, Arc::clone(&logger_ref)) ) }); @@ -1771,6 +1791,33 @@ fn build_with_store_internal( let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone()); + let payjoin_session_store = match payjoin_session_store_res { + Ok(payjoin_sessions) => Arc::new(PayjoinSessionStore::new( + payjoin_sessions, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(), + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payjoin session data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + + let payjoin_manager = Arc::new(PayjoinManager::new( + Arc::clone(&payjoin_session_store), + Arc::clone(&kv_store), + Arc::clone(&logger), + Arc::clone(&config), + Arc::clone(&wallet), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + stop_sender.subscribe(), + Arc::clone(&payment_store), + Arc::clone(&pending_payment_store), + )); + #[cfg(cycle_tests)] let mut _leak_checker = crate::LeakChecker(Vec::new()); #[cfg(cycle_tests)] @@ -1817,6 +1864,7 @@ fn build_with_store_internal( hrn_resolver, #[cfg(cycle_tests)] _leak_checker, + payjoin_manager, }) } diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 8a7167022..1c2586693 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -619,6 +619,57 @@ impl BitcoindChainSource { } } } + + pub(crate) async fn can_broadcast_transaction(&self, tx: &Transaction) -> Result { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), + self.api_client.test_mempool_accept(tx), + ); + + match timeout_fut.await { + Ok(res) => res.map_err(|e| { + log_error!( + self.logger, + "Failed to test mempool accept for transaction {}: {}", + tx.compute_txid(), + e + ); + Error::TxBroadcastFailed + }), + Err(e) => { + log_error!( + self.logger, + "Failed to test mempool accept for transaction {} due to timeout: {}", + tx.compute_txid(), + e + ); + log_trace!( + self.logger, + "Failed test mempool accept transaction bytes: {}", + log_bytes!(tx.encode()) + ); + Err(Error::TxBroadcastFailed) + }, + } + } + + pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result, Error> { + let timeout_fut = tokio::time::timeout( + Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS), + self.api_client.get_raw_transaction(txid), + ); + + match timeout_fut.await { + Ok(res) => res.map_err(|e| { + log_error!(self.logger, "Failed to get transaction {}: {}", txid, e); + Error::TxSyncFailed + }), + Err(e) => { + log_error!(self.logger, "Failed to get transaction {} due to timeout: {}", txid, e); + Err(Error::TxSyncTimeout) + }, + } + } } #[derive(Clone)] @@ -1229,6 +1280,46 @@ impl BitcoindClient { .collect(); Ok(evicted_txids) } + + /// Tests whether the provided transaction would be accepted by the mempool. + pub(crate) async fn test_mempool_accept(&self, tx: &Transaction) -> std::io::Result { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::test_mempool_accept_inner(Arc::clone(rpc_client), tx).await + }, + BitcoindClient::Rest { rpc_client, .. } => { + // We rely on the internal RPC client to make this call, as this + // operation is not supported by Bitcoin Core's REST interface. + Self::test_mempool_accept_inner(Arc::clone(rpc_client), tx).await + }, + } + } + + async fn test_mempool_accept_inner( + rpc_client: Arc, tx: &Transaction, + ) -> std::io::Result { + let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); + let tx_array = serde_json::json!([tx_serialized]); + + let resp = + rpc_client.call_method::("testmempoolaccept", &[tx_array]).await?; + + if let Some(array) = resp.as_array() { + if let Some(first_result) = array.first() { + Ok(first_result.get("allowed").and_then(|v| v.as_bool()).unwrap_or(false)) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Empty array response from testmempoolaccept", + )) + } + } else { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "testmempoolaccept did not return an array", + )) + } + } } impl BlockSource for BitcoindClient { diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 7b08c3845..b6532ca2e 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -15,7 +15,7 @@ use bdk_chain::bdk_core::spk_client::{ }; use bdk_electrum::BdkElectrumClient; use bdk_wallet::{KeychainKind as BdkKeyChainKind, Update as BdkUpdate}; -use bitcoin::{FeeRate, Network, Script, ScriptBuf, Transaction, Txid}; +use bitcoin::{FeeRate, Network, OutPoint, Script, ScriptBuf, Transaction, Txid}; use electrum_client::{ Batch, Client as ElectrumClient, ConfigBuilder as ElectrumConfigBuilder, ElectrumApi, }; @@ -288,6 +288,21 @@ impl ElectrumChainSource { electrum_client.broadcast(tx).await; } } + + pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result, Error> { + let electrum_client: Arc = + if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + Arc::clone(client) + } else { + debug_assert!( + false, + "We should have started the chain source before getting transactions" + ); + return Err(Error::TxSyncFailed); + }; + + electrum_client.get_transaction(txid).await + } } impl Filter for ElectrumChainSource { @@ -652,6 +667,125 @@ impl ElectrumRuntimeClient { Ok(new_fee_rate_cache) } + + async fn get_transaction(&self, txid: &Txid) -> Result, Error> { + let electrum_client = Arc::clone(&self.electrum_client); + let txid_copy = *txid; + + let spawn_fut = + self.runtime.spawn_blocking(move || electrum_client.transaction_get(&txid_copy)); + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs, + ), + spawn_fut, + ); + + match timeout_fut.await { + Ok(res) => match res { + Ok(inner_res) => match inner_res { + Ok(tx) => Ok(Some(tx)), + Err(e) => { + // Check if it's a "not found" error + let error_str = e.to_string(); + if error_str.contains("No such mempool or blockchain transaction") + || error_str.contains("not found") + { + Ok(None) + } else { + log_error!(self.logger, "Failed to get transaction {}: {}", txid, e); + Err(Error::TxSyncFailed) + } + }, + }, + Err(e) => { + log_error!(self.logger, "Failed to get transaction {}: {}", txid, e); + Err(Error::TxSyncFailed) + }, + }, + Err(e) => { + log_error!(self.logger, "Failed to get transaction {} due to timeout: {}", txid, e); + Err(Error::TxSyncTimeout) + }, + } + } + + async fn is_outpoint_spent(&self, outpoint: &OutPoint) -> Result { + // First get the transaction to find the scriptPubKey of the output + let tx = match self.get_transaction(&outpoint.txid).await? { + Some(tx) => tx, + None => { + // Transaction doesn't exist, so outpoint can't be spent + // (or never existed) + return Ok(false); + }, + }; + + // Check if the output index is valid + let vout = outpoint.vout as usize; + if vout >= tx.output.len() { + // Invalid output index + return Ok(false); + } + + let script_pubkey = &tx.output[vout].script_pubkey; + let electrum_client = Arc::clone(&self.electrum_client); + let script_pubkey_clone = script_pubkey.clone(); + let outpoint_txid = outpoint.txid; + let outpoint_vout = outpoint.vout; + + let spawn_fut = self + .runtime + .spawn_blocking(move || electrum_client.script_list_unspent(&script_pubkey_clone)); + let timeout_fut = tokio::time::timeout( + Duration::from_secs( + self.sync_config.timeouts_config.lightning_wallet_sync_timeout_secs, + ), + spawn_fut, + ); + + match timeout_fut.await { + Ok(res) => match res { + Ok(inner_res) => match inner_res { + Ok(unspent_list) => { + // Check if our outpoint is in the unspent list + let is_unspent = unspent_list.iter().any(|u| { + u.tx_hash == outpoint_txid && u.tx_pos == outpoint_vout as usize + }); + // Return true if spent (not in unspent list) + Ok(!is_unspent) + }, + Err(e) => { + log_error!( + self.logger, + "Failed to check if outpoint {} is spent: {}", + outpoint, + e + ); + Err(Error::TxSyncFailed) + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to check if outpoint {} is spent: {}", + outpoint, + e + ); + Err(Error::TxSyncFailed) + }, + }, + Err(e) => { + log_error!( + self.logger, + "Failed to check if outpoint {} is spent due to timeout: {}", + outpoint, + e + ); + Err(Error::TxSyncTimeout) + }, + } + } } impl Filter for ElectrumRuntimeClient { diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 245db72f6..4eb61e738 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -422,6 +422,13 @@ impl EsploraChainSource { } } } + + pub(crate) async fn get_transaction(&self, txid: &Txid) -> Result, Error> { + self.esplora_client.get_tx(txid).await.map_err(|e| { + log_error!(self.logger, "Failed to get transaction {}: {}", txid, e); + Error::TxSyncFailed + }) + } } impl Filter for EsploraChainSource { diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..d61621bd6 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -13,7 +13,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; -use bitcoin::{Script, Txid}; +use bitcoin::{Script, Transaction, Txid}; use lightning::chain::{BestBlock, Filter}; use crate::chain::bitcoind::{BitcoindChainSource, UtxoSourceClient}; @@ -468,6 +468,38 @@ impl ChainSource { } } } + + pub(crate) fn can_broadcast_transaction(&self, tx: &Transaction) -> Result { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + match &self.kind { + ChainSourceKind::Bitcoind(bitcoind_chain_source) => { + bitcoind_chain_source.can_broadcast_transaction(tx).await + }, + ChainSourceKind::Esplora{..} => { + // Esplora doesn't support testmempoolaccept equivalent. + unreachable!("Mempool accept testing is not supported with Esplora backend. Use BitcoindRpc for this functionality.") + }, + ChainSourceKind::Electrum{..} => { + // Electrum doesn't support testmempoolaccept equivalent. + unreachable!("Mempool accept testing is not supported with Electrum backend. Use BitcoindRpc for this functionality.") + }, + } + }) + }) + } + + pub(crate) fn get_transaction(&self, txid: &Txid) -> Result, Error> { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + match &self.kind { + ChainSourceKind::Bitcoind(bitcoind) => bitcoind.get_transaction(txid).await, + ChainSourceKind::Esplora(esplora) => esplora.get_transaction(txid).await, + ChainSourceKind::Electrum(electrum) => electrum.get_transaction(txid).await, + } + }) + }) + } } impl Filter for ChainSource { diff --git a/src/config.rs b/src/config.rs index 1dfa66176..9164332dd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,7 @@ use std::time::Duration; use bitcoin::secp256k1::PublicKey; use bitcoin::Network; +use bitreq::URL; use lightning::ln::msgs::SocketAddress; use lightning::routing::gossip::NodeAlias; use lightning::routing::router::RouteParametersConfig; @@ -107,6 +108,15 @@ pub(crate) const EXTERNAL_PATHFINDING_SCORES_SYNC_TIMEOUT_SECS: u64 = 5; // The timeout after which we abort a parsing/looking up an HRN resolution. pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; +// The time interval at which we resume persisted payjoin sessions. +pub(crate) const PAYJOIN_RESUME_INTERVAL: Duration = Duration::from_secs(15); + +// The duration after which completed or failed payjoin sessions are cleaned up (24 hours). +pub(crate) const PAYJOIN_SESSION_CLEANUP_AGE_SECS: u64 = 24 * 60 * 60; + +// The interval at which we check for old payjoin sessions to clean up (1 hour). +pub(crate) const PAYJOIN_SESSION_CLEANUP_INTERVAL: Duration = Duration::from_secs(60 * 60); + #[derive(Debug, Clone)] /// Represents the configuration of an [`Node`] instance. /// @@ -127,7 +137,8 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5; /// | `probing_liquidity_limit_multiplier` | 3 | /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | -/// | `route_parameters` | None | +/// | `route_parameters` | None | +/// | `payjoin_config` | None | /// /// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their /// respective default values. @@ -192,6 +203,8 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub route_parameters: Option, + /// Configuration options for PayJoin payments. + pub payjoin_config: Option, } impl Default for Config { @@ -206,6 +219,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), route_parameters: None, node_alias: None, + payjoin_config: None, } } } @@ -607,6 +621,15 @@ pub enum AsyncPaymentsRole { Server, } +/// Configuration options for PayJoin payments. +#[derive(Debug, Clone)] +pub struct PayjoinConfig { + /// The URL of the PayJoin directory to use for discovering PayJoin receivers. + pub payjoin_directory: URL, + /// The URL of the OHTTP relay to use for sending OHTTP requests to PayJoin receivers. + pub ohttp_relay: URL, +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/error.rs b/src/error.rs index ea0bcca3b..e8c232864 100644 --- a/src/error.rs +++ b/src/error.rs @@ -131,6 +131,8 @@ pub enum Error { AsyncPaymentServicesDisabled, /// Parsing a Human-Readable Name has failed. HrnParsingFailed, + /// A transaction broadcast operation failed. + TxBroadcastFailed, } impl fmt::Display for Error { @@ -213,6 +215,7 @@ impl fmt::Display for Error { Self::HrnParsingFailed => { write!(f, "Failed to parse a human-readable name.") }, + Self::TxBroadcastFailed => write!(f, "Failed to broadcast transaction."), } } } diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7..93249031f 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -82,3 +82,7 @@ pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices /// The pending payment information will be persisted under this prefix. pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; + +/// The payjoin sessions will be persisted under this key. +pub(crate) const PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE: &str = "payjoin_sessions"; +pub(crate) const PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index d2f70377b..7444cf436 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -46,6 +46,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; +use crate::payment::payjoin::payjoin_session::PayjoinSession; use crate::payment::PendingPaymentDetails; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; @@ -704,6 +705,83 @@ where Ok(res) } +/// Read previously persisted payjoin sessions information from the store. +pub(crate) async fn read_payjoin_sessions( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + let mut res = Vec::new(); + + let mut stored_keys = KVStore::list( + &*kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + ) + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let reader = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PayjoinSessions: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PayjoinSessions: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let payjoin_session = PayjoinSession::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize PayjoinSession: {}", e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize PayjoinSession", + ) + })?; + res.push(payjoin_session); + } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + + Ok(res) +} + #[cfg(test)] mod tests { use super::read_or_generate_seed_file; diff --git a/src/lib.rs b/src/lib.rs index 2b60307b0..b9c9515ad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -153,8 +153,8 @@ use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use payment::asynchronous::om_mailbox::OnionMessageMailbox; use payment::asynchronous::static_invoice_store::StaticInvoiceStore; use payment::{ - Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, - UnifiedPayment, + Bolt11Payment, Bolt12Payment, OnchainPayment, PayjoinPayment, PaymentDetails, + SpontaneousPayment, UnifiedPayment, }; use peer_store::{PeerInfo, PeerStore}; use rand::Rng; @@ -170,6 +170,8 @@ pub use { vss_client, }; +use crate::config::{PAYJOIN_RESUME_INTERVAL, PAYJOIN_SESSION_CLEANUP_INTERVAL}; +use crate::payment::payjoin::manager::PayjoinManager; use crate::scoring::setup_background_pathfinding_scores_sync; #[cfg(feature = "uniffi")] @@ -229,6 +231,7 @@ pub struct Node { hrn_resolver: Arc, #[cfg(cycle_tests)] _leak_checker: LeakChecker, + payjoin_manager: Arc, } impl Node { @@ -661,6 +664,50 @@ impl Node { }); } + // Periodically resume payjoin sessions. + let payjoin_manager = Arc::clone(&self.payjoin_manager); + let resume_logger = Arc::clone(&self.logger); + let mut stop_resume = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = tokio::time::interval(PAYJOIN_RESUME_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_resume.changed() => { + log_debug!(resume_logger, "Stopping payjoin session resume task."); + return; + } + _ = interval.tick() => { + if let Err(e) = payjoin_manager.resume_payjoin_sessions().await { + log_error!(resume_logger, "Failed to resume payjoin sessions: {:?}", e); + } + } + } + } + }); + + // Periodically clean up old completed/failed payjoin sessions. + let cleanup_payjoin_manager = Arc::clone(&self.payjoin_manager); + let cleanup_logger = Arc::clone(&self.logger); + let mut stop_cleanup = self.stop_sender.subscribe(); + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = tokio::time::interval(PAYJOIN_SESSION_CLEANUP_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_cleanup.changed() => { + log_debug!(cleanup_logger, "Stopping payjoin session cleanup task."); + return; + } + _ = interval.tick() => { + if let Err(e) = cleanup_payjoin_manager.cleanup_old_sessions() { + log_error!(cleanup_logger, "Failed to cleanup old payjoin sessions: {:?}", e); + } + } + } + } + }); + log_info!(self.logger, "Startup complete."); *is_running_lock = true; Ok(()) @@ -962,6 +1009,30 @@ impl Node { )) } + /// Returns a payment handler allowing to receive [Payjoin] payments. + /// + /// [Payjoin]: https://payjoin.org + #[cfg(not(feature = "uniffi"))] + pub fn payjoin_payment(&self) -> PayjoinPayment { + PayjoinPayment::new( + Arc::clone(&self.runtime), + Arc::clone(&self.payjoin_manager), + Arc::clone(&self.is_running), + ) + } + + /// Returns a payment handler allowing to receive [Payjoin] payments. + /// + /// [Payjoin]: https://payjoin.org + #[cfg(feature = "uniffi")] + pub fn payjoin_payment(&self) -> Arc { + Arc::new(PayjoinPayment::new( + Arc::clone(&self.runtime), + Arc::clone(&self.payjoin_manager), + Arc::clone(&self.is_running), + )) + } + /// Returns a payment handler that supports creating and paying to [BIP 21] URIs with on-chain, /// [BOLT 11], and [BOLT 12] payment options. /// diff --git a/src/payment/mod.rs b/src/payment/mod.rs index 42b5aff3b..7eafacb77 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod asynchronous; mod bolt11; mod bolt12; mod onchain; +pub(crate) mod payjoin; pub(crate) mod pending_payment_store; mod spontaneous; pub(crate) mod store; @@ -19,6 +20,7 @@ mod unified; pub use bolt11::Bolt11Payment; pub use bolt12::Bolt12Payment; pub use onchain::OnchainPayment; +pub use payjoin::PayjoinPayment; pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ diff --git a/src/payment/payjoin/manager.rs b/src/payment/payjoin/manager.rs new file mode 100644 index 000000000..949b00d92 --- /dev/null +++ b/src/payment/payjoin/manager.rs @@ -0,0 +1,658 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::consensus::encode::serialize_hex; +use bitcoin::{Amount, FeeRate, TxIn, Weight}; +use lightning::ln::channelmanager::PaymentId; +use payjoin::persist::{AsyncSessionPersister, OptionalTransitionOutcome}; +use payjoin::receive::InputPair; +use payjoin::ImplementationError; + +use crate::chain::ChainSource; +use crate::config::{Config, PAYJOIN_SESSION_CLEANUP_AGE_SECS}; +use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; +use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; +use crate::payment::payjoin::payjoin_session::{PayjoinDirection, PayjoinStatus}; +use crate::payment::{ + ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, + PendingPaymentDetails, +}; +use crate::types::{DynStore, PaymentStore, PendingPaymentStore}; +use crate::Error; +use crate::{ + payment::payjoin::persist::KVStorePayjoinReceiverPersister, types::PayjoinSessionStore, + wallet::Wallet, +}; +use payjoin::bitcoin::psbt::Input; +use payjoin::io::fetch_ohttp_keys; +use payjoin::receive::v2::{ + replay_event_log_async as replay_receiver_event_log_async, HasReplyableError, Initialized, + MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown, PayjoinProposal, + ProvisionalProposal, ReceiveSession, Receiver, ReceiverBuilder, UncheckedOriginalPayload, + WantsFeeRange, WantsInputs, WantsOutputs, +}; +use rand::RngCore; +use std::sync::Arc; + +#[derive(Clone)] +pub(crate) struct PayjoinManager { + payjoin_session_store: Arc, + kv_store: Arc, + logger: Arc, + config: Arc, + wallet: Arc, + fee_estimator: Arc, + chain_source: Arc, + stop_receiver: tokio::sync::watch::Receiver<()>, + payment_store: Arc, + pending_payment_store: Arc, +} + +// UPDATE ERROR TYPES!!!!!!!!! +// UPDATE ERROR TYPES!!!!!!!!! +// UPDATE ERROR TYPES!!!!!!!!! +// UPDATE ERROR TYPES!!!!!!!!! +// UPDATE ERROR TYPES!!!!!!!!! +// UPDATE ERROR TYPES!!!!!!!!! + +impl PayjoinManager { + pub(crate) fn new( + payjoin_session_store: Arc, kv_store: Arc, + logger: Arc, config: Arc, wallet: Arc, + fee_estimator: Arc, chain_source: Arc, + stop_receiver: tokio::sync::watch::Receiver<()>, payment_store: Arc, + pending_payment_store: Arc, + ) -> Self { + Self { + payjoin_session_store, + kv_store, + logger, + config, + wallet, + fee_estimator, + chain_source, + stop_receiver, + payment_store, + pending_payment_store, + } + } + + pub(crate) async fn receive_payjoin( + &self, amount: Amount, fee_rate: Option, + ) -> Result { + let payjoin_config = self.config.payjoin_config.as_ref().ok_or(Error::InvalidAddress)?; + + // Generate a new session ID + let mut random_bytes = [0u8; 32]; + rand::rng().fill_bytes(&mut random_bytes); + let session_id = PaymentId(random_bytes); + + // Create a new persister for this session + let persister = KVStorePayjoinReceiverPersister::new( + session_id, + Some(amount.to_sat() * 1000), + self.kv_store.clone(), + self.logger.clone(), + )?; + + let address = self.wallet.get_new_address()?; + let ohttp_keys = fetch_ohttp_keys( + payjoin_config.ohttp_relay.clone().as_str(), + payjoin_config.payjoin_directory.clone().as_str(), + ) + .await + .map_err(|e| { + log_error!(self.logger, "Failed to fetch OHTTP keys: {}", e); + Error::InvalidAddress + })?; + log_debug!(self.logger, "Fetched OHTTP keys: {:?}", ohttp_keys); + + let confirmation_target = ConfirmationTarget::OnchainPayment; + let fee_rate = + fee_rate.unwrap_or_else(|| self.fee_estimator.estimate_fee_rate(confirmation_target)); + + let session = ReceiverBuilder::new( + address, + payjoin_config.payjoin_directory.clone().as_str(), + ohttp_keys, + ) + .map_err(|e| { + log_error!(self.logger, "Failed to create receiver builder: {}", e); + Error::InvalidAddress + })? + .with_amount(amount) + .with_max_fee_rate(fee_rate) + .build() + .save_async(&persister) + .await?; + + log_info!(self.logger, "Receive session established"); + let pj_uri = session.pj_uri(); + log_info!(self.logger, "Request Payjoin by sharing this Payjoin Uri: {}", pj_uri); + + let pj_uri_string = pj_uri.to_string(); + let self_clone = self.clone(); + + Ok(pj_uri_string) + } + + async fn process_receiver_session( + &self, session: ReceiveSession, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let res = { + match session { + ReceiveSession::Initialized(proposal) => { + self.read_from_directory(proposal, persister).await + }, + ReceiveSession::UncheckedOriginalPayload(proposal) => { + self.check_proposal(proposal, persister).await + }, + ReceiveSession::MaybeInputsOwned(proposal) => { + self.check_inputs_not_owned(proposal, persister).await + }, + ReceiveSession::MaybeInputsSeen(proposal) => { + self.check_no_inputs_seen_before(proposal, persister).await + }, + ReceiveSession::OutputsUnknown(proposal) => { + self.identify_receiver_outputs(proposal, persister).await + }, + ReceiveSession::WantsOutputs(proposal) => { + self.commit_outputs(proposal, persister).await + }, + ReceiveSession::WantsInputs(proposal) => { + self.contribute_inputs(proposal, persister).await + }, + ReceiveSession::WantsFeeRange(proposal) => { + self.apply_fee_range(proposal, persister).await + }, + ReceiveSession::ProvisionalProposal(proposal) => { + self.finalize_proposal(proposal, persister).await + }, + ReceiveSession::PayjoinProposal(proposal) => { + self.send_payjoin_proposal(proposal, persister).await + }, + ReceiveSession::HasReplyableError(error) => { + self.handle_error(error, persister).await + }, + ReceiveSession::Monitor(proposal) => { + self.monitor_payjoin_proposal(proposal, persister).await + }, + ReceiveSession::Closed(_) => return Err(Error::InvalidAddress), + } + }; + res + } + + async fn read_from_directory( + &self, session: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let mut interrupt = self.stop_receiver.clone(); + let receiver = tokio::select! { + res = self.long_poll_fallback(session, &*persister) => res, + _ = interrupt.changed() => { + log_error!(self.logger, "Session interrupted by node shutdown. Will resume on restart."); + return Err(Error::InvalidAddress); + } + }?; + self.check_proposal(receiver, &*persister).await + } + + async fn long_poll_fallback( + &self, session: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result, Error> { + let payjoin_config = self.config.payjoin_config.as_ref().ok_or(Error::InvalidAddress)?; + let ohttp_relay = payjoin_config.ohttp_relay.clone(); + + let mut session = session; + loop { + let (req, context) = + session.create_poll_request(ohttp_relay.as_str()).map_err(|e| { + log_error!(self.logger, "Failed to create poll request: {}", e); + Error::InvalidAddress + })?; + log_debug!(self.logger, "Polling receive request..."); + let ohttp_response = self.post_request(req).await?; + let state_transition = session + .process_response(ohttp_response.as_bytes().to_vec().as_slice(), context) + .save_async(persister) + .await; + match state_transition { + Ok(OptionalTransitionOutcome::Progress(next_state)) => { + log_info!( + self.logger, + "Got a request from the sender. Responding with a Payjoin proposal." + ); + return Ok(next_state); + }, + Ok(OptionalTransitionOutcome::Stasis(current_state)) => { + session = current_state; + continue; + }, + Err(_) => return Err(Error::PersistenceFailed), + } + } + } + + async fn post_request(&self, req: payjoin::Request) -> Result { + bitreq::post(req.url) + .with_header("Content-Type", req.content_type) + .with_body(req.body) + .send_async() + .await + .map_err(|e| { + log_error!(self.logger, "HTTP request failed: {}", e); + Error::InvalidAddress + }) + } + + async fn check_proposal( + &self, proposal: Receiver, + persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + .check_broadcast_suitability(None, |tx| { + self.chain_source + .can_broadcast_transaction(tx) + .map_err(|e| ImplementationError::from(e.to_string().as_str())) + }) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + + // TODO: Store the fallback transaction (proposal.extract_tx_to_schedule_broadcast()) in PayjoinSession. + // If the payjoin fails or times out, broadcast this fallback tx to ensure the receiver still gets paid. + // Consider adding a `fallback_tx: Option>` field to PayjoinSession for persistence. + log_info!(self.logger, "Fallback transaction received. Consider broadcasting this to get paid if the Payjoin fails: {}", serialize_hex(&proposal.extract_tx_to_schedule_broadcast())); + self.check_inputs_not_owned(proposal, persister).await + } + + async fn check_inputs_not_owned( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + .check_inputs_not_owned(&mut |input| { + self.wallet + .is_mine(input.to_owned()) + .map_err(|e| ImplementationError::from(e.to_string().as_str())) + }) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + + self.check_no_inputs_seen_before(proposal, persister).await + } + + async fn check_no_inputs_seen_before( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + // TODO: DETERMINE IF SAVING THE INPUT AT THIS POINT IS NECESSARY FOR NOW WE JUST RETURN FALSE + // BUT I THINK IT WOULD BE BETTER TO SAVE IT SO THAT IF THE SESSION IS RESUMED WE CAN CHECK AGAIN + .check_no_inputs_seen_before(&mut |_| Ok(false)) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + self.identify_receiver_outputs(proposal, persister).await + } + + async fn identify_receiver_outputs( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + .identify_receiver_outputs(&mut |output_script| { + self.wallet + .is_mine(output_script.to_owned()) + .map_err(|e| ImplementationError::from(e.to_string().as_str())) + }) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + self.commit_outputs(proposal, persister).await + } + + async fn commit_outputs( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal.commit_outputs().save_async(persister).await?; + self.contribute_inputs(proposal, persister).await + } + + async fn contribute_inputs( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let candidate_inputs = self.list_input_pairs()?; + + if candidate_inputs.is_empty() { + return Err({ + log_error!( + self.logger, + "No spendable UTXOs available in wallet. Cannot contribute inputs to payjoin." + ); + Error::InvalidAddress + }); + } + + let selected_input = + proposal.try_preserving_privacy(candidate_inputs).map_err(|_| Error::InvalidAddress)?; + let proposal = proposal + .contribute_inputs(vec![selected_input]) + .map_err(|_| Error::InvalidAddress)? + .commit_inputs() + .save_async(persister) + .await?; + self.apply_fee_range(proposal, persister).await + } + + fn list_input_pairs(&self) -> Result, Error> { + let unspent = self.wallet.list_unspent_utxos()?; + + let mut input_pairs = Vec::with_capacity(unspent.len()); + + for u in unspent { + let txin = TxIn { previous_output: u.outpoint, ..Default::default() }; + let psbtin = Input { witness_utxo: Some(u.output.clone()), ..Default::default() }; + let satisfaction_weight = Weight::from_wu(u.satisfaction_weight); + + let input_pair = + InputPair::new(txin, psbtin, Some(satisfaction_weight)).map_err(|e| { + log_error!(self.logger, "Failed to create InputPair: {}", e); + Error::InvalidAddress + })?; + + input_pairs.push(input_pair); + } + + Ok(input_pairs) + } + + async fn apply_fee_range( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + .apply_fee_range(None, None) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + + self.finalize_proposal(proposal, persister).await + } + + async fn finalize_proposal( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let proposal = proposal + .finalize_proposal(|psbt| { + self.wallet + .process_psbt(psbt.clone()) + .map_err(|e| ImplementationError::from(e.to_string().as_str())) + }) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + self.send_payjoin_proposal(proposal, persister).await + } + + async fn send_payjoin_proposal( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let payjoin_config = self.config.payjoin_config.as_ref().ok_or(Error::InvalidAddress)?; + let ohttp_relay = payjoin_config.ohttp_relay.clone(); + let (req, ohttp_ctx) = proposal.create_post_request(ohttp_relay.as_str()).map_err(|e| { + log_error!(self.logger, "v2 req extraction failed {}", e); + Error::InvalidAddress + })?; + let res = self.post_request(req).await?; + let payjoin_psbt = proposal.psbt().clone(); + let session = proposal + .process_response(&res.as_bytes(), ohttp_ctx) + .save_async(persister) + .await + .map_err(|_| Error::PersistenceFailed)?; + + // at this point we will persist to the payment and pending payment stores + let txid = payjoin_psbt.clone().extract_tx_unchecked_fee_rate().compute_txid(); + let fee = payjoin_psbt.clone().fee().unwrap_or(Amount::ZERO); + let fee_sat = fee.to_sat(); + let kind = PaymentKind::Payjoin { txid, status: ConfirmationStatus::Unconfirmed }; + + let amount_msat = match persister.get_session() { + Some(payjoin_session) => payjoin_session.amount_msat, + None => None, + }; + let payment = PaymentDetails::new( + persister.session_id(), + kind, + amount_msat, + Some(fee_sat * 1000), + PaymentDirection::Inbound, + PaymentStatus::Pending, + ); + + self.payment_store.insert_or_update(payment.clone())?; + + //persit to pending payment store??? + let pending_payment = PendingPaymentDetails::new(payment, Vec::new()); + self.pending_payment_store.insert_or_update(pending_payment)?; + + log_info!( + self.logger, + "Response successful. Watch mempool for successful Payjoin. TXID: {}", + payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid() + ); + + return self.monitor_payjoin_proposal(session, persister).await; + } + + async fn handle_error( + &self, session: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + let payjoin_config = self.config.payjoin_config.as_ref().ok_or(Error::InvalidAddress)?; + let ohttp_relay = payjoin_config.ohttp_relay.clone(); + + let (err_req, err_ctx) = session + .create_error_request(ohttp_relay.as_str()) + .map_err(|_| Error::InvalidAddress)?; + + let err_response = match self.post_request(err_req).await { + Ok(response) => response, + Err(_) => return Err(Error::InvalidAddress), + }; + + let err_bytes = err_response.as_bytes(); + + if let Err(_) = + session.process_error_response(&err_bytes, err_ctx).save_async(persister).await + { + return Err(Error::InvalidAddress); + } + + Ok(()) + } + + // is this really neccessary? can we use the events returned from bdk wallet during + // chain sync to know if the transaction is a payjoin tx? + async fn monitor_payjoin_proposal( + &self, proposal: Receiver, persister: &KVStorePayjoinReceiverPersister, + ) -> Result<(), Error> { + // On a session resumption, the receiver will resume again in this state. + let poll_interval = tokio::time::Duration::from_millis(200); + // TODO: This timeout is too short for real-world mempool propagation (can take 10-30+ seconds). + // Make this configurable or increase to a more reasonable default (e.g., 30-60 seconds). + let timeout_duration = tokio::time::Duration::from_secs(5); + + let mut interval = tokio::time::interval(poll_interval); + interval.tick().await; + + log_debug!(self.logger, "Polling for payment confirmation"); + + let result = tokio::time::timeout(timeout_duration, async { + loop { + interval.tick().await; + let check_result = proposal + .check_payment(|txid| { + self.chain_source + .get_transaction(&txid) + .map_err(|e| ImplementationError::from(e.to_string().as_str())) + }) + .save_async(persister) + .await; + + match check_result { + Ok(_) => { + log_info!(self.logger, "Payjoin transaction detected in the mempool!"); + return Ok(()); + }, + Err(_) => { + // keep polling + continue; + }, + } + } + }) + .await; + + match result { + Ok(ok) => ok, + Err(_) => Err({ + log_error!( + self.logger, + "Timeout waiting for payment confirmation after {:?}", + timeout_duration + ); + Error::InvalidAddress + }), + } + } + + pub(crate) async fn resume_payjoin_sessions(&self) -> Result<(), Error> { + let recv_session_ids = self + .payjoin_session_store + .list_filter(|p| { + p.direction == PayjoinDirection::Receive && p.status == PayjoinStatus::Active + }) + .iter() + .map(|s| s.session_id.clone()) + .collect::>(); + + if recv_session_ids.is_empty() { + log_info!(self.logger, "No sessions to resume."); + return Ok(()); + } + + let mut tasks = Vec::new(); + + // Process receiver sessions + for session_id in recv_session_ids { + let self_clone = self.clone(); + // Create a persister for this session + let recv_persister = match KVStorePayjoinReceiverPersister::from_session( + session_id.clone(), + self.kv_store.clone(), + self.logger.clone(), + ) { + Ok(p) => p, + Err(e) => { + log_error!( + self.logger, + "Failed to create persister for session {:?}: {:?}", + session_id, + e + ); + continue; + }, + }; + + match replay_receiver_event_log_async(&recv_persister).await { + Ok((receiver_state, _)) => { + tasks.push(tokio::spawn(async move { + self_clone.process_receiver_session(receiver_state, &recv_persister).await + })); + }, + Err(e) => { + log_error!( + self.logger, + "An error {:?} occurred while replaying receiver session", + e + ); + self.close_failed_session(&recv_persister, &session_id, "receiver").await; + }, + } + } + + let mut interrupt = self.stop_receiver.clone(); + tokio::select! { + _ = async { + for task in tasks { + let _ = task.await; + } + } => { + println!("All payjoin resumed sessions completed."); + } + _ = interrupt.changed() => { + println!("Resumed payjoin sessions were interrupted."); + } + } + Ok(()) + } + + async fn close_failed_session

(&self, persister: &P, session_id: &PaymentId, role: &str) + where + P: AsyncSessionPersister, + { + if let Err(close_err) = AsyncSessionPersister::close(persister).await { + log_error!( + self.logger, + "Failed to close {} session {}: {:?}", + role, + session_id, + close_err + ); + } else { + log_error!(self.logger, "Closed failed {} session: {}", role, session_id); + } + } + + /// Cleans up old payjoin sessions that are completed or failed. + /// Sessions older than `PAYJOIN_SESSION_CLEANUP_AGE_SECS` will be removed. + pub(crate) fn cleanup_old_sessions(&self) -> Result<(), Error> { + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or(std::time::Duration::from_secs(0)) + .as_secs(); + + let sessions_to_remove: Vec = self + .payjoin_session_store + .list_filter(|s| { + let is_terminal = + s.status == PayjoinStatus::Completed || s.status == PayjoinStatus::Failed; + let age = now.saturating_sub(s.latest_update_timestamp); + is_terminal && age > PAYJOIN_SESSION_CLEANUP_AGE_SECS + }) + .iter() + .map(|s| s.session_id) + .collect(); + + if sessions_to_remove.is_empty() { + return Ok(()); + } + + log_info!(self.logger, "Cleaning up {} old payjoin sessions", sessions_to_remove.len()); + + for session_id in sessions_to_remove { + if let Err(e) = self.payjoin_session_store.remove(&session_id) { + log_error!( + self.logger, + "Failed to remove old payjoin session {:?}: {:?}", + session_id, + e + ); + } + } + + Ok(()) + } +} diff --git a/src/payment/payjoin/mod.rs b/src/payment/payjoin/mod.rs new file mode 100644 index 000000000..6a5afb2be --- /dev/null +++ b/src/payment/payjoin/mod.rs @@ -0,0 +1,49 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Holds a payment handler for sending and receiving Payjoin payments. + +pub(crate) mod manager; +pub(crate) mod payjoin_session; +pub(crate) mod persist; + +use std::sync::{Arc, RwLock}; + +use crate::error::Error; +use crate::runtime::Runtime; +use crate::types::PayjoinManager; + +/// A payment handler allowing to receive [Payjoin] payments. +/// +/// Should be retrieved by calling [`Node::payjoin_payment`]. +/// +/// [Payjoin]: https://payjoin.org +/// [`Node::payjoin_payment`]: crate::Node::payjoin_payment +pub struct PayjoinPayment { + runtime: Arc, + manager: Arc, + is_running: Arc>, +} + +impl PayjoinPayment { + pub(crate) fn new( + runtime: Arc, manager: Arc, is_running: Arc>, + ) -> Self { + Self { runtime, manager, is_running } + } + + /// Returns a Payjoin URI that can be shared with a sender to receive a Payjoin payment. + /// + /// The returned string is a BIP 21 URI with Payjoin parameters that the sender can use + /// to initiate the Payjoin flow. + pub fn receive(&self, amount: bitcoin::Amount) -> Result { + if !*self.is_running.read().unwrap() { + return Err(Error::NotRunning); + } + self.runtime.block_on(self.manager.receive_payjoin(amount, None)) + } +} diff --git a/src/payment/payjoin/payjoin_session.rs b/src/payment/payjoin/payjoin_session.rs new file mode 100644 index 000000000..5f0acc1d6 --- /dev/null +++ b/src/payment/payjoin/payjoin_session.rs @@ -0,0 +1,245 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use lightning::ln::channelmanager::PaymentId; +use lightning::ln::msgs::DecodeError; +use lightning::util::ser::{Readable, Writeable}; +use lightning::{ + _init_and_read_len_prefixed_tlv_fields, impl_writeable_tlv_based, + impl_writeable_tlv_based_enum, write_tlv_fields, +}; + +use crate::data_store::{StorableObject, StorableObjectUpdate}; + +/// Represents a payjoin session with persisted events +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PayjoinSession { + /// Session identifier (uses PaymentId from PaymentDetails) + pub session_id: PaymentId, + + /// Direction of the payjoin (Send or Receive) + pub direction: PayjoinDirection, + + /// HPKE public key of receiver (only for sender sessions) + pub receiver_pubkey: Option>, + + /// The amount transferred. + pub amount_msat: Option, + + /// Serialized session events as JSON strings + pub events: Vec, + + /// Current status of the session + pub status: PayjoinStatus, + + /// Unix timestamp of session completion (if completed) + pub completed_at: Option, + + /// The timestamp, in seconds since start of the UNIX epoch, when this entry was last updated. + pub latest_update_timestamp: u64, +} + +impl PayjoinSession { + pub fn new( + session_id: PaymentId, direction: PayjoinDirection, receiver_pubkey: Option>, + amount_msat: Option, + ) -> Self { + let latest_update_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + Self { + session_id, + direction, + receiver_pubkey, + amount_msat, + events: Vec::new(), + status: PayjoinStatus::Active, + completed_at: None, + latest_update_timestamp, + } + } +} + +impl Writeable for PayjoinSession { + fn write( + &self, writer: &mut W, + ) -> Result<(), lightning::io::Error> { + write_tlv_fields!(writer, { + (0, self.session_id, required), + (2, self.direction, required), + (4, self.receiver_pubkey, option), + (6, self.amount_msat, option), + (8, self.events, required_vec), + (10, self.status, required), + (12, self.completed_at, required), + (14, self.latest_update_timestamp, required), + }); + Ok(()) + } +} + +impl Readable for PayjoinSession { + fn read(reader: &mut R) -> Result { + let unix_time_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + _init_and_read_len_prefixed_tlv_fields!(reader, { + (0, session_id, required), + (2, direction, required), + (4, receiver_pubkey, option), + (6, amount_msat, option), + (8, events, required_vec), + (10, status, required), + (12, completed_at, option), + (14, latest_update_timestamp, (default_value, unix_time_secs)) + }); + + let session_id: PaymentId = session_id.0.ok_or(DecodeError::InvalidValue)?; + let direction: PayjoinDirection = direction.0.ok_or(DecodeError::InvalidValue)?; + let status: PayjoinStatus = status.0.ok_or(DecodeError::InvalidValue)?; + let latest_update_timestamp: u64 = + latest_update_timestamp.0.ok_or(DecodeError::InvalidValue)?; + + Ok(PayjoinSession { + session_id, + direction, + receiver_pubkey, + amount_msat, + events, + status, + completed_at, + latest_update_timestamp, + }) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PayjoinDirection { + /// The session is for sending a payment + Send, + /// The session is for receiving a payment + Receive, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PayjoinStatus { + /// The session is active + Active, + /// The session has completed successfully + Completed, + /// The session has failed + Failed, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SerializedSessionEvent { + /// JSON representation of the event + pub event_json: String, + /// Unix timestamp of when the event occurred + pub created_at: u64, +} + +impl_writeable_tlv_based!(SerializedSessionEvent, { + (0, event_json, required), + (2, created_at, required), +}); + +impl_writeable_tlv_based_enum!(PayjoinDirection, + (0, Send) => {}, + (2, Receive) => {} +); + +impl_writeable_tlv_based_enum!(PayjoinStatus, + (0, Active) => {}, + (2, Completed) => {}, + (4, Failed) => {} +); + +/// Represents a payjoin session with persisted events +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PayjoinSessionUpdate { + pub session_id: PaymentId, + pub receiver_pubkey: Option>>, + pub events: Option>, + pub status: Option, + pub completed_at: Option>, +} + +impl From<&PayjoinSession> for PayjoinSessionUpdate { + fn from(value: &PayjoinSession) -> Self { + Self { + session_id: value.session_id, + receiver_pubkey: Some(value.receiver_pubkey.clone()), + events: Some(value.events.clone()), + status: Some(value.status), + completed_at: Some(value.completed_at), + } + } +} + +impl StorableObject for PayjoinSession { + type Id = PaymentId; + type Update = PayjoinSessionUpdate; + + fn id(&self) -> Self::Id { + self.session_id + } + + fn update(&mut self, update: Self::Update) -> bool { + debug_assert_eq!( + self.session_id, update.session_id, + "We should only ever override data for the same id" + ); + + let mut updated = false; + + macro_rules! update_if_necessary { + ($val:expr, $update:expr) => { + if $val != $update { + $val = $update; + updated = true; + } + }; + } + + if let Some(receiver_pubkey_opt) = &update.receiver_pubkey { + update_if_necessary!(self.receiver_pubkey, receiver_pubkey_opt.clone()); + } + if let Some(events_opt) = &update.events { + update_if_necessary!(self.events, events_opt.clone()); + } + if let Some(status_opt) = update.status { + update_if_necessary!(self.status, status_opt); + } + if let Some(completed_at_opt) = update.completed_at { + update_if_necessary!(self.completed_at, completed_at_opt); + } + + if updated { + self.latest_update_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + } + + updated + } + + fn to_update(&self) -> Self::Update { + self.into() + } +} + +impl StorableObjectUpdate for PayjoinSessionUpdate { + fn id(&self) -> ::Id { + self.session_id + } +} diff --git a/src/payment/payjoin/persist.rs b/src/payment/payjoin/persist.rs new file mode 100644 index 000000000..9acd5058a --- /dev/null +++ b/src/payment/payjoin/persist.rs @@ -0,0 +1,231 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::payment::payjoin::payjoin_session::{PayjoinStatus, SerializedSessionEvent}; +use crate::Error; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use lightning::util::ser::Readable; +use lightning::{io::Cursor, ln::channelmanager::PaymentId, util::persist::KVStoreSync}; + +use crate::io::{ + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, +}; +use crate::logger::{log_error, LdkLogger, Logger}; +use crate::payment::payjoin::payjoin_session::{PayjoinDirection, PayjoinSession}; +use crate::types::{DynStore, PayjoinSessionStore}; + +use payjoin::persist::AsyncSessionPersister; +use payjoin::receive::v2::SessionEvent as ReceiverSessionEvent; + +pub(crate) struct KVStorePayjoinReceiverPersister { + session_id: PaymentId, + kv_store: Arc, +} + +impl KVStorePayjoinReceiverPersister { + pub fn new( + session_id: PaymentId, amount_msat: Option, kv_store: Arc, + logger: Arc, + ) -> Result { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + let data_store = Arc::new(PayjoinSessionStore::new( + sessions, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(), + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(), + kv_store, + logger, + )); + + let session = PayjoinSession::new(session_id, PayjoinDirection::Receive, None, amount_msat); + + data_store.insert(session)?; + + Ok(Self { session_id, kv_store: data_store }) + } + + pub(super) fn session_id(&self) -> PaymentId { + self.session_id + } + + pub(super) fn get_session(&self) -> Option { + self.kv_store.get(&self.session_id) + } + + /// Reconstruct persister from existing session + pub fn from_session( + session_id: PaymentId, kv_store: Arc, logger: Arc, + ) -> Result { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + let data_store = Arc::new(PayjoinSessionStore::new( + sessions, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE.to_string(), + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE.to_string(), + kv_store, + logger, + )); + + if data_store.get(&session_id).is_none() { + return Err(Error::InvalidPaymentId); + } + + Ok(Self { session_id, kv_store: data_store }) + } + + /// Load all sessions from KV store + fn load_all_sessions( + kv_store: &Arc, logger: &Arc, + ) -> Result, Error> { + let keys = KVStoreSync::list( + &**kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + ) + .map_err(|e| { + log_error!(logger, "Failed to list payjoin sessions: {:?}", e); + Error::PersistenceFailed + })?; + + let mut sessions = Vec::new(); + for key in keys { + match KVStoreSync::read( + &**kv_store, + PAYJOIN_SESSION_STORE_PRIMARY_NAMESPACE, + PAYJOIN_SESSION_STORE_SECONDARY_NAMESPACE, + &key, + ) { + Ok(data) => { + let mut reader = Cursor::new(&data[..]); + match PayjoinSession::read(&mut reader) { + Ok(session) => sessions.push(session), + Err(e) => { + log_error!( + logger, + "Failed to deserialize PayjoinSession for key {}: {:?}. Skipping corrupted session.", + key, e + ); + continue; + }, + } + }, + Err(e) => { + log_error!( + logger, + "Failed to read PayjoinSession data for key {}: {:?}", + key, + e + ); + continue; + }, + } + } + + Ok(sessions) + } + + /// Get all active Receiver session IDs + pub fn get_active_session_ids( + kv_store: Arc, logger: Arc, + ) -> Result, Error> { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + Ok(sessions + .into_iter() + .filter(|s| { + s.direction == PayjoinDirection::Receive && s.status == PayjoinStatus::Active + }) + .map(|s| s.session_id) + .collect()) + } + + /// Get all inactive Receiver sessions (for cleanup) + pub fn get_inactive_sessions( + kv_store: Arc, logger: Arc, + ) -> Result, Error> { + let sessions = Self::load_all_sessions(&kv_store, &logger)?; + Ok(sessions + .into_iter() + .filter(|s| { + s.direction == PayjoinDirection::Receive + && s.status != PayjoinStatus::Active + && s.completed_at.is_some() + }) + .map(|s| (s.session_id, s.completed_at.unwrap())) + .collect()) + } +} + +impl AsyncSessionPersister for KVStorePayjoinReceiverPersister { + type SessionEvent = ReceiverSessionEvent; + type InternalStorageError = Error; + + fn save_event( + &self, event: Self::SessionEvent, + ) -> impl std::future::Future> + Send { + async move { + let mut session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + let event_json = serde_json::to_string(&event).map_err(|_| Error::PersistenceFailed)?; + + session.events.push(SerializedSessionEvent { + event_json, + created_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + }); + + self.kv_store.insert_or_update(session)?; + + Ok(()) + } + } + + fn load( + &self, + ) -> impl std::future::Future< + Output = Result< + Box + Send>, + Self::InternalStorageError, + >, + > + Send { + async move { + let session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + let events: Vec = session + .events + .iter() + .map(|e| serde_json::from_str(&e.event_json)) + .collect::, _>>() + .map_err(|_| Error::PersistenceFailed)?; + + Ok(Box::new(events.into_iter()) as Box + Send>) + } + } + + fn close( + &self, + ) -> impl std::future::Future> + Send { + async move { + let mut session = self.kv_store.get(&self.session_id).ok_or(Error::InvalidPaymentId)?; + + session.completed_at = Some(now()); + session.status = PayjoinStatus::Completed; + + self.kv_store.insert_or_update(session)?; + + Ok(()) + } + } +} + +// Helper function for timestamp +fn now() -> u64 { + SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or(Duration::from_secs(0)).as_secs() +} diff --git a/src/payment/store.rs b/src/payment/store.rs index 58b410894..5abcdf49b 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -284,7 +284,8 @@ impl StorableObject for PaymentDetails { if let Some(confirmation_status) = update.confirmation_status { match self.kind { - PaymentKind::Onchain { ref mut status, .. } => { + PaymentKind::Onchain { ref mut status, .. } + | PaymentKind::Payjoin { ref mut status, .. } => { update_if_necessary!(*status, confirmation_status); }, _ => {}, @@ -443,6 +444,18 @@ pub enum PaymentKind { /// The pre-image used by the payment. preimage: Option, }, + /// An on-chain Payjoin payment. + /// + /// Payments of this kind will be considered pending until the respective transaction has + /// reached [`ANTI_REORG_DELAY`] confirmations on-chain. + /// + /// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY + Payjoin { + /// The transaction identifier of this payment. + txid: Txid, + /// The confirmation status of this payment. + status: ConfirmationStatus, + }, } impl_writeable_tlv_based_enum!(PaymentKind, @@ -480,7 +493,11 @@ impl_writeable_tlv_based_enum!(PaymentKind, (2, preimage, option), (3, quantity, option), (4, secret, option), - } + }, + (11, Payjoin)=>{ + (0, txid, required), + (2, status, required), + }, ); /// Represents the confirmation status of a transaction. @@ -571,7 +588,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { }; let confirmation_status = match value.kind { - PaymentKind::Onchain { status, .. } => Some(status), + PaymentKind::Onchain { status, .. } | PaymentKind::Payjoin { status, .. } => { + Some(status) + }, _ => None, }; diff --git a/src/types.rs b/src/types.rs index b5b1ffed7..684be1cb0 100644 --- a/src/types.rs +++ b/src/types.rs @@ -39,6 +39,7 @@ use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; +use crate::payment::payjoin::payjoin_session::PayjoinSession; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; @@ -321,6 +322,8 @@ pub(crate) type BumpTransactionEventHandler = pub(crate) type PaymentStore = DataStore>; +pub(crate) type PayjoinSessionStore = DataStore>; + /// A local, potentially user-provided, identifier of a channel. /// /// By default, this will be randomly generated for the user to ensure local uniqueness. @@ -623,3 +626,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { } pub(crate) type PendingPaymentStore = DataStore>; + +pub(crate) type PayjoinManager = crate::PayjoinManager; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 87b544566..2786aa909 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -1002,6 +1002,99 @@ impl Wallet { None } + + /// Check if a script belongs to this wallet + pub fn is_mine(&self, script: ScriptBuf) -> Result { + let locked_wallet = self.inner.lock().unwrap(); + Ok(locked_wallet.is_mine(script)) + } + + #[allow(deprecated)] + pub fn process_psbt(&self, mut psbt: Psbt) -> Result { + let locked_wallet = self.inner.lock().unwrap(); + + let mut sign_options = SignOptions::default(); + sign_options.trust_witness_utxo = true; + + locked_wallet.sign(&mut psbt, sign_options).map_err(|e| { + log_error!(self.logger, "Failed to sign PSBT: {}", e); + Error::WalletOperationFailed + })?; + + // Return the signed PSBT (not extracted transaction) + Ok(psbt) + } + + pub fn list_unspent_utxos(&self) -> Result, Error> { + let locked_wallet = self.inner.lock().unwrap(); + + let mut utxos = Vec::new(); + + for u in locked_wallet.list_unspent() { + let script_pubkey = &u.txout.script_pubkey; + + match script_pubkey.witness_version() { + Some(version @ WitnessVersion::V0) => { + // P2WPKH handling + let witness_bytes = &script_pubkey.as_bytes()[2..]; + let witness_program = + WitnessProgram::new(version, witness_bytes).map_err(|e| { + log_error!(self.logger, "Failed to retrieve script payload: {}", e); + Error::InvalidAddress + })?; + + let wpkh = WPubkeyHash::from_slice(&witness_program.program().as_bytes()) + .map_err(|e| { + log_error!(self.logger, "Failed to retrieve script payload: {}", e); + Error::InvalidAddress + })?; + + let utxo = Utxo::new_v0_p2wpkh(u.outpoint, u.txout.value, &wpkh); + utxos.push(utxo); + }, + Some(version @ WitnessVersion::V1) => { + // P2TR (Taproot) handling + let witness_bytes = &script_pubkey.as_bytes()[2..]; + let witness_program = + WitnessProgram::new(version, witness_bytes).map_err(|e| { + log_error!(self.logger, "Failed to retrieve script payload: {}", e); + Error::InvalidAddress + })?; + + XOnlyPublicKey::from_slice(&witness_program.program().as_bytes()).map_err( + |e| { + log_error!(self.logger, "Failed to retrieve script payload: {}", e); + Error::InvalidAddress + }, + )?; + + let utxo = Utxo { + outpoint: u.outpoint, + output: TxOut { + value: u.txout.value, + script_pubkey: ScriptBuf::new_witness_program(&witness_program), + }, + satisfaction_weight: 1 /* empty script_sig */ * WITNESS_SCALE_FACTOR as u64 + + 1 /* witness items */ + 1 /* schnorr sig len */ + 64, // schnorr sig + }; + utxos.push(utxo); + }, + Some(version) => { + log_error!(self.logger, "Unexpected witness version: {}", version); + continue; + }, + None => { + log_error!( + self.logger, + "Tried to use a non-witness script. This must never happen." + ); + panic!("Tried to use a non-witness script. This must never happen."); + }, + } + } + + Ok(utxos) + } } impl Listen for Wallet {