Skip to content

Commit

Permalink
rbd: get lastsyncbytes and lastsycduration for volrep
Browse files Browse the repository at this point in the history
This commit get more information from the description
like lastsyncbytes and lastsyncduration and send them
as a response of getvolumereplicationinfo request.

Signed-off-by: Yati Padia <[email protected]>
  • Loading branch information
yati1998 committed Jun 12, 2023
1 parent ea19d63 commit 05392be
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 118 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ceph/ceph-csi

go 1.20
go 1.19

require (
github.com/IBM/keyprotect-go-client v0.10.0
Expand All @@ -11,7 +11,7 @@ require (
github.com/ceph/go-ceph v0.21.0
github.com/container-storage-interface/spec v1.8.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.2.0
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444
github.com/gemalto/kmip-go v0.0.9
github.com/golang/protobuf v1.5.3
github.com/google/fscrypt v0.3.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr
github.com/csi-addons/replication-lib-utils v0.2.0 h1:tGs42wfjkObbBo/98a3uxTFWEJ1dq5PIMqPWtdLd040=
github.com/csi-addons/replication-lib-utils v0.2.0/go.mod h1:ROQlEsc2EerVtc/K/C+6Hx8pqaQ9MVy9xFFpyKfI9lc=
github.com/csi-addons/spec v0.1.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.0 h1:Ews7bxpN9P6nFxl1XvMg87cR1wLROdH1FzSfLfb4VfI=
github.com/csi-addons/spec v0.2.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 h1:hWVCrZWVHctpWt6cQxV1I6dW3wpBDMg3Vrvu9uAuUxw=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
61 changes: 45 additions & 16 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -719,18 +720,20 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
}

description := remoteStatus.Description
lastSyncTime, err := getLastSyncTime(description)
lastSyncTime, lastSyncDuration, lastSyncBytes, err := getLastSyncInfo(description)
if err != nil {
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
log.ErrorLog(ctx, err.Error())

return nil, status.Errorf(codes.Internal, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err)
}

resp := &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: lastSyncTime,
LastSyncTime: lastSyncTime,
LastSyncDuration: lastSyncDuration,
LastSyncBytes: lastSyncBytes,
}

return resp, nil
Expand All @@ -756,42 +759,68 @@ func RemoteStatus(gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageS
return ss, err
}

// This function gets the local snapshot time from the description
// of localStatus and converts it into required type.
func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// This function gets the local snapshot time, last sync snapshot seconds
// and last sync bytes from the description of localStatus and convert
// it into required types.
func getLastSyncInfo(description string) (*timestamppb.Timestamp, *durationpb.Duration, int64, error) {
// Format of the description will be as followed:
// description = "replaying,{"bytes_per_second":0.0,
// "bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501
// ,"remote_snapshot_timestamp":1662655501}"
// description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,
// "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0,
// "local_snapshot_timestamp":1684675261,
// "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`
// In case there is no last snapshot bytes returns 0 as the
// LastSyncBytes is optional.
// In case there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
// In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required.

if description == "" {
return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, nil, 0, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
}
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, nil, 0, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LocalSnapshotBytes int64 `json:"last_snapshot_bytes"`
LocalSnapshotDuration int64 `json:"last_snapshot_sync_seconds"`
}

var localSnapTime localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapTime)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal description: %w", err)
return nil, nil, 0, fmt.Errorf("failed to unmarshal local snapshot time: %w", err)
}

// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapTime.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return nil, nil, 0, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
}

lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
lastSyncTime := timestamppb.New(lastUpdateTime)

return lastSyncTime, nil
var localSnapBytes localStatus
errb := json.Unmarshal([]byte(splittedString[1]), &localSnapBytes)
if errb != nil {
return nil, nil, 0, fmt.Errorf("failed to unmarshal last snapshot bytes: %w", errb)
}

var localSnapDuration localStatus
errd := json.Unmarshal([]byte(splittedString[1]), &localSnapDuration)
if errd != nil {
return nil, nil, 0, fmt.Errorf("failed to unmarshal last snap sync seconds: %w", errd)
}

lastUpdateDuration := time.Unix(localSnapDuration.LocalSnapshotDuration, 0)
lastUpdateDuration1 := time.Until(lastUpdateDuration)
lastSyncDuration := durationpb.New(lastUpdateDuration1)

return lastSyncTime, lastSyncDuration, localSnapBytes.LocalSnapshotBytes, nil

}

func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
Expand Down
55 changes: 45 additions & 10 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -438,58 +439,92 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
}
}

func TestValidateLastSyncTime(t *testing.T) {
func TestValidateLastSyncInfo(t *testing.T) {
t.Parallel()
tests := []struct {
name string
description string
timestamp *timestamppb.Timestamp
duration *durationpb.Duration
synctime *timestamppb.Timestamp
bytes int64
expectedErr string
}{
{
"valid description",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501,"remote_snapshot_timestamp":1662655501}`,
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
timestamppb.New(time.Unix(1662655501, 0)),
81920,
"",
},
{
"empty description",
"",
nil,
nil,
0,
corerbd.ErrLastSyncTimeNotFound.Error(),
},
{
"description without local_snapshot_timestamp",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"remote_snapshot_timestamp":1662655501}`,
"description without last_snapshot_bytes",
`replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
timestamppb.New(time.Unix(1662655501, 0)),
0,
"",
},
{
"description without local_snapshot_time",
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
durationpb.New(time.Until(time.Unix(56743, 0))),
nil,
81920,
"",
},
{
"description without last_snapshot_sync_seconds",
`replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
nil,
timestamppb.New(time.Unix(1662655501, 0)),
81920,
"",
},
{
"description with invalid JSON",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
`replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
nil,
nil,
0,
"failed to unmarshal",
},
{
"description with no JSON",
`replaying`,
nil,
nil,
0,
corerbd.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ts, err := getLastSyncTime(tt.description)
ts, td, tb, err := getLastSyncInfo(tt.description)
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
// returned error
t.Errorf("getLastSyncTime() returned error, expected: %v, got: %v",
t.Errorf("getLastSyncInfo() returned error, expected: %v, got: %v",
tt.expectedErr, err)
}
if !ts.AsTime().Equal(tt.timestamp.AsTime()) {
t.Errorf("getLastSyncTime() %v, expected %v", ts, tt.timestamp)
if ts != tt.synctime {
t.Errorf("getLastSyncInfo() %v, expected %v", ts, tt.bytes)
}
if td != tt.duration {
t.Errorf("getLastSyncInfo() %v, expected %v", td, tt.bytes)
}
if tb != tt.bytes {
t.Errorf("getLastSyncInfo() %v, expected %v", tb, tt.bytes)
}
})
}
Expand Down
Loading

0 comments on commit 05392be

Please sign in to comment.