Skip to content

Commit

Permalink
flowctl: add catalog status subcommand
Browse files Browse the repository at this point in the history
Adds `flowctl catalog status`, which uses the new status API to fetch the
current status of one or more live specs and print them to the terminal.
  • Loading branch information
psFried committed Dec 10, 2024
1 parent 21b28a1 commit 32b6dad
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
29 changes: 29 additions & 0 deletions crates/flow-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,35 @@ impl Client {
self.user_access_token.is_some()
}

/// Performs a GET request to the control-plane agent HTTP API and returns a
/// result with either the deserialized response or an error.
pub async fn api_get<T: serde::de::DeserializeOwned>(
&self,
path: &str,
raw_query: &[(String, String)],
) -> anyhow::Result<T> {
let url = self.agent_endpoint.join(path)?;
let mut builder = self.http_client.get(url).query(raw_query);
// if let Some(query) = maybe_query {
// builder = builder.query(query);
// }
if let Some(token) = &self.user_access_token {
builder = builder.bearer_auth(token);
}
let request = builder.build()?;
tracing::debug!(url = %request.url(), method = "GET", "sending request");

let response = self.http_client.execute(request).await?;
let status = response.status();

if status.is_success() {
Ok(response.json().await?)
} else {
let body = response.text().await?;
anyhow::bail!("GET {path}: {status}: {body}");
}
}

pub async fn agent_unary<Request, Response>(
&self,
path: &str,
Expand Down
11 changes: 11 additions & 0 deletions crates/flowctl/src/catalog/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod delete;
mod publish;
mod pull_specs;
mod status;
mod test;

use crate::{
Expand Down Expand Up @@ -75,6 +76,15 @@ pub enum Command {
/// Once in your draft, use `draft develop` and `draft author` to
/// develop and author updates to the specification.
Draft(Draft),

/// Print status information for a given task or collection (beta).
///
/// Note: This command is still in beta and the output is likely to change in the future.
///
/// The status shows the current state of the live spec, as known to the
/// control plane. This does not yet include _shard_ status from the data
/// plane, so not all failures will be visible here.
Status(status::Status),
}

/// Common selection criteria based on the spec name.
Expand Down Expand Up @@ -216,6 +226,7 @@ impl Catalog {
Command::Test(source) => test::do_test(ctx, source).await,
Command::History(history) => do_history(ctx, history).await,
Command::Draft(draft) => do_draft(ctx, draft).await,
Command::Status(status) => status::do_controller_status(ctx, status).await,
}
}
}
Expand Down
57 changes: 57 additions & 0 deletions crates/flowctl/src/catalog/status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use models::status::StatusResponse;
use serde::Serialize;

use crate::output::{to_table_row, JsonCell};

#[derive(Debug, clap::Args)]
pub struct Status {
/// Names of the live specs to fetch the status of
pub catalog_names: Vec<String>,
}

pub async fn do_controller_status(
ctx: &mut crate::CliContext,
Status { catalog_names }: &Status,
) -> anyhow::Result<()> {
let query = catalog_names
.iter()
.map(|name| ("name".to_string(), name.clone()))
.collect::<Vec<_>>();
let resp: Vec<StatusResponse> = ctx.client.api_get("/api/v1/status", &query).await?;
ctx.write_all::<_, StatusOutput>(resp.into_iter().map(StatusOutput), ())?;
Ok(())
}

#[derive(Debug, Serialize)]
#[serde(transparent)]
pub struct StatusOutput(StatusResponse);

impl crate::output::CliOutput for StatusOutput {
type TableAlt = ();
type CellValue = JsonCell;

fn table_headers(_alt: Self::TableAlt) -> Vec<&'static str> {
vec![
"Name",
"Type",
"Controller Updated At",
"Live Spec Updated At",
"Controller Error",
"Failures",
]
}

fn into_table_row(self, _alt: Self::TableAlt) -> Vec<Self::CellValue> {
to_table_row(
&self.0,
&[
"/catalog_name",
"/spec_type",
"/controller_updated_at",
"/live_spec_updated_at",
"/controller_error",
"/failures",
],
)
}
}

0 comments on commit 32b6dad

Please sign in to comment.