Skip to content

Commit

Permalink
rbd: get lastsyncbytes and lastsycduration for volrep
Browse files Browse the repository at this point in the history
This commit get more information from the description
like lastsyncbytes and lastsyncduration and send them
as a response of getvolumereplicationinfo request.

Signed-off-by: Yati Padia <[email protected]>
  • Loading branch information
yati1998 committed Jun 21, 2023
1 parent 306bfa2 commit f41a361
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 138 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/ceph/go-ceph v0.21.0
github.com/container-storage-interface/spec v1.8.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.2.0
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444
github.com/gemalto/kmip-go v0.0.9
github.com/golang/protobuf v1.5.3
github.com/google/fscrypt v0.3.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,8 @@ github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr
github.com/csi-addons/replication-lib-utils v0.2.0 h1:tGs42wfjkObbBo/98a3uxTFWEJ1dq5PIMqPWtdLd040=
github.com/csi-addons/replication-lib-utils v0.2.0/go.mod h1:ROQlEsc2EerVtc/K/C+6Hx8pqaQ9MVy9xFFpyKfI9lc=
github.com/csi-addons/spec v0.1.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.0 h1:Ews7bxpN9P6nFxl1XvMg87cR1wLROdH1FzSfLfb4VfI=
github.com/csi-addons/spec v0.2.0/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 h1:hWVCrZWVHctpWt6cQxV1I6dW3wpBDMg3Vrvu9uAuUxw=
github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444/go.mod h1:Mwq4iLiUV4s+K1bszcWU6aMsR5KPsbIYzzszJ6+56vI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
72 changes: 52 additions & 20 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -82,6 +83,12 @@ type ReplicationServer struct {
*corerbd.ControllerServer
}

type localstruct struct {
lastsynctime *timestamppb.Timestamp
lastsyncduration *durationpb.Duration
lastsyncbytes int64
}

func (rs *ReplicationServer) RegisterService(server grpc.ServiceRegistrar) {
replication.RegisterControllerServer(server, rs)
}
Expand Down Expand Up @@ -719,18 +726,20 @@ func (rs *ReplicationServer) GetVolumeReplicationInfo(ctx context.Context,
}

description := remoteStatus.Description
lastSyncTime, err := getLastSyncTime(description)
lastSyncInfo, err := getLastSyncInfo(description)
if err != nil {
if errors.Is(err, corerbd.ErrLastSyncTimeNotFound) {
return nil, status.Errorf(codes.NotFound, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.NotFound, "failed to get last sync info: %v", err)
}
log.ErrorLog(ctx, err.Error())

return nil, status.Errorf(codes.Internal, "failed to get last sync time: %v", err)
return nil, status.Errorf(codes.Internal, "failed to get last sync info: %v", err)
}

resp := &replication.GetVolumeReplicationInfoResponse{
LastSyncTime: lastSyncTime,
LastSyncTime: lastSyncInfo.lastsynctime,
LastSyncDuration: lastSyncInfo.lastsyncduration,
LastSyncBytes: lastSyncInfo.lastsyncbytes,
}

return resp, nil
Expand All @@ -756,42 +765,65 @@ func RemoteStatus(gmis *librbd.GlobalMirrorImageStatus) (librbd.SiteMirrorImageS
return ss, err
}

// This function gets the local snapshot time from the description
// of localStatus and converts it into required type.
func getLastSyncTime(description string) (*timestamppb.Timestamp, error) {
// This function gets the local snapshot time, last sync snapshot seconds
// and last sync bytes from the description of localStatus and convert
// it into required types.
func getLastSyncInfo(description string) (localstruct, error) {
// Format of the description will be as followed:
// description = "replaying,{"bytes_per_second":0.0,
// "bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501
// ,"remote_snapshot_timestamp":1662655501}"
// description = `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,
// "last_snapshot_bytes":81920,"last_snapshot_sync_seconds":0,
// "local_snapshot_timestamp":1684675261,
// "remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`
// In case there is no last snapshot bytes returns 0 as the
// LastSyncBytes is optional.
// In case there is no last snapshot sync seconds, it returns nil as the
// LastSyncDuration is optional.
// In case there is no local snapshot timestamp return an error as the
// LastSyncTime is required.

var localInfo localstruct

if description == "" {
return nil, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
return localInfo, fmt.Errorf("empty description: %w", corerbd.ErrLastSyncTimeNotFound)
}
splittedString := strings.SplitN(description, ",", 2)
if len(splittedString) == 1 {
return nil, fmt.Errorf("no local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
return localInfo, fmt.Errorf("no snapshot details: %w", corerbd.ErrLastSyncTimeNotFound)
}
type localStatus struct {
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LocalSnapshotTime int64 `json:"local_snapshot_timestamp"`
LastSnapshotBytes int64 `json:"last_snapshot_bytes"`
LastSnapshotDuration int64 `json:"last_snapshot_sync_seconds"`
}

var localSnapTime localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapTime)
var localSnapInfo localStatus
err := json.Unmarshal([]byte(splittedString[1]), &localSnapInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal description: %w", err)
return localInfo, fmt.Errorf("failed to unmarshal local snapshot info: %w", err)
}

// If the json unmarsal is successful but the local snapshot time is 0, we
// need to consider it as an error as the LastSyncTime is required.
if localSnapTime.LocalSnapshotTime == 0 {
return nil, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
if localSnapInfo.LocalSnapshotTime == 0 {
return localInfo, fmt.Errorf("empty local snapshot timestamp: %w", corerbd.ErrLastSyncTimeNotFound)
}

lastUpdateTime := time.Unix(localSnapTime.LocalSnapshotTime, 0)
// converts localSnapshotTime of type int64 to time.Time
lastUpdateTime := time.Unix(localSnapInfo.LocalSnapshotTime, 0)
lastSyncTime := timestamppb.New(lastUpdateTime)

return lastSyncTime, nil
// converts localSnapshotDuration of type int64 to time.Time
lastDurationTime := strconv.Itoa(int(localSnapInfo.LastSnapshotDuration))
// converts time.time to time.Duration
lastDuration, _ := time.ParseDuration(lastDurationTime)
// converts time.Duration to *durationpb.Duration
lastSyncDuration := durationpb.New(lastDuration)

localInfo.lastsynctime = lastSyncTime
localInfo.lastsyncduration = lastSyncDuration
localInfo.lastsyncbytes = localSnapInfo.LastSnapshotBytes

return localInfo, nil
}

func checkVolumeResyncStatus(localStatus librbd.SiteMirrorImageStatus) error {
Expand Down
112 changes: 85 additions & 27 deletions internal/csi-addons/rbd/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"time"

corerbd "github.com/ceph/ceph-csi/internal/rbd"

librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -438,58 +438,116 @@ func TestCheckRemoteSiteStatus(t *testing.T) {
}
}

func TestValidateLastSyncTime(t *testing.T) {
func TestValidateLastSyncInfo(t *testing.T) {
t.Parallel()

duration,_ := time.ParseDuration(strconv.Itoa(int(56743))

type localstruct struct {
lastsynctime *timestamppb.Timestamp
lastsyncduration *durationpb.Duration
lastsyncbytes int64
}
tests := []struct {
name string
description string
timestamp *timestamppb.Timestamp
info localstruct
expectedErr string
}{
{
"valid description",
name: "valid description",
//nolint:lll // sample output cannot be split into multiple lines.
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"local_snapshot_timestamp":1662655501,"remote_snapshot_timestamp":1662655501}`,
timestamppb.New(time.Unix(1662655501, 0)),
"",
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: localstruct{
lastsynctime: timestamppb.New(time.Unix(1684675261, 0)),
lastsyncduration: durationpb.New(duration),
lastsyncbytes: 81920,
},
expectedErr: "",
},
{
"empty description",
"",
nil,
corerbd.ErrLastSyncTimeNotFound.Error(),
name: "empty description",
description: "",
info: localstruct{
lastsynctime: nil,
lastsyncduration: nil,
lastsyncbytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
},
{
"description without local_snapshot_timestamp",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0,"remote_snapshot_timestamp":1662655501}`,
nil,
"",
name: "description without last_snapshot_bytes",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"last_snapshot_sync_seconds":56743,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: localstruct{
lastsyncduration: durationpb.New(duration),
lastsynctime: timestamppb.New(time.Unix(1684675261, 0)),
lastsyncbytes: 0,
},
expectedErr: "",
},
{
"description with invalid JSON",
`replaying,{"bytes_per_second":0.0,"bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
nil,
"failed to unmarshal",
name: "description without local_snapshot_time",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"last_snapshot_sync_seconds":56743,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: localstruct{
lastsyncduration: durationpb.New(duration),
lastsynctime: nil,
lastsyncbytes: 81920,
},
expectedErr: "",
},
{
"description with no JSON",
`replaying`,
nil,
corerbd.ErrLastSyncTimeNotFound.Error(),
name: "description without last_snapshot_sync_seconds",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying, {"bytes_per_second":0.0,"bytes_per_snapshot":81920.0,"last_snapshot_bytes":81920,"local_snapshot_timestamp":1684675261,"remote_snapshot_timestamp":1684675261,"replay_state":"idle"}`,
info: localstruct{
lastsyncduration: nil,
lastsynctime: timestamppb.New(time.Unix(1684675261, 0)),
lastsyncbytes: 81920,
},
expectedErr: "",
},
{
name: "description with invalid JSON",
//nolint:lll // sample output cannot be split into multiple lines.
description: `replaying,{"bytes_per_second":0.0,"last_snapshot_bytes":81920","bytes_per_snapshot":149504.0","remote_snapshot_timestamp":1662655501`,
info: localstruct{
lastsyncduration: nil,
lastsynctime: nil,
lastsyncbytes: 0,
},
expectedErr: "failed to unmarshal",
},
{
name: "description with no JSON",
description: `replaying`,
info: localstruct{
lastsyncduration: nil,
lastsynctime: nil,
lastsyncbytes: 0,
},
expectedErr: corerbd.ErrLastSyncTimeNotFound.Error(),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
ts, err := getLastSyncTime(tt.description)
teststruct, err := getLastSyncInfo(tt.description)
if err != nil && !strings.Contains(err.Error(), tt.expectedErr) {
// returned error
t.Errorf("getLastSyncTime() returned error, expected: %v, got: %v",
t.Errorf("getLastSyncInfo() returned error, expected: %v, got: %v",
tt.expectedErr, err)
}
if !ts.AsTime().Equal(tt.timestamp.AsTime()) {
t.Errorf("getLastSyncTime() %v, expected %v", ts, tt.timestamp)
if teststruct.lastsynctime != tt.info.lastsynctime {
t.Errorf("getLastSyncInfo() %v, expected %v", teststruct.lastsynctime, tt.info.lastsynctime)
}
if teststruct.lastsyncduration != tt.info.lastsyncduration {
t.Errorf("getLastSyncInfo() %v, expected %v", teststruct.lastsyncduration, tt.info.lastsyncduration)
}
if teststruct.lastsyncbytes != tt.info.lastsyncbytes {
t.Errorf("getLastSyncInfo() %v, expected %v", teststruct.lastsyncbytes, tt.info.lastsyncbytes)
}
})
}
Expand Down
Loading

0 comments on commit f41a361

Please sign in to comment.