Add lightning-liquidity crate to the workspace #3436

Open · wants to merge 15 commits into main

1 change: 1 addition & 0 deletions Cargo.toml
@@ -16,6 +16,7 @@ members = [
"lightning-transaction-sync",
"lightning-macros",
"lightning-dns-resolver",
"lightning-liquidity",
"possiblyrandom",
]

6 changes: 5 additions & 1 deletion ci/ci-tests.sh
@@ -35,6 +35,9 @@ PIN_RELEASE_DEPS # pin the release dependencies in our main workspace
# Starting with version 0.5.9 (there is no .6-.8), the `home` crate has an MSRV of rustc 1.70.0.
[ "$RUSTC_MINOR_VERSION" -lt 70 ] && cargo update -p home --precise "0.5.5" --verbose

# proptest 1.3.0 requires rustc 1.64.0
[ "$RUSTC_MINOR_VERSION" -lt 64 ] && cargo update -p proptest --precise "1.2.0" --verbose

export RUST_BACKTRACE=1

echo -e "\n\nChecking the full workspace."
@@ -55,6 +58,7 @@ WORKSPACE_MEMBERS=(
lightning-transaction-sync
lightning-macros
lightning-dns-resolver
lightning-liquidity
possiblyrandom
)

@@ -107,7 +111,7 @@ echo -e "\n\nTest backtrace-debug builds"
cargo test -p lightning --verbose --color always --features backtrace

echo -e "\n\nTesting no_std builds"
for DIR in lightning-invoice lightning-rapid-gossip-sync; do
for DIR in lightning-invoice lightning-rapid-gossip-sync lightning-liquidity; do
cargo test -p $DIR --verbose --color always --no-default-features
done

50 changes: 50 additions & 0 deletions lightning-liquidity/Cargo.toml
@@ -0,0 +1,50 @@
[package]
name = "lightning-liquidity"
version = "0.1.0-alpha.6"
authors = ["John Cantrell <[email protected]>", "Elias Rohrer <[email protected]>"]
homepage = "https://lightningdevkit.org/"
license = "MIT OR Apache-2.0"
edition = "2021"
description = "Types and primitives to integrate a spec-compliant LSP with an LDK-based node."
repository = "https://github.com/lightningdevkit/lightning-liquidity/"
readme = "README.md"
keywords = ["bitcoin", "lightning", "ldk", "bdk"]
categories = ["cryptography::cryptocurrencies"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["std"]
std = ["lightning/std"]
backtrace = ["dep:backtrace"]

[dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false }
lightning-types = { version = "0.1", path = "../lightning-types", default-features = false }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde"] }

bitcoin = { version = "0.32.2", default-features = false, features = ["serde"] }

chrono = { version = "0.4", default-features = false, features = ["serde", "alloc"] }
serde = { version = "1.0", default-features = false, features = ["derive", "alloc"] }
serde_json = "1.0"
backtrace = { version = "0.3", optional = true }

[dev-dependencies]
lightning = { version = "0.0.124", path = "../lightning", default-features = false, features = ["_test_utils"] }
lightning-invoice = { version = "0.32.0", path = "../lightning-invoice", default-features = false, features = ["serde", "std"] }
lightning-persister = { version = "0.0.124", path = "../lightning-persister", default-features = false }
lightning-background-processor = { version = "0.0.124", path = "../lightning-background-processor", default-features = false, features = ["std"] }

proptest = "1.0.0"
tokio = { version = "1.35", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }

[lints.rust.unexpected_cfgs]
level = "forbid"
# When adding a new cfg attribute, ensure that it is added to this list.
check-cfg = [
"cfg(lsps1_service)",
"cfg(c_bindings)",
"cfg(backtrace)",
"cfg(ldk_bench)",
]
20 changes: 20 additions & 0 deletions lightning-liquidity/README.md
@@ -0,0 +1,20 @@
# lightning-liquidity

The goal of this crate is to provide types and primitives to integrate a spec-compliant LSP with an LDK-based node. To this end, this crate provides client-side as well as service-side logic to implement the [LSP specifications].

Currently the following specifications are supported:
- [LSPS0] defines the transport protocol with the LSP over which the other protocols communicate.
- [LSPS1] allows the client to order Lightning channels from an LSP. This is useful when the client
  needs inbound Lightning liquidity for which they are willing and able to pay in bitcoin.
- [LSPS2] allows the client to generate a special invoice for which, when paid, an LSP will open a
  "just-in-time" channel. This is useful for the initial on-boarding of clients, as the channel
  opening fees are deducted from the incoming payment, i.e., no funds are required client-side to
  initiate this flow; see the small worked example below.
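
As a small worked example of that fee deduction (the amounts below are made up for illustration and are not taken from any LSP's actual fee schedule):

```rust
fn main() {
	// Hypothetical LSPS2 flow: the LSP's channel opening fee is deducted from the
	// first payment forwarded over the new just-in-time channel.
	let incoming_payment_msat: u64 = 100_000_000; // 100_000 sats sent by the payer
	let opening_fee_msat: u64 = 10_000_000; // 10_000 sats charged by the LSP (made up)
	let received_msat = incoming_payment_msat - opening_fee_msat;
	assert_eq!(received_msat, 90_000_000); // the client keeps the rest, with no upfront cost
}
```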

To get started, you'll want to set up a `LiquidityManager` and configure it to be the `CustomMessageHandler` of your LDK node. You can then call `LiquidityManager::lsps1_client_handler` / `LiquidityManager::lsps2_client_handler`, or `LiquidityManager::lsps2_service_handler`, to access the respective client-side or service-side handlers.

`LiquidityManager` uses an eventing system to notify the user about important updates to the protocol flow. To this end, you will need to handle events emitted via one of the event handling methods provided by `LiquidityManager`, e.g., `LiquidityManager::next_event`.
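
As a rough illustration of this flow only (not a drop-in example): the `LiquidityManager` setup and the calls into the per-protocol handlers are omitted, only one variant is matched explicitly, and the import assumes the crate exposes its `events` module at the root, as this PR's `src/events.rs` suggests.

```rust
use lightning_liquidity::events::Event;

// Sketch of an event handler driven by `LiquidityManager::next_event` or
// `LiquidityManager::next_event_async`; the manager itself is constructed elsewhere.
fn handle_liquidity_event(event: Event) {
	match event {
		Event::LSPS2Client(event) => {
			// React to JIT-channel (LSPS2) client updates here, e.g. by proceeding
			// with the opening-fee/invoice flow via the LSPS2 client handler.
			let _ = event;
		},
		other => {
			// Handle the remaining LSPS0/LSPS1 client and LSPS2 service events as needed.
			let _ = other;
		},
	}
}
```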

[LSP specifications]: https://github.com/BitcoinAndLightningLayerSpecs/lsp
[LSPS0]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS0
[LSPS1]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS1
[LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/tree/main/LSPS2
245 changes: 245 additions & 0 deletions lightning-liquidity/src/events.rs
@@ -0,0 +1,245 @@
// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Events are surfaced by the library to indicate some action must be taken
//! by the end-user.
//!
//! Because we don't have a built-in runtime, it's up to the end-user to poll
//! [`LiquidityManager::get_and_clear_pending_events`] to receive events.
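//!
//! A minimal polling sketch (assuming `liquidity_manager` is an already-constructed
//! `LiquidityManager`; real code would match on the concrete `Event` variants):
//!
//! ```ignore
//! for event in liquidity_manager.get_and_clear_pending_events() {
//! 	match event {
//! 		// React to the events relevant to the handlers you configured.
//! 		_ => {},
//! 	}
//! }
//! ```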
//!
//! [`LiquidityManager::get_and_clear_pending_events`]: crate::LiquidityManager::get_and_clear_pending_events

use crate::lsps0;
use crate::lsps1;
use crate::lsps2;
use crate::prelude::{Vec, VecDeque};
use crate::sync::{Arc, Mutex};

use core::future::Future;
use core::task::{Poll, Waker};

/// The maximum queue size we allow before starting to drop events.
pub const MAX_EVENT_QUEUE_SIZE: usize = 1000;

pub(crate) struct EventQueue {
	queue: Arc<Mutex<VecDeque<Event>>>,
	waker: Arc<Mutex<Option<Waker>>>,
	#[cfg(feature = "std")]
	condvar: crate::sync::Condvar,
}

impl EventQueue {
	pub fn new() -> Self {
		let queue = Arc::new(Mutex::new(VecDeque::new()));
		let waker = Arc::new(Mutex::new(None));
		#[cfg(feature = "std")]
		{
			let condvar = crate::sync::Condvar::new();
			Self { queue, waker, condvar }
		}
		#[cfg(not(feature = "std"))]
		Self { queue, waker }
	}

	pub fn enqueue(&self, event: Event) {
		{
			let mut queue = self.queue.lock().unwrap();
			if queue.len() < MAX_EVENT_QUEUE_SIZE {
				queue.push_back(event);
			} else {
				return;
			}
		}

		if let Some(waker) = self.waker.lock().unwrap().take() {
			waker.wake();
		}
		#[cfg(feature = "std")]
		self.condvar.notify_one();
	}

	pub fn next_event(&self) -> Option<Event> {
		self.queue.lock().unwrap().pop_front()
	}

	pub async fn next_event_async(&self) -> Event {
		EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await
	}

	#[cfg(feature = "std")]
	pub fn wait_next_event(&self) -> Event {
		let mut queue = self
			.condvar
			.wait_while(self.queue.lock().unwrap(), |queue: &mut VecDeque<Event>| queue.is_empty())
			.unwrap();

		let event = queue.pop_front().expect("non-empty queue");
		let should_notify = !queue.is_empty();

		drop(queue);

		if should_notify {
			if let Some(waker) = self.waker.lock().unwrap().take() {
				waker.wake();
			}

			self.condvar.notify_one();
		}

		event
	}

	pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
		self.queue.lock().unwrap().split_off(0).into()
	}
}

/// An event which you should probably take some action in response to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
	/// An LSPS0 client event.
	LSPS0Client(lsps0::event::LSPS0ClientEvent),
	/// An LSPS1 (Channel Request) client event.
	LSPS1Client(lsps1::event::LSPS1ClientEvent),
	/// An LSPS1 (Channel Request) server event.
	#[cfg(lsps1_service)]
	LSPS1Service(lsps1::event::LSPS1ServiceEvent),
	/// An LSPS2 (JIT Channel) client event.
	LSPS2Client(lsps2::event::LSPS2ClientEvent),
	/// An LSPS2 (JIT Channel) server event.
	LSPS2Service(lsps2::event::LSPS2ServiceEvent),
}

struct EventFuture {
	event_queue: Arc<Mutex<VecDeque<Event>>>,
	waker: Arc<Mutex<Option<Waker>>>,
}

impl Future for EventFuture {
	type Output = Event;

	fn poll(
		self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>,
	) -> core::task::Poll<Self::Output> {
		if let Some(event) = self.event_queue.lock().unwrap().pop_front() {
			Poll::Ready(event)
		} else {
			*self.waker.lock().unwrap() = Some(cx.waker().clone());
			Poll::Pending
		}
	}
}

#[cfg(test)]
mod tests {
	#[tokio::test]
	#[cfg(feature = "std")]
	async fn event_queue_works() {
		use super::*;
		use crate::lsps0::event::LSPS0ClientEvent;
		use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
		use core::sync::atomic::{AtomicU16, Ordering};
		use std::sync::Arc;
		use std::time::Duration;

		let event_queue = Arc::new(EventQueue::new());
		assert_eq!(event_queue.next_event(), None);

		let secp_ctx = Secp256k1::new();
		let counterparty_node_id =
			PublicKey::from_secret_key(&secp_ctx, &SecretKey::from_slice(&[42; 32]).unwrap());
		let expected_event = Event::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse {
			counterparty_node_id,
			protocols: Vec::new(),
		});

		for _ in 0..3 {
			event_queue.enqueue(expected_event.clone());
		}

		assert_eq!(event_queue.wait_next_event(), expected_event);
		assert_eq!(event_queue.next_event_async().await, expected_event);
		assert_eq!(event_queue.next_event(), Some(expected_event.clone()));
		assert_eq!(event_queue.next_event(), None);

		// Check that `next_event_async` won't return if the queue is empty and will time out instead.
		tokio::select! {
			_ = tokio::time::sleep(Duration::from_millis(10)) => {
				// Timeout
			}
			_ = event_queue.next_event_async() => {
				panic!();
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we get the expected number of events when polling/enqueuing concurrently.
		let enqueued_events = AtomicU16::new(0);
		let received_events = AtomicU16::new(0);
		let mut delayed_enqueue = false;

		for _ in 0..25 {
			event_queue.enqueue(expected_event.clone());
			enqueued_events.fetch_add(1, Ordering::SeqCst);
		}

		loop {
			tokio::select! {
				_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
					delayed_enqueue = true;
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);

					event_queue.enqueue(expected_event.clone());
					enqueued_events.fetch_add(1, Ordering::SeqCst);
				}
				e = event_queue.next_event_async() => {
					assert_eq!(e, expected_event);
					received_events.fetch_add(1, Ordering::SeqCst);
				}
			}

			if delayed_enqueue
				&& received_events.load(Ordering::SeqCst) == enqueued_events.load(Ordering::SeqCst)
			{
				break;
			}
		}
		assert_eq!(event_queue.next_event(), None);

		// Check we operate correctly, even when mixing and matching blocking and async API calls.
		let (tx, mut rx) = tokio::sync::watch::channel(());
		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			let e = thread_queue.wait_next_event();
			assert_eq!(e, thread_event);
			tx.send(()).unwrap();
		});

		let thread_queue = Arc::clone(&event_queue);
		let thread_event = expected_event.clone();
		std::thread::spawn(move || {
			// Sleep a bit before we enqueue the events everybody is waiting for.
			std::thread::sleep(Duration::from_millis(20));
			thread_queue.enqueue(thread_event.clone());
			thread_queue.enqueue(thread_event.clone());
		});

		let e = event_queue.next_event_async().await;
		assert_eq!(e, expected_event.clone());

		rx.changed().await.unwrap();
		assert_eq!(event_queue.next_event(), None);
	}
}