Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin): add async io for the plugin #116

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ keywords = [ "plugin", "cln", "rpc", "lightning", "bitcoin" ]
readme = "README.md"

[dependencies]
clightningrpc-common = { version = "0.3.0-beta.4" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clightningrpc-common = { version = "0.3.0-beta.4" }
log = { version = "0.4.17", optional = true }
mio = { version = "0.8.10", features = ["os-ext"] }

[features]
log = ["dep:log"]
7 changes: 5 additions & 2 deletions plugin/examples/foo_plugin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
extern crate clightningrpc_plugin;

use std::io;

use clightningrpc_plugin::types::LogLevel;
use clightningrpc_plugin::{commands::RPCCommand, errors::PluginError, plugin::Plugin};
use serde_json::{json, Value};
Expand Down Expand Up @@ -35,7 +37,7 @@ impl RPCCommand<PluginState> for OnShutdown {
}
}

fn main() {
fn main() -> io::Result<()> {
let plugin = Plugin::<PluginState>::new(PluginState(()), true)
.add_rpc_method(
"hello",
Expand All @@ -56,5 +58,6 @@ fn main() {
json!({})
})
.clone();
plugin.start();
plugin.start()?;
Ok(())
}
11 changes: 10 additions & 1 deletion plugin/src/commands/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,30 @@ impl<T: Clone> RPCCommand<T> for ManifestRPC {
#[derive(Clone)]
/// Type to define the init method and its attributes, used in plugin
pub struct InitRPC<T: 'static + Clone> {
#[allow(clippy::type_complexity)]
pub(crate) on_init: Option<Arc<dyn Fn(&mut Plugin<T>) -> Value>>,
}

impl<T: Clone> InitRPC<T> {
fn parse_option(&self, plugin: &mut Plugin<T>, options: &HashMap<String, serde_json::Value>) {
for option_name in options.keys() {
// SAFETY: We are iterating over the key this never None
#[allow(clippy::unwrap_used)]
let option = options.get(option_name).unwrap();
plugin.option.get_mut(option_name).unwrap().value = Some(option.to_owned());
// SAFETY: we put them into it so it is safe to unwrap.
// If we panic this mean that there is a bug
#[allow(clippy::unwrap_used)]
let opt = plugin.option.get_mut(option_name).unwrap();
opt.value = Some(option.to_owned());
}
}
}

impl<T: Clone> RPCCommand<T> for InitRPC<T> {
fn call<'c>(&self, plugin: &mut Plugin<T>, request: Value) -> Result<Value, PluginError> {
let mut response = init_payload();
// SAFETY: Shouwl be valid json so should be safe to unwrap
#[allow(clippy::unwrap_used)]
let init: InitConf = serde_json::from_value(request.to_owned()).unwrap();
plugin.configuration = Some(init.configuration);
self.parse_option(plugin, &init.options);
Expand Down
4 changes: 2 additions & 2 deletions plugin/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::errors::PluginError;
/// in contrast, it is more complex but the plugin_macros package will help to simplify the API.
pub trait RPCCommand<T: Clone>: RPCCommandClone<T> {
/// call is a generic method that it is used to simulate the callback.
fn call<'c>(
fn call(
&self,
_: &mut Plugin<T>,
_: serde_json::Value,
Expand All @@ -24,7 +24,7 @@ pub trait RPCCommand<T: Clone>: RPCCommandClone<T> {
}

/// void call is a generic method that it is used to simulate a callback with a void return type
fn call_void<'c>(&self, _plugin: &mut Plugin<T>, _request: &'c serde_json::Value) {}
fn call_void(&self, _plugin: &mut Plugin<T>, _request: &serde_json::Value) {}
}

// Splitting RPCCommandClone into its own trait allows us to provide a blanket
Expand Down
116 changes: 116 additions & 0 deletions plugin/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//! async io module of the plugin io.
//!
//! Vincenzo Palazzo <[email protected]>
use std::io::Write;
use std::io::{self, Read};
use std::os::fd::AsRawFd;
use std::time::Duration;

const SERVER_TOKEN: mio::Token = mio::Token(0);

pub struct AsyncIO {
poll: mio::Poll,
events: mio::Events,
}

impl AsyncIO {
pub fn new() -> io::Result<Self> {
Ok(Self {
poll: mio::Poll::new()?,
events: mio::Events::with_capacity(1024),
})
}

pub fn register(&self) -> io::Result<()> {
let stdin = std::io::stdin().as_raw_fd();
let mut stdin = mio::unix::SourceFd(&stdin);
self.poll
.registry()
.register(&mut stdin, SERVER_TOKEN, mio::Interest::READABLE)?;
Ok(())
}

#[allow(clippy::wrong_self_convention)]
pub fn into_loop<F>(&mut self, mut async_callback: F) -> io::Result<()>
where
F: FnMut(String) -> Option<String>,
{
loop {
self.poll
.poll(&mut self.events, Some(Duration::from_millis(100)))?;
for event in self.events.iter() {
match event.token() {
SERVER_TOKEN => {
if event.is_readable() {
self.handle_connection(&mut async_callback)?;
}
}
_ => unreachable!(),
}
}
}
}

fn handle_connection<F>(&self, async_callback: &mut F) -> io::Result<()>
where
F: FnMut(String) -> Option<String>,
{
loop {
let mut reader = io::stdin().lock();
let mut buffer = String::new();
loop {
let mut byte = [0; 1];
crate::poll_check!(reader.read_exact(&mut byte))?;

// Append the byte to the buffer
buffer.push(byte[0] as char);

// Check if the buffer ends with double newline
if buffer.ends_with("\n\n") {
break; // Exit the loop
}
}

if let Some(resp) = async_callback(buffer.clone()) {
let mut writer = io::stdout().lock();
crate::poll_check!(writer.write_all(resp.as_bytes()))?;
crate::poll_check!(writer.flush())?;
}
}
Ok(())
}
}

#[macro_export]
macro_rules! poll_check {
($expr:expr) => {{
match $expr {
Ok(val) => Ok::<_, std::io::Error>(val),
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// Handle WouldBlock error
// For example, wait for readiness event and retry
// You may need to use mio's event loop to wait for readiness
// and then retry the operation
// For simplicity, we'll just continue the loop here
break;
}
Err(err) => Err(err.into()),
}
}};
}

#[macro_export]
macro_rules! poll_loop {
($code:block) => {{
while let Err(ref err) = $code {
if err.kind() == std::io::ErrorKind::WouldBlock {
// Handle WouldBlock error
// For example, wait for readiness event and retry
// You may need to use mio's event loop to wait for readiness
// and then retry the operation
// For simplicity, we'll just continue the loop here
continue;
}
}
}};
}
2 changes: 2 additions & 0 deletions plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
//!
//! author and mantainer: Vincenzo Palazzo https://github.com/vincenzopalazzo
#![crate_name = "clightningrpc_plugin"]
#![deny(clippy::unwrap_used)]
pub mod commands;
pub mod errors;
mod io;
pub mod macros;
pub mod plugin;
pub mod types;
83 changes: 51 additions & 32 deletions plugin/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
use std::collections::{HashMap, HashSet};
use std::io;
use std::io::Write;
use std::string::String;

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (nightly)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (nightly)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (beta)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (beta)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (nightly)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (nightly)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (beta)

the item `String` is imported redundantly

Check warning on line 7 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / Build (beta)

the item `String` is imported redundantly
use std::sync::Arc;

use serde_json::Value;

use clightningrpc_common::json_utils::{add_str, init_payload, init_success_response};
use clightningrpc_common::types::Request;
use serde_json::Value;

use crate::commands::builtin::{InitRPC, ManifestRPC};
use crate::commands::types::{CLNConf, RPCHookInfo, RPCMethodInfo};
use crate::commands::RPCCommand;
use crate::errors::PluginError;
use crate::io::AsyncIO;
use crate::types::{LogLevel, RpcOption};

#[cfg(feature = "log")]
Expand Down Expand Up @@ -50,7 +52,7 @@
/// core lightning configuration sent with the init call.
pub configuration: Option<CLNConf>,
/// onInit callback called when the method on init is ran.
on_init: Option<Arc<dyn Fn(&mut Plugin<T>) -> Value>>,

Check warning on line 55 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / clippy

very complex type used. Consider factoring parts into `type` definitions

warning: very complex type used. Consider factoring parts into `type` definitions --> plugin/src/plugin.rs:55:14 | 55 | on_init: Option<Arc<dyn Fn(&mut Plugin<T>) -> Value>>, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#type_complexity = note: `#[warn(clippy::type_complexity)]` on by default
}

#[cfg(feature = "log")]
Expand All @@ -64,10 +66,10 @@

fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let mut writer = io::stdout().lock();
let level: LogLevel = record.level().into();
let msg = record.args();

let mut writer = io::stdout();
let mut payload = init_payload();
add_str(&mut payload, "level", &level.to_string());
add_str(&mut payload, "message", &format!("{msg}"));
Expand All @@ -77,10 +79,11 @@
method: "log".to_owned(),
params: payload,
};
writer
.write_all(serde_json::to_string(&request).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();

crate::poll_loop!({
writer.write_all(serde_json::to_string(&request).unwrap().as_bytes())

Check failure on line 84 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / clippy

used `unwrap()` on a `Result` value

error: used `unwrap()` on a `Result` value --> plugin/src/plugin.rs:84:34 | 84 | writer.write_all(serde_json::to_string(&request).unwrap().as_bytes()) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: if this value is an `Err`, it will panic = help: consider using `expect()` to provide a better panic message = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unwrap_used note: the lint level is defined here --> plugin/src/lib.rs:10:9 | 10 | #![deny(clippy::unwrap_used)] | ^^^^^^^^^^^^^^^^^^^
});
crate::poll_loop!({ writer.flush() });
}
}

Expand Down Expand Up @@ -112,7 +115,7 @@
}

pub fn log(&self, level: LogLevel, msg: &str) {
let mut writer = io::stdout();
let mut writer = io::stdout().lock();
let mut payload = init_payload();
add_str(&mut payload, "level", &level.to_string());
add_str(&mut payload, "message", msg);
Expand All @@ -122,10 +125,12 @@
method: "log".to_owned(),
params: payload,
};
writer
.write_all(serde_json::to_string(&request).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();
crate::poll_loop!({
// SAFETY: it is valid json and if we panic there is a buf somewhere
#[allow(clippy::unwrap_used)]
writer.write_all(serde_json::to_string(&request).unwrap().as_bytes())
});
crate::poll_loop!({ writer.flush() });
}

/// register the plugin option.
Expand All @@ -139,10 +144,14 @@
) -> &mut Self {
let def_val = match opt_type {
"flag" | "bool" => {
def_val.and_then(|val| Some(serde_json::json!(val.parse::<bool>().unwrap())))
// FIXME: remove unwrap and return the error
#[allow(clippy::unwrap_used)]
def_val.map(|val| serde_json::json!(val.parse::<bool>().unwrap()))
}
"int" => def_val.and_then(|val| Some(serde_json::json!(val.parse::<i64>().unwrap()))),
"string" => def_val.and_then(|val| Some(serde_json::json!(val))),
// FIXME: remove unwrap and return the error
#[allow(clippy::unwrap_used)]
"int" => def_val.map(|val| serde_json::json!(val.parse::<i64>().unwrap())),
"string" => def_val.map(|val| serde_json::json!(val)),
_ => unreachable!("{opt_type} not supported"),
};
self.option.insert(
Expand Down Expand Up @@ -209,6 +218,9 @@
}

fn handle_notification(&'a mut self, name: &str, params: serde_json::Value) {
// SAFETY: we register the notification and if we do not have inside the map
// this is a bug.
#[allow(clippy::unwrap_used)]
let notification = self.rpc_notification.get(name).unwrap().clone();
notification.call_void(self, &params);
}
Expand Down Expand Up @@ -250,22 +262,21 @@
match result {
Ok(json_resp) => response["result"] = json_resp.to_owned(),
Err(json_err) => {
// SAFETY: should be valud json
#[allow(clippy::unwrap_used)]
let err_resp = serde_json::to_value(json_err).unwrap();
response["error"] = err_resp;
}
}
}

pub fn start(mut self) {
let reader = io::stdin();
let mut writer = io::stdout();
let mut buffer = String::new();
pub fn start(mut self) -> io::Result<()> {
#[cfg(feature = "log")]
{
use std::str::FromStr;
// We are compatible with the cln-plugin
let level = std::env::var("CLN_PLUGIN_LOG").unwrap_or("info".to_string());
let level = LevelFilter::from_str(&level).unwrap();

Check failure on line 279 in plugin/src/plugin.rs

View workflow job for this annotation

GitHub Actions / clippy

used `unwrap()` on a `Result` value

error: used `unwrap()` on a `Result` value --> plugin/src/plugin.rs:279:25 | 279 | let level = LevelFilter::from_str(&level).unwrap(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | = note: if this value is an `Err`, it will panic = help: consider using `expect()` to provide a better panic message = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unwrap_used
let _ = log::set_logger(&Log {}).map(|()| log::set_max_level(level));
}
self.rpc_method
Expand All @@ -276,29 +287,37 @@
on_init: self.on_init.clone(),
}),
);
// FIXME: core lightning end with the double endline, so this can cause
// problem for some input reader.
// we need to parse the writer, and avoid this while loop
while let Ok(_) = reader.read_line(&mut buffer) {
let req_str = buffer.to_string();
buffer.clear();
let Ok(request) = serde_json::from_str::<Request<serde_json::Value>>(&req_str) else {
continue;
};
let mut asyncio = AsyncIO::new()?;
asyncio.register()?;
asyncio.into_loop(|buffer| {
#[cfg(feature = "log")]
log::info!("looping around the string: {buffer}");
// SAFETY: should be valud json
#[allow(clippy::unwrap_used)]
let request: Request<serde_json::Value> = serde_json::from_str(&buffer).unwrap();
if let Some(id) = request.id {
// when the id is specified this is a RPC or Hook, so we need to return a response
let response = self.call_rpc_method(&request.method, request.params);
let mut rpc_response = init_success_response(id);
self.write_respose(&response, &mut rpc_response);
writer
.write_all(serde_json::to_string(&rpc_response).unwrap().as_bytes())
.unwrap();
writer.flush().unwrap();
#[cfg(feature = "log")]
log::info!(
"rpc or hook: {} with reponse {:?}",
request.method,
rpc_response
);
// SAFETY: should be valud json
#[allow(clippy::unwrap_used)]
Some(serde_json::to_string(&rpc_response).unwrap())
} else {
// in case of the id is None, we are receiving the notification, so the server is not
// interested in the answer.
self.handle_notification(&request.method, request.params);
#[cfg(feature = "log")]
log::info!("notification: {}", request.method);
None
}
}
})?;
Ok(())
}
}
2 changes: 1 addition & 1 deletion plugin_macros/src/attr_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fn parse_key_values(
trace!(tracer, "removing the `,` tok");
check!(",", stream.advance())?;
}
let value = value.to_string().replace("\"", "");
let value = value.to_string().replace('\"', "");
trace!(tracer, "key {key} = value {value}");
hash_map.insert(key.to_string(), value.to_string());
trace!(tracer, "map is {:?}", hash_map);
Expand Down
Loading
Loading