Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Get*Capabilities and Probe common functions #17

Merged
merged 1 commit into from
Feb 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"strings"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
Expand All @@ -33,6 +36,9 @@ import (
const (
// Interval of logging connection errors
connectionLoggingInterval = 10 * time.Second

// Interval of trying to call Probe() until it succeeds
probeInterval = 1 * time.Second
)

// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
Expand Down Expand Up @@ -163,6 +169,7 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
return err
}

// GetDriverName returns name of CSI driver.
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
client := csi.NewIdentityClient(conn)

Expand All @@ -177,3 +184,111 @@ func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
}
return name, nil
}

// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool

// GetPluginCapabilities returns set of supported capabilities of CSI driver.
func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) {
client := csi.NewIdentityClient(conn)
req := csi.GetPluginCapabilitiesRequest{}
rsp, err := client.GetPluginCapabilities(ctx, &req)
if err != nil {
return nil, err
}
caps := PluginCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
srv := cap.GetService()
if srv == nil {
continue
}
t := srv.GetType()
caps[t] = true
}
return caps, nil
}

// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool

// GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) {
client := csi.NewControllerClient(conn)
req := csi.ControllerGetCapabilitiesRequest{}
rsp, err := client.ControllerGetCapabilities(ctx, &req)
if err != nil {
return nil, err
}

caps := ControllerCapabilitySet{}
for _, cap := range rsp.GetCapabilities() {
if cap == nil {
continue
}
rpc := cap.GetRpc()
if rpc == nil {
continue
}
t := rpc.GetType()
caps[t] = true
}
return caps, nil
}

// ProbeForever calls Probe() of a CSI driver and waits until the driver becomes ready.
// Any error other than timeout is returned.
func ProbeForever(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the singleProbeTimeout need to be configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some drivers may require longer timeout for their checks and only driver author knows that. Traditionally we use the same timeout as for the real CSI call (ControllerPublish, CreateVolume).

for {
klog.Info("Probing CSI driver for readiness")
ready, err := probeOnce(conn, singleProbeTimeout)
if err != nil {
st, ok := status.FromError(err)
if !ok {
// This is not gRPC error. The probe must have failed before gRPC
// method was called, otherwise we would get gRPC error.
return fmt.Errorf("CSI driver probe failed: %s", err)
}
if st.Code() != codes.DeadlineExceeded {
return fmt.Errorf("CSI driver probe failed: %s", err)
}
// Timeout -> driver is not ready. Fall through to sleep() below.
klog.Warning("CSI driver probe timed out")
} else {
if ready {
return nil
}
klog.Warning("CSI driver is not ready")
}
// Timeout was returned or driver is not ready.
time.Sleep(probeInterval)
}
}

// probeOnce is a helper to simplify defer cancel()
func probeOnce(conn *grpc.ClientConn, timeout time.Duration) (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return Probe(ctx, conn)
}

// Probe calls driver Probe() just once and returns its result without any processing.
func Probe(ctx context.Context, conn *grpc.ClientConn) (ready bool, err error) {
client := csi.NewIdentityClient(conn)

req := csi.ProbeRequest{}
rsp, err := client.Probe(ctx, &req)

if err != nil {
return false, err
}

r := rsp.GetReady()
if r == nil {
// "If not present, the caller SHALL assume that the plugin is in a ready state"
return true, nil
}
return r.GetValue(), nil
}
Loading