Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datatx pull model #2052

Merged
merged 21 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/unreleased/pull-transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Enhancement: New CS3API datatx methods

CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers
Method CreateTransfer removed.

https://github.com/cs3org/reva/pull/2052
2 changes: 2 additions & 0 deletions cmd/reva/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ var (
transferCreateCommand(),
transferGetStatusCommand(),
transferCancelCommand(),
transferListCommand(),
transferRetryCommand(),
appTokensListCommand(),
appTokensRemoveCommand(),
appTokensCreateCommand(),
Expand Down
28 changes: 25 additions & 3 deletions cmd/reva/transfer-cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@
package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferCancelCommand() *command {
cmd := newCommand("transfer-cancel")
cmd.Description = func() string { return "cancel a running transfer" }
cmd.Usage = func() string { return "Usage: transfer-cancel [-flags]" }
txID := cmd.String("txID", "", "the transfer identifier")
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage())
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
Expand All @@ -44,7 +48,9 @@ func transferCancelCommand() *command {
return err
}

cancelRequest := &datatx.CancelTransferRequest{}
cancelRequest := &datatx.CancelTransferRequest{
TxId: &datatx.TxId{OpaqueId: *txID},
}

cancelResponse, err := client.CancelTransfer(ctx, cancelRequest)
if err != nil {
Expand All @@ -54,6 +60,22 @@ func transferCancelCommand() *command {
return formatError(cancelResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(cancelResponse.TxInfo.Ctime.Seconds), int64(cancelResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{cancelResponse.TxInfo.ShareId.OpaqueId, cancelResponse.TxInfo.Id.OpaqueId, cancelResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(cancelResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
Expand Down
4 changes: 2 additions & 2 deletions cmd/reva/transfer-create.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func transferCreateCommand() *command {
cmd.Description = func() string { return "create transfer between 2 sites" }
cmd.Usage = func() string { return "Usage: transfer-create [-flags] <path>" }
grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group")
idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action")
granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group (defaults to user)")
idp := cmd.String("idp", "", "the idp of the grantee")
userType := cmd.String("user-type", "primary", "the type of user account, defaults to primary")

cmd.Action = func(w ...io.Writer) error {
Expand Down
28 changes: 25 additions & 3 deletions cmd/reva/transfer-get-status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,27 @@
package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferGetStatusCommand() *command {
cmd := newCommand("transfer-get-status")
cmd.Description = func() string { return "get the status of a transfer" }
cmd.Usage = func() string { return "Usage: transfer-get-status [-flags]" }
txID := cmd.String("txID", "", "the transfer identifier")
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage())
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
Expand All @@ -44,7 +48,9 @@ func transferGetStatusCommand() *command {
return err
}

getStatusRequest := &datatx.GetTransferStatusRequest{}
getStatusRequest := &datatx.GetTransferStatusRequest{
TxId: &datatx.TxId{OpaqueId: *txID},
}

getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest)
if err != nil {
Expand All @@ -54,6 +60,22 @@ func transferGetStatusCommand() *command {
return formatError(getStatusResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(getStatusResponse.TxInfo.Ctime.Seconds), int64(getStatusResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{getStatusResponse.TxInfo.ShareId.OpaqueId, getStatusResponse.TxInfo.Id.OpaqueId, getStatusResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(getStatusResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
Expand Down
91 changes: 91 additions & 0 deletions cmd/reva/transfer-list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package main

import (
"encoding/gob"
"io"
"os"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferListCommand() *command {
cmd := newCommand("transfer-list")
cmd.Description = func() string { return "get a list of transfers" }
cmd.Usage = func() string { return "Usage: transfer-list [-flags]" }
filterShareID := cmd.String("shareId", "", "share ID filter (optional)")

cmd.Action = func(w ...io.Writer) error {
ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

// validate flags
var filters []*datatx.ListTransfersRequest_Filter
if *filterShareID != "" {
filters = append(filters, &datatx.ListTransfersRequest_Filter{
Type: datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID,
Term: &datatx.ListTransfersRequest_Filter_ShareId{
ShareId: &ocm.ShareId{
OpaqueId: *filterShareID,
},
},
})
}

transferslistRequest := &datatx.ListTransfersRequest{
Filters: filters,
}

listTransfersResponse, err := client.ListTransfers(ctx, transferslistRequest)
if err != nil {
return err
}
if listTransfersResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(listTransfersResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId"})

for _, s := range listTransfersResponse.Transfers {
t.AppendRows([]table.Row{
{s.ShareId.OpaqueId, s.Id.OpaqueId},
})
}
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(listTransfersResponse.Transfers); err != nil {
return err
}
}

return nil
}
return cmd
}
84 changes: 84 additions & 0 deletions cmd/reva/transfer-retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package main

import (
"encoding/gob"
"errors"
"io"
"os"
"time"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1"
"github.com/jedib0t/go-pretty/table"
)

func transferRetryCommand() *command {
cmd := newCommand("transfer-retry")
cmd.Description = func() string { return "retry a transfer" }
cmd.Usage = func() string { return "Usage: transfer-retry [-flags]" }
txID := cmd.String("txId", "", "the transfer identifier")

cmd.Action = func(w ...io.Writer) error {
// validate flags
if *txID == "" {
return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage())
}

ctx := getAuthContext()
client, err := getClient()
if err != nil {
return err
}

retryRequest := &datatx.RetryTransferRequest{
TxId: &datatx.TxId{
OpaqueId: *txID,
},
}

retryResponse, err := client.RetryTransfer(ctx, retryRequest)
if err != nil {
return err
}
if retryResponse.Status.Code != rpc.Code_CODE_OK {
return formatError(retryResponse.Status)
}

if len(w) == 0 {
t := table.NewWriter()
t.SetOutputMirror(os.Stdout)
t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"})
cTime := time.Unix(int64(retryResponse.TxInfo.Ctime.Seconds), int64(retryResponse.TxInfo.Ctime.Nanos))
t.AppendRows([]table.Row{
{retryResponse.TxInfo.ShareId.OpaqueId, retryResponse.TxInfo.Id.OpaqueId, retryResponse.TxInfo.Status, cTime},
})
t.Render()
} else {
enc := gob.NewEncoder(w[0])
if err := enc.Encode(retryResponse.TxInfo); err != nil {
return err
}
}

return nil
}
return cmd
}
1 change: 1 addition & 0 deletions cmd/revad/runtime/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
_ "github.com/cs3org/reva/pkg/auth/manager/loader"
_ "github.com/cs3org/reva/pkg/auth/registry/loader"
_ "github.com/cs3org/reva/pkg/cbox/loader"
_ "github.com/cs3org/reva/pkg/datatx/manager/loader"
_ "github.com/cs3org/reva/pkg/group/manager/loader"
_ "github.com/cs3org/reva/pkg/metrics/driver/loader"
_ "github.com/cs3org/reva/pkg/ocm/invite/manager/loader"
Expand Down
22 changes: 22 additions & 0 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# example data transfer service configuration
[grpc.services.datatx]
# rclone is the default data transfer driver
txdriver = "rclone"
# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json)
tx_shares_file = ""
# base folder of the data transfers (default: /home/DataTransfers)
data_transfers_folder = ""

# rclone data transfer driver
[grpc.services.datatx.txdrivers.rclone]
# rclone endpoint
endpoint = "http://..."
# basic auth is used
auth_user = "...rcloneuser"
auth_pass = "...rcloneusersecret"
# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json)
file = ""
# check status job interval in milliseconds
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/cheggaaa/pb v1.0.29
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8
github.com/dgraph-io/ristretto v0.1.0
github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59
Expand Down Expand Up @@ -89,6 +89,7 @@ require (
go 1.16

replace (
// github.com/cs3org/go-cs3apis => ../../go-cs3apis
ishank011 marked this conversation as resolved.
Show resolved Hide resolved
github.com/eventials/go-tus => github.com/andrewmostello/go-tus v0.0.0-20200314041820-904a9904af9a
github.com/oleiade/reflections => github.com/oleiade/reflections v1.0.1
google.golang.org/grpc => google.golang.org/grpc v1.26.0 // temporary downgrade
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff
github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4=
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 h1:ha5tiuuFyDrwKUrVEc3TrRDFgTKVQ9NGDRmEP0PRPno=
github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 h1:cee0dhBsF8KofV2TM52T41eOo1QLSgtgEZsjYmC5dhU=
github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI=
github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
Expand Down
Loading