From 559fccb141aac58d4d54c5604d094042a41a1d78 Mon Sep 17 00:00:00 2001 From: Antoon P Date: Wed, 16 Mar 2022 10:50:58 +0100 Subject: [PATCH] Datatx pull model (#2052) --- changelog/unreleased/pull-transfer.md | 6 + cmd/reva/main.go | 2 + cmd/reva/transfer-cancel.go | 28 +- cmd/reva/transfer-create.go | 4 +- cmd/reva/transfer-get-status.go | 28 +- cmd/reva/transfer-list.go | 91 ++ cmd/reva/transfer-retry.go | 84 ++ cmd/revad/runtime/loader.go | 1 + examples/datatx/datatx.toml | 22 + go.mod | 2 +- go.sum | 2 + internal/grpc/services/datatx/datatx.go | 305 ++++++- internal/grpc/services/gateway/datatx.go | 48 +- internal/grpc/services/gateway/gateway.go | 2 +- .../grpc/services/gateway/ocmshareprovider.go | 124 +++ pkg/datatx/datatx.go | 38 + pkg/datatx/manager/loader/loader.go | 25 + pkg/datatx/manager/rclone/rclone.go | 831 ++++++++++++++++++ pkg/datatx/manager/registry/registry.go | 36 + pkg/storage/utils/localfs/localfs.go | 2 +- 20 files changed, 1652 insertions(+), 29 deletions(-) create mode 100644 changelog/unreleased/pull-transfer.md create mode 100644 cmd/reva/transfer-list.go create mode 100644 cmd/reva/transfer-retry.go create mode 100644 examples/datatx/datatx.toml create mode 100644 pkg/datatx/datatx.go create mode 100644 pkg/datatx/manager/loader/loader.go create mode 100644 pkg/datatx/manager/rclone/rclone.go create mode 100644 pkg/datatx/manager/registry/registry.go diff --git a/changelog/unreleased/pull-transfer.md b/changelog/unreleased/pull-transfer.md new file mode 100644 index 0000000000..276609dd63 --- /dev/null +++ b/changelog/unreleased/pull-transfer.md @@ -0,0 +1,6 @@ +Enhancement: New CS3API datatx methods + +CS3 datatx pull model methods: PullTransfer, RetryTransfer, ListTransfers +Method CreateTransfer removed. + +https://github.com/cs3org/reva/pull/2052 \ No newline at end of file diff --git a/cmd/reva/main.go b/cmd/reva/main.go index a77df4b34f..f98551d339 100644 --- a/cmd/reva/main.go +++ b/cmd/reva/main.go @@ -82,6 +82,8 @@ var ( transferCreateCommand(), transferGetStatusCommand(), transferCancelCommand(), + transferListCommand(), + transferRetryCommand(), appTokensListCommand(), appTokensRemoveCommand(), appTokensCreateCommand(), diff --git a/cmd/reva/transfer-cancel.go b/cmd/reva/transfer-cancel.go index aafcce950c..dfa086467b 100644 --- a/cmd/reva/transfer-cancel.go +++ b/cmd/reva/transfer-cancel.go @@ -19,23 +19,27 @@ package main import ( + "encoding/gob" "errors" "io" + "os" + "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" ) func transferCancelCommand() *command { cmd := newCommand("transfer-cancel") cmd.Description = func() string { return "cancel a running transfer" } cmd.Usage = func() string { return "Usage: transfer-cancel [-flags]" } - txID := cmd.String("txID", "", "the transfer identifier") + txID := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags if *txID == "" { - return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage()) + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } ctx := getAuthContext() @@ -44,7 +48,9 @@ func transferCancelCommand() *command { return err } - cancelRequest := &datatx.CancelTransferRequest{} + cancelRequest := &datatx.CancelTransferRequest{ + TxId: &datatx.TxId{OpaqueId: *txID}, + } cancelResponse, err := client.CancelTransfer(ctx, cancelRequest) if err != nil { @@ -54,6 +60,22 @@ func transferCancelCommand() *command { return formatError(cancelResponse.Status) } + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(cancelResponse.TxInfo.Ctime.Seconds), int64(cancelResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {cancelResponse.TxInfo.ShareId.OpaqueId, cancelResponse.TxInfo.Id.OpaqueId, cancelResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(cancelResponse.TxInfo); err != nil { + return err + } + } + return nil } return cmd diff --git a/cmd/reva/transfer-create.go b/cmd/reva/transfer-create.go index 44654f9104..9f359eb439 100644 --- a/cmd/reva/transfer-create.go +++ b/cmd/reva/transfer-create.go @@ -42,8 +42,8 @@ func transferCreateCommand() *command { cmd.Description = func() string { return "create transfer between 2 sites" } cmd.Usage = func() string { return "Usage: transfer-create [-flags] " } grantee := cmd.String("grantee", "", "the grantee, receiver of the transfer") - granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group") - idp := cmd.String("idp", "", "the idp of the grantee, default to same idp as the user triggering the action") + granteeType := cmd.String("granteeType", "user", "the grantee type, one of: user, group (defaults to user)") + idp := cmd.String("idp", "", "the idp of the grantee") userType := cmd.String("user-type", "primary", "the type of user account, defaults to primary") cmd.Action = func(w ...io.Writer) error { diff --git a/cmd/reva/transfer-get-status.go b/cmd/reva/transfer-get-status.go index bef584c913..ab73a52e69 100644 --- a/cmd/reva/transfer-get-status.go +++ b/cmd/reva/transfer-get-status.go @@ -19,23 +19,27 @@ package main import ( + "encoding/gob" "errors" "io" + "os" + "time" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" ) func transferGetStatusCommand() *command { cmd := newCommand("transfer-get-status") cmd.Description = func() string { return "get the status of a transfer" } cmd.Usage = func() string { return "Usage: transfer-get-status [-flags]" } - txID := cmd.String("txID", "", "the transfer identifier") + txID := cmd.String("txId", "", "the transfer identifier") cmd.Action = func(w ...io.Writer) error { // validate flags if *txID == "" { - return errors.New("txID must be specified: use -txID flag\n" + cmd.Usage()) + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) } ctx := getAuthContext() @@ -44,7 +48,9 @@ func transferGetStatusCommand() *command { return err } - getStatusRequest := &datatx.GetTransferStatusRequest{} + getStatusRequest := &datatx.GetTransferStatusRequest{ + TxId: &datatx.TxId{OpaqueId: *txID}, + } getStatusResponse, err := client.GetTransferStatus(ctx, getStatusRequest) if err != nil { @@ -54,6 +60,22 @@ func transferGetStatusCommand() *command { return formatError(getStatusResponse.Status) } + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(getStatusResponse.TxInfo.Ctime.Seconds), int64(getStatusResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {getStatusResponse.TxInfo.ShareId.OpaqueId, getStatusResponse.TxInfo.Id.OpaqueId, getStatusResponse.TxInfo.Status, cTime.Format("Mon Jan 2 15:04:05 -0700 MST 2006")}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(getStatusResponse.TxInfo); err != nil { + return err + } + } + return nil } return cmd diff --git a/cmd/reva/transfer-list.go b/cmd/reva/transfer-list.go new file mode 100644 index 0000000000..b0ea83ae9d --- /dev/null +++ b/cmd/reva/transfer-list.go @@ -0,0 +1,91 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package main + +import ( + "encoding/gob" + "io" + "os" + + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" +) + +func transferListCommand() *command { + cmd := newCommand("transfer-list") + cmd.Description = func() string { return "get a list of transfers" } + cmd.Usage = func() string { return "Usage: transfer-list [-flags]" } + filterShareID := cmd.String("shareId", "", "share ID filter (optional)") + + cmd.Action = func(w ...io.Writer) error { + ctx := getAuthContext() + client, err := getClient() + if err != nil { + return err + } + + // validate flags + var filters []*datatx.ListTransfersRequest_Filter + if *filterShareID != "" { + filters = append(filters, &datatx.ListTransfersRequest_Filter{ + Type: datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID, + Term: &datatx.ListTransfersRequest_Filter_ShareId{ + ShareId: &ocm.ShareId{ + OpaqueId: *filterShareID, + }, + }, + }) + } + + transferslistRequest := &datatx.ListTransfersRequest{ + Filters: filters, + } + + listTransfersResponse, err := client.ListTransfers(ctx, transferslistRequest) + if err != nil { + return err + } + if listTransfersResponse.Status.Code != rpc.Code_CODE_OK { + return formatError(listTransfersResponse.Status) + } + + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId"}) + + for _, s := range listTransfersResponse.Transfers { + t.AppendRows([]table.Row{ + {s.ShareId.OpaqueId, s.Id.OpaqueId}, + }) + } + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(listTransfersResponse.Transfers); err != nil { + return err + } + } + + return nil + } + return cmd +} diff --git a/cmd/reva/transfer-retry.go b/cmd/reva/transfer-retry.go new file mode 100644 index 0000000000..2f2ada08d9 --- /dev/null +++ b/cmd/reva/transfer-retry.go @@ -0,0 +1,84 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package main + +import ( + "encoding/gob" + "errors" + "io" + "os" + "time" + + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + "github.com/jedib0t/go-pretty/table" +) + +func transferRetryCommand() *command { + cmd := newCommand("transfer-retry") + cmd.Description = func() string { return "retry a transfer" } + cmd.Usage = func() string { return "Usage: transfer-retry [-flags]" } + txID := cmd.String("txId", "", "the transfer identifier") + + cmd.Action = func(w ...io.Writer) error { + // validate flags + if *txID == "" { + return errors.New("txId must be specified: use -txId flag\n" + cmd.Usage()) + } + + ctx := getAuthContext() + client, err := getClient() + if err != nil { + return err + } + + retryRequest := &datatx.RetryTransferRequest{ + TxId: &datatx.TxId{ + OpaqueId: *txID, + }, + } + + retryResponse, err := client.RetryTransfer(ctx, retryRequest) + if err != nil { + return err + } + if retryResponse.Status.Code != rpc.Code_CODE_OK { + return formatError(retryResponse.Status) + } + + if len(w) == 0 { + t := table.NewWriter() + t.SetOutputMirror(os.Stdout) + t.AppendHeader(table.Row{"ShareId.OpaqueId", "Id.OpaqueId", "Status", "Ctime"}) + cTime := time.Unix(int64(retryResponse.TxInfo.Ctime.Seconds), int64(retryResponse.TxInfo.Ctime.Nanos)) + t.AppendRows([]table.Row{ + {retryResponse.TxInfo.ShareId.OpaqueId, retryResponse.TxInfo.Id.OpaqueId, retryResponse.TxInfo.Status, cTime}, + }) + t.Render() + } else { + enc := gob.NewEncoder(w[0]) + if err := enc.Encode(retryResponse.TxInfo); err != nil { + return err + } + } + + return nil + } + return cmd +} diff --git a/cmd/revad/runtime/loader.go b/cmd/revad/runtime/loader.go index b7a3f3ce10..2dabdd5029 100644 --- a/cmd/revad/runtime/loader.go +++ b/cmd/revad/runtime/loader.go @@ -33,6 +33,7 @@ import ( _ "github.com/cs3org/reva/pkg/auth/manager/loader" _ "github.com/cs3org/reva/pkg/auth/registry/loader" _ "github.com/cs3org/reva/pkg/cbox/loader" + _ "github.com/cs3org/reva/pkg/datatx/manager/loader" _ "github.com/cs3org/reva/pkg/group/manager/loader" _ "github.com/cs3org/reva/pkg/metrics/driver/loader" _ "github.com/cs3org/reva/pkg/ocm/invite/manager/loader" diff --git a/examples/datatx/datatx.toml b/examples/datatx/datatx.toml new file mode 100644 index 0000000000..cb50385005 --- /dev/null +++ b/examples/datatx/datatx.toml @@ -0,0 +1,22 @@ +# example data transfer service configuration +[grpc.services.datatx] +# rclone is the default data transfer driver +txdriver = "rclone" +# the shares,transfers db file (default: /var/tmp/reva/datatx-shares.json) +tx_shares_file = "" +# base folder of the data transfers (default: /home/DataTransfers) +data_transfers_folder = "" + +# rclone data transfer driver +[grpc.services.datatx.txdrivers.rclone] +# rclone endpoint +endpoint = "http://..." +# basic auth is used +auth_user = "...rcloneuser" +auth_pass = "...rcloneusersecret" +# the transfers(jobs) db file (default: /var/tmp/reva/datatx-transfers.json) +file = "" +# check status job interval in milliseconds +job_status_check_interval = 2000 +# the job timeout in milliseconds (must be long enough for big transfers!) +job_timeout = 120000 \ No newline at end of file diff --git a/go.mod b/go.mod index 30acd363f9..7ab3d4f2a8 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/coreos/go-oidc v2.2.1+incompatible github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e - github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 + github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 github.com/dgraph-io/ristretto v0.1.0 github.com/eventials/go-tus v0.0.0-20200718001131-45c7ec8f5d59 diff --git a/go.sum b/go.sum index cafcdd296e..74c94fc60b 100644 --- a/go.sum +++ b/go.sum @@ -212,6 +212,8 @@ github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e h1:tqSPWQeueWTKnJVMJff github.com/cs3org/cato v0.0.0-20200828125504-e418fc54dd5e/go.mod h1:XJEZ3/EQuI3BXTp/6DUzFr850vlxq11I6satRtz0YQ4= github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654 h1:ha5tiuuFyDrwKUrVEc3TrRDFgTKVQ9NGDRmEP0PRPno= github.com/cs3org/go-cs3apis v0.0.0-20211214102128-4e8745ab1654/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= +github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65 h1:cee0dhBsF8KofV2TM52T41eOo1QLSgtgEZsjYmC5dhU= +github.com/cs3org/go-cs3apis v0.0.0-20220214084335-d975ab5d6e65/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8 h1:Z9lwXumT5ACSmJ7WGnFl+OMLLjpz5uR2fyz7dC255FI= github.com/cubewise-code/go-mime v0.0.0-20200519001935-8c5762b177d8/go.mod h1:4abs/jPXcmJzYoYGF91JF9Uq9s/KL5n1jvFDix8KcqY= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= diff --git a/internal/grpc/services/datatx/datatx.go b/internal/grpc/services/datatx/datatx.go index 33e9f73d56..8c2c2b87e2 100644 --- a/internal/grpc/services/datatx/datatx.go +++ b/internal/grpc/services/datatx/datatx.go @@ -20,8 +20,18 @@ package datatx import ( "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/url" + "os" + "sync" + ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + txdriver "github.com/cs3org/reva/pkg/datatx" + txregistry "github.com/cs3org/reva/pkg/datatx/manager/registry" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc" "github.com/cs3org/reva/pkg/rgrpc/status" @@ -35,23 +45,72 @@ func init() { } type config struct { + // transfer driver + TxDriver string `mapstructure:"txdriver"` + TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"` + // storage driver to persist share/transfer relation + StorageDriver string `mapstructure:"storage_driver"` + StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"` + TxSharesFile string `mapstructure:"tx_shares_file"` + DataTransfersFolder string `mapstructure:"data_transfers_folder"` } type service struct { - conf *config + conf *config + txManager txdriver.Manager + txShareDriver *txShareDriver +} + +type txShareDriver struct { + sync.Mutex // concurrent access to the file + model *txShareModel +} +type txShareModel struct { + File string + TxShares map[string]*txShare `json:"shares"` +} + +type txShare struct { + TxID string + SrcTargetURI string + DestTargetURI string + Opaque *types.Opaque `json:"opaque"` +} + +type webdavEndpoint struct { + filePath string + endpoint string + endpointScheme string + token string } func (c *config) init() { + if c.TxDriver == "" { + c.TxDriver = "rclone" + } + if c.TxSharesFile == "" { + c.TxSharesFile = "/var/tmp/reva/datatx-shares.json" + } + if c.DataTransfersFolder == "" { + c.DataTransfersFolder = "/home/DataTransfers" + } } func (s *service) Register(ss *grpc.Server) { datatx.RegisterTxAPIServer(ss, s) } +func getDatatxManager(c *config) (txdriver.Manager, error) { + if f, ok := txregistry.NewFuncs[c.TxDriver]; ok { + return f(c.TxDrivers[c.TxDriver]) + } + return nil, errtypes.NotFound("datatx service: driver not found: " + c.TxDriver) +} + func parseConfig(m map[string]interface{}) (*config, error) { c := &config{} if err := mapstructure.Decode(m, c); err != nil { - err = errors.Wrap(err, "error decoding conf") + err = errors.Wrap(err, "datatx service: error decoding conf") return nil, err } return c, nil @@ -66,8 +125,24 @@ func New(m map[string]interface{}, ss *grpc.Server) (rgrpc.Service, error) { } c.init() + txManager, err := getDatatxManager(c) + if err != nil { + return nil, err + } + + model, err := loadOrCreate(c.TxSharesFile) + if err != nil { + err = errors.Wrap(err, "datatx service: error loading the file containing the transfer shares") + return nil, err + } + txShareDriver := &txShareDriver{ + model: model, + } + service := &service{ - conf: c, + conf: c, + txManager: txManager, + txShareDriver: txShareDriver, } return service, nil @@ -81,20 +156,228 @@ func (s *service) UnprotectedEndpoints() []string { return []string{} } -func (s *service) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { - return &datatx.CreateTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CreateTransfer not implemented"), "CreateTransfer not implemented"), - }, nil +func (s *service) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { + srcEp, err := s.extractEndpointInfo(ctx, req.SrcTargetUri) + if err != nil { + return nil, err + } + srcRemote := fmt.Sprintf("%s://%s", srcEp.endpointScheme, srcEp.endpoint) + srcPath := srcEp.filePath + srcToken := srcEp.token + + destEp, err := s.extractEndpointInfo(ctx, req.DestTargetUri) + if err != nil { + return nil, err + } + dstRemote := fmt.Sprintf("%s://%s", destEp.endpointScheme, destEp.endpoint) + dstPath := destEp.filePath + dstToken := destEp.token + + txInfo, startTransferErr := s.txManager.StartTransfer(ctx, srcRemote, srcPath, srcToken, dstRemote, dstPath, dstToken) + + // we always save the transfer regardless of start transfer outcome + // only then, if starting fails, can we try to restart it + txShare := &txShare{ + TxID: txInfo.GetId().OpaqueId, + SrcTargetURI: req.SrcTargetUri, + DestTargetURI: req.DestTargetUri, + Opaque: req.Opaque, + } + s.txShareDriver.Lock() + defer s.txShareDriver.Unlock() + + s.txShareDriver.model.TxShares[txInfo.GetId().OpaqueId] = txShare + if err := s.txShareDriver.model.saveTxShare(); err != nil { + err = errors.Wrap(err, "datatx service: error saving transfer share: "+datatx.Status_STATUS_INVALID.String()) + return &datatx.PullTransferResponse{ + Status: status.NewInvalid(ctx, "error pulling transfer"), + }, err + } + + // now check start transfer outcome + if startTransferErr != nil { + startTransferErr = errors.Wrap(startTransferErr, "datatx service: error starting transfer job") + return &datatx.PullTransferResponse{ + Status: status.NewInvalid(ctx, "datatx service: error pulling transfer"), + TxInfo: txInfo, + }, startTransferErr + } + + return &datatx.PullTransferResponse{ + Status: status.NewOK(ctx), + TxInfo: txInfo, + }, err } -func (s *service) GetTransferStatus(ctx context.Context, in *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { +func (s *service) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.GetTransferStatus(ctx, req.GetTxId().OpaqueId) + if err != nil { + err = errors.Wrap(err, "datatx service: error retrieving transfer status") + return &datatx.GetTransferStatusResponse{ + Status: status.NewInternal(ctx, err, "datatx service: error getting transfer status"), + TxInfo: txInfo, + }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + return &datatx.GetTransferStatusResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("GetTransferStatus not implemented"), "GetTransferStatus not implemented"), + Status: status.NewOK(ctx), + TxInfo: txInfo, }, nil } -func (s *service) CancelTransfer(ctx context.Context, in *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { +func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.CancelTransfer(ctx, req.GetTxId().OpaqueId) + if err != nil { + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + err = errors.Wrap(err, "datatx service: error cancelling transfer") + return &datatx.CancelTransferResponse{ + Status: status.NewInternal(ctx, err, "error cancelling transfer"), + TxInfo: txInfo, + }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + return &datatx.CancelTransferResponse{ - Status: status.NewUnimplemented(ctx, errtypes.NotSupported("CancelTransfer not implemented"), "CancelTransfer not implemented"), + Status: status.NewOK(ctx), + TxInfo: txInfo, }, nil } + +func (s *service) ListTransfers(ctx context.Context, req *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { + filters := req.Filters + var txInfos []*datatx.TxInfo + for _, txShare := range s.txShareDriver.model.TxShares { + if len(filters) == 0 { + txInfos = append(txInfos, &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txShare.TxID}, + ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + }) + } else { + for _, f := range filters { + if f.Type == datatx.ListTransfersRequest_Filter_TYPE_SHARE_ID { + if f.GetShareId().GetOpaqueId() == string(txShare.Opaque.Map["shareId"].Value) { + txInfos = append(txInfos, &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txShare.TxID}, + ShareId: &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)}, + }) + } + } + } + } + } + + return &datatx.ListTransfersResponse{ + Status: status.NewOK(ctx), + Transfers: txInfos, + }, nil +} + +func (s *service) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { + txShare, ok := s.txShareDriver.model.TxShares[req.GetTxId().GetOpaqueId()] + if !ok { + return nil, errtypes.InternalError("datatx service: transfer not found") + } + + txInfo, err := s.txManager.RetryTransfer(ctx, req.GetTxId().OpaqueId) + if err != nil { + err = errors.Wrap(err, "datatx service: error retrying transfer") + return &datatx.RetryTransferResponse{ + Status: status.NewInternal(ctx, err, "error retrying transfer"), + TxInfo: txInfo, + }, err + } + + txInfo.ShareId = &ocm.ShareId{OpaqueId: string(txShare.Opaque.Map["shareId"].Value)} + + return &datatx.RetryTransferResponse{ + Status: status.NewOK(ctx), + TxInfo: txInfo, + }, nil +} + +func (s *service) extractEndpointInfo(ctx context.Context, targetURL string) (*webdavEndpoint, error) { + if targetURL == "" { + return nil, errtypes.BadRequest("datatx service: ref target is an empty uri") + } + + uri, err := url.Parse(targetURL) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target uri: "+targetURL) + } + + m, err := url.ParseQuery(uri.RawQuery) + if err != nil { + return nil, errors.Wrap(err, "datatx service: error parsing target resource name") + } + + return &webdavEndpoint{ + filePath: m["name"][0], + endpoint: uri.Host + uri.Path, + endpointScheme: uri.Scheme, + token: uri.User.String(), + }, nil +} + +func loadOrCreate(file string) (*txShareModel, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + if err := ioutil.WriteFile(file, []byte("{}"), 0700); err != nil { + err = errors.Wrap(err, "datatx service: error creating the transfer shares storage file: "+file) + return nil, err + } + } + + fd, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + err = errors.Wrap(err, "datatx service: error opening the transfer shares storage file: "+file) + return nil, err + } + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + err = errors.Wrap(err, "datatx service: error reading the data") + return nil, err + } + + model := &txShareModel{} + if err := json.Unmarshal(data, model); err != nil { + err = errors.Wrap(err, "datatx service: error decoding transfer shares data to json") + return nil, err + } + + if model.TxShares == nil { + model.TxShares = make(map[string]*txShare) + } + + model.File = file + return model, nil +} + +func (m *txShareModel) saveTxShare() error { + data, err := json.Marshal(m) + if err != nil { + err = errors.Wrap(err, "datatx service: error encoding transfer share data to json") + return err + } + + if err := ioutil.WriteFile(m.File, data, 0644); err != nil { + err = errors.Wrap(err, "datatx service: error writing transfer share data to file: "+m.File) + return err + } + + return nil +} diff --git a/internal/grpc/services/gateway/datatx.go b/internal/grpc/services/gateway/datatx.go index c4d857b910..ac2fed8a3e 100644 --- a/internal/grpc/services/gateway/datatx.go +++ b/internal/grpc/services/gateway/datatx.go @@ -27,18 +27,18 @@ import ( "github.com/pkg/errors" ) -func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequest) (*datatx.CreateTransferResponse, error) { +func (s *svc) PullTransfer(ctx context.Context, req *datatx.PullTransferRequest) (*datatx.PullTransferResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") - return &datatx.CreateTransferResponse{ + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.PullTransferResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil } - res, err := c.CreateTransfer(ctx, req) + res, err := c.PullTransfer(ctx, req) if err != nil { - return nil, errors.Wrap(err, "gateway: error calling CreateTransfer") + return nil, errors.Wrap(err, "gateway: error calling PullTransfer") } return res, nil @@ -47,7 +47,7 @@ func (s *svc) CreateTransfer(ctx context.Context, req *datatx.CreateTransferRequ func (s *svc) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStatusRequest) (*datatx.GetTransferStatusResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") return &datatx.GetTransferStatusResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil @@ -64,7 +64,7 @@ func (s *svc) GetTransferStatus(ctx context.Context, req *datatx.GetTransferStat func (s *svc) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequest) (*datatx.CancelTransferResponse, error) { c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) if err != nil { - err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient") + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") return &datatx.CancelTransferResponse{ Status: status.NewInternal(ctx, err, "error getting data transfer client"), }, nil @@ -77,3 +77,37 @@ func (s *svc) CancelTransfer(ctx context.Context, req *datatx.CancelTransferRequ return res, nil } + +func (s *svc) ListTransfers(ctx context.Context, req *datatx.ListTransfersRequest) (*datatx.ListTransfersResponse, error) { + c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) + if err != nil { + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.ListTransfersResponse{ + Status: status.NewInternal(ctx, err, "error getting data transfer client"), + }, nil + } + + res, err := c.ListTransfers(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "gateway: error calling ListTransfers") + } + + return res, nil +} + +func (s *svc) RetryTransfer(ctx context.Context, req *datatx.RetryTransferRequest) (*datatx.RetryTransferResponse, error) { + c, err := pool.GetDataTxClient(s.c.DataTxEndpoint) + if err != nil { + err = errors.Wrap(err, "gateway: error calling GetDataTxClient") + return &datatx.RetryTransferResponse{ + Status: status.NewInternal(ctx, err, "error getting data transfer client"), + }, nil + } + + res, err := c.RetryTransfer(ctx, req) + if err != nil { + return nil, errors.Wrap(err, "gateway: error calling RetryTransfer") + } + + return res, nil +} diff --git a/internal/grpc/services/gateway/gateway.go b/internal/grpc/services/gateway/gateway.go index 25819dfb22..5108abf501 100644 --- a/internal/grpc/services/gateway/gateway.go +++ b/internal/grpc/services/gateway/gateway.go @@ -83,7 +83,7 @@ func (c *config) init() { c.ShareFolder = strings.Trim(c.ShareFolder, "/") if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "Data-Transfers" + c.DataTransfersFolder = "DataTransfers" } if c.TokenManager == "" { diff --git a/internal/grpc/services/gateway/ocmshareprovider.go b/internal/grpc/services/gateway/ocmshareprovider.go index bd7ebb76bb..d0675b7c6e 100644 --- a/internal/grpc/services/gateway/ocmshareprovider.go +++ b/internal/grpc/services/gateway/ocmshareprovider.go @@ -21,12 +21,18 @@ package gateway import ( "context" "fmt" + "net/url" "path" + "strings" + ocmprovider "github.com/cs3org/go-cs3apis/cs3/ocm/provider/v1beta1" rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" ocm "github.com/cs3org/go-cs3apis/cs3/sharing/ocm/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" "github.com/cs3org/reva/pkg/appctx" + ctxpkg "github.com/cs3org/reva/pkg/ctx" "github.com/cs3org/reva/pkg/errtypes" "github.com/cs3org/reva/pkg/rgrpc/status" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" @@ -262,6 +268,124 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive panic("gateway: error updating a received share: the share is nil") } + if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER { + srcIdp := share.GetShare().GetOwner().GetIdp() + meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{ + Domain: srcIdp, + }) + if err != nil { + log.Err(err).Msg("gateway: error calling GetInfoByDomain") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + var srcEndpoint string + var srcEndpointBaseURI string + // target URI scheme will be the webdav endpoint scheme + var srcEndpointScheme string + for _, s := range meshProvider.ProviderInfo.Services { + if strings.ToLower(s.Endpoint.Type.Name) == "webdav" { + url, err := url.Parse(s.Endpoint.Path) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + srcEndpoint = url.Host + srcEndpointBaseURI = url.Path + srcEndpointScheme = url.Scheme + break + } + } + + var srcToken string + srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"] + if !ok { + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewNotFound(ctx, "token not found"), + }, nil + } + switch srcTokenOpaque.Decoder { + case "plain": + srcToken = string(srcTokenOpaque.Value) + default: + err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewInternal(ctx, err, "error updating received share"), + }, nil + } + + srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name) + srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath) + + // get the webdav endpoint of the grantee's idp + var granteeIdp string + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER { + granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp + } + if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP { + granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp + } + destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + url, err := url.Parse(destWebdavEndpoint) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint) + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + destEndpoint := url.Host + destEndpointBaseURI := url.Path + destEndpointScheme := url.Scheme + destToken := ctxpkg.ContextMustGetToken(ctx) + homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{}) + if err != nil { + log.Err(err).Msg("gateway: error calling UpdateReceivedShare") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL}, + }, nil + } + destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name)) + destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath) + + opaqueObj := &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "shareId": { + Decoder: "plain", + Value: []byte(share.GetShare().GetId().OpaqueId), + }, + }, + } + req := &datatx.PullTransferRequest{ + SrcTargetUri: srcTargetURI, + DestTargetUri: destTargetURI, + Opaque: opaqueObj, + } + res, err := s.PullTransfer(ctx, req) + if err != nil { + log.Err(err).Msg("gateway: error calling PullTransfer") + return &ocm.UpdateReceivedOCMShareResponse{ + Status: &rpc.Status{ + Code: rpc.Code_CODE_INTERNAL, + }, + }, err + } + + log.Info().Msgf("gateway: PullTransfer: %v", res.TxInfo) + + // do not create an OCM reference, just return + return &ocm.UpdateReceivedOCMShareResponse{ + Status: status.NewOK(ctx), + }, nil + } + createRefStatus, err := s.createOCMReference(ctx, share.Share) return &ocm.UpdateReceivedOCMShareResponse{ Status: createRefStatus, diff --git a/pkg/datatx/datatx.go b/pkg/datatx/datatx.go new file mode 100644 index 0000000000..3b1d98d6b2 --- /dev/null +++ b/pkg/datatx/datatx.go @@ -0,0 +1,38 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package datatx + +import ( + "context" + + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" +) + +// Manager the interface any transfer driver should implement +type Manager interface { + // StartTransfer initiates a transfer job and returns a TxInfo object including a unique transfer id, and error if any. + StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) + // GetTransferStatus returns a TxInfo object including the current status, and error if any. + GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) + // CancelTransfer cancels the transfer and returns a TxInfo object and error if any. + CancelTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) + // RetryTransfer retries the transfer and returns a TxInfo object and error if any. + // Note that tokens must still be valid. + RetryTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) +} diff --git a/pkg/datatx/manager/loader/loader.go b/pkg/datatx/manager/loader/loader.go new file mode 100644 index 0000000000..28df90ed12 --- /dev/null +++ b/pkg/datatx/manager/loader/loader.go @@ -0,0 +1,25 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package loader + +import ( + // Load datatx drivers. + _ "github.com/cs3org/reva/pkg/datatx/manager/rclone" + // Add your own here +) diff --git a/pkg/datatx/manager/rclone/rclone.go b/pkg/datatx/manager/rclone/rclone.go new file mode 100644 index 0000000000..5e4008cbbf --- /dev/null +++ b/pkg/datatx/manager/rclone/rclone.go @@ -0,0 +1,831 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package rclone + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "path" + "strconv" + "sync" + "time" + + datatx "github.com/cs3org/go-cs3apis/cs3/tx/v1beta1" + typespb "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/appctx" + txdriver "github.com/cs3org/reva/pkg/datatx" + registry "github.com/cs3org/reva/pkg/datatx/manager/registry" + "github.com/cs3org/reva/pkg/rhttp" + "github.com/google/uuid" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" +) + +func init() { + registry.Register("rclone", New) +} + +func (c *config) init(m map[string]interface{}) { + // set sane defaults + if c.File == "" { + c.File = "/var/tmp/reva/datatx-transfers.json" + } + if c.JobStatusCheckInterval == 0 { + c.JobStatusCheckInterval = 2000 + } + if c.JobTimeout == 0 { + c.JobTimeout = 50000 + } +} + +type config struct { + Endpoint string `mapstructure:"endpoint"` + AuthUser string `mapstructure:"auth_user"` // rclone basicauth user + AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass + File string `mapstructure:"file"` + JobStatusCheckInterval int `mapstructure:"job_status_check_interval"` + JobTimeout int `mapstructure:"job_timeout"` +} + +type rclone struct { + config *config + client *http.Client + pDriver *pDriver +} + +type rcloneHTTPErrorRes struct { + Error string `json:"error"` + Input map[string]interface{} `json:"input"` + Path string `json:"path"` + Status int `json:"status"` +} + +type transferModel struct { + File string + Transfers map[string]*transfer `json:"transfers"` +} + +// persistency driver +type pDriver struct { + sync.Mutex // concurrent access to the file + model *transferModel +} + +type transfer struct { + TransferID string + JobID int64 + TransferStatus datatx.Status + SrcToken string + SrcRemote string + SrcPath string + DestToken string + DestRemote string + DestPath string + Ctime string +} + +// txEndStatuses final statuses that cannot be changed anymore +var txEndStatuses = map[string]int32{ + "STATUS_INVALID": 0, + "STATUS_DESTINATION_NOT_FOUND": 1, + "STATUS_TRANSFER_COMPLETE": 6, + "STATUS_TRANSFER_FAILED": 7, + "STATUS_TRANSFER_CANCELLED": 8, + "STATUS_TRANSFER_CANCEL_FAILED": 9, + "STATUS_TRANSFER_EXPIRED": 10, +} + +// New returns a new rclone driver +func New(m map[string]interface{}) (txdriver.Manager, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + c.init(m) + + // TODO insecure should be configurable + client := rhttp.GetHTTPClient(rhttp.Insecure(true)) + + // The persistency driver + // Load or create 'db' + model, err := loadOrCreate(c.File) + if err != nil { + err = errors.Wrap(err, "error loading the file containing the transfers") + return nil, err + } + pDriver := &pDriver{ + model: model, + } + + return &rclone{ + config: c, + client: client, + pDriver: pDriver, + }, nil +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +func loadOrCreate(file string) (*transferModel, error) { + _, err := os.Stat(file) + if os.IsNotExist(err) { + if err := ioutil.WriteFile(file, []byte("{}"), 0700); err != nil { + err = errors.Wrap(err, "error creating the transfers storage file: "+file) + return nil, err + } + } + + fd, err := os.OpenFile(file, os.O_CREATE, 0644) + if err != nil { + err = errors.Wrap(err, "error opening the transfers storage file: "+file) + return nil, err + } + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + err = errors.Wrap(err, "error reading the data") + return nil, err + } + + model := &transferModel{} + if err := json.Unmarshal(data, model); err != nil { + err = errors.Wrap(err, "error decoding transfers data to json") + return nil, err + } + + if model.Transfers == nil { + model.Transfers = make(map[string]*transfer) + } + + model.File = file + return model, nil +} + +// saveTransfer saves the transfer. If an error is specified than that error will be returned, possibly wrapped with additional errors. +func (m *transferModel) saveTransfer(e error) error { + data, err := json.Marshal(m) + if err != nil { + e = errors.Wrap(err, "error encoding transfer data to json") + return e + } + + if err := ioutil.WriteFile(m.File, data, 0644); err != nil { + e = errors.Wrap(err, "error writing transfer data to file: "+m.File) + return e + } + + return e +} + +// StartTransfer initiates a transfer job and returns a TxInfo object that includes a unique transfer id. +func (driver *rclone) StartTransfer(ctx context.Context, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { + return driver.startJob(ctx, "", srcRemote, srcPath, srcToken, destRemote, destPath, destToken) +} + +// startJob starts a transfer job. Retries a previous job if transferID is specified. +func (driver *rclone) startJob(ctx context.Context, transferID string, srcRemote string, srcPath string, srcToken string, destRemote string, destPath string, destToken string) (*datatx.TxInfo, error) { + logger := appctx.GetLogger(ctx) + + driver.pDriver.Lock() + defer driver.pDriver.Unlock() + + var txID string + var cTime *typespb.Timestamp + + if transferID == "" { + txID = uuid.New().String() + cTime = &typespb.Timestamp{Seconds: uint64(time.Now().Unix())} + } else { // restart existing transfer if transferID is specified + logger.Debug().Msgf("Restarting transfer (txID: %s)", transferID) + txID = transferID + transfer, err := driver.pDriver.model.getTransfer(txID) + if err != nil { + err = errors.Wrap(err, "rclone: error retrying transfer (transferID: "+txID+")") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + seconds, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + cTime = &typespb.Timestamp{Seconds: uint64(seconds)} + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + if !endStatusFound { + err := errors.New("rclone: transfer still running, unable to restart") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, err + } + srcToken = transfer.SrcToken + srcRemote = transfer.SrcRemote + srcPath = transfer.SrcPath + destToken = transfer.DestToken + destRemote = transfer.DestRemote + destPath = transfer.DestPath + delete(driver.pDriver.model.Transfers, txID) + } + + transferStatus := datatx.Status_STATUS_TRANSFER_NEW + + transfer := &transfer{ + TransferID: txID, + JobID: int64(-1), + TransferStatus: transferStatus, + SrcToken: srcToken, + SrcRemote: srcRemote, + SrcPath: srcPath, + DestToken: destToken, + DestRemote: destRemote, + DestPath: destPath, + Ctime: fmt.Sprint(cTime.Seconds), // TODO do we need nanos here? + } + + driver.pDriver.model.Transfers[txID] = transfer + + type rcloneAsyncReqJSON struct { + SrcFs string `json:"srcFs"` + // SrcToken string `json:"srcToken"` + DstFs string `json:"dstFs"` + // DstToken string `json:"destToken"` + Async bool `json:"_async"` + } + srcFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", srcToken, srcRemote, srcPath) + dstFs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":%v", destToken, destRemote, destPath) + rcloneReq := &rcloneAsyncReqJSON{ + SrcFs: srcFs, + DstFs: dstFs, + Async: true, + } + data, err := json.Marshal(rcloneReq) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error marshalling rclone req data") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + transferFileMethod := "/sync/copy" + remotePathIsFolder, err := driver.remotePathIsFolder(srcRemote, srcPath, srcToken) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error stating src path") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + if !remotePathIsFolder { + err = errors.Wrap(err, "rclone: error pulling transfer: path is a file, only folder transfer is implemented") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error parsing driver endpoint") + transfer.TransferStatus = datatx.Status_STATUS_INVALID + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error framing post request") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + res, err := driver.client.Do(req) + if err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer: error sending post request") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + var errorResData rcloneHTTPErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + e := errors.New("rclone: rclone request responded with error, " + fmt.Sprintf(" status: %v, error: %v", errorResData.Status, errorResData.Error)) + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(e) + } + + type rcloneAsyncResJSON struct { + JobID int64 `json:"jobid"` + } + var resData rcloneAsyncResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + err = errors.Wrap(err, "rclone: error decoding response data") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transfer.TransferStatus, + Ctime: cTime, + }, driver.pDriver.model.saveTransfer(err) + } + + transfer.JobID = resData.JobID + + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + err = errors.Wrap(err, "rclone: error pulling transfer") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: cTime, + }, err + } + + // start separate dedicated process to periodically check the transfer progress + go func() { + // runs for as long as no end state or time out has been reached + startTimeMs := time.Now().Nanosecond() / 1000 + timeout := driver.config.JobTimeout + + driver.pDriver.Lock() + defer driver.pDriver.Unlock() + + for { + transfer, err := driver.pDriver.model.getTransfer(txID) + if err != nil { + transfer.TransferStatus = datatx.Status_STATUS_INVALID + err = driver.pDriver.model.saveTransfer(err) + logger.Error().Err(err).Msgf("rclone driver: unable to retrieve transfer with id: %v", txID) + break + } + + // check for end status first + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + if endStatusFound { + logger.Info().Msgf("rclone driver: transfer endstatus reached: %v", transfer.TransferStatus) + break + } + + // check for possible timeout and if true were done + currentTimeMs := time.Now().Nanosecond() / 1000 + timePastMs := currentTimeMs - startTimeMs + + if timePastMs > timeout { + logger.Info().Msgf("rclone driver: transfer timed out: %vms (timeout = %v)", timePastMs, timeout) + // set status to EXPIRED and save + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_EXPIRED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + + jobID := transfer.JobID + type rcloneStatusReqJSON struct { + JobID int64 `json:"jobid"` + } + rcloneStatusReq := &rcloneStatusReqJSON{ + JobID: jobID, + } + + data, err := json.Marshal(rcloneStatusReq) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: marshalling request failed: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + + transferFileMethod := "/job/status" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: could not parse driver endpoint: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: error framing post request: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + res, err := driver.client.Do(req) + if err != nil { + logger.Error().Err(err).Msgf("rclone driver: error sending post request: %v", err) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + var errorResData rcloneHTTPErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") + logger.Error().Err(err).Msgf("rclone driver: error reading response body: %v", err) + } + logger.Error().Err(err).Msgf("rclone driver: rclone request responded with error, status: %v, error: %v", errorResData.Status, errorResData.Error) + transfer.TransferStatus = datatx.Status_STATUS_INVALID + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: save transfer failed: %v", err) + } + break + } + + type rcloneStatusResJSON struct { + Finished bool `json:"finished"` + Success bool `json:"success"` + ID int64 `json:"id"` + Error string `json:"error"` + Group string `json:"group"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Duration float64 `json:"duration"` + // think we don't need this + // "output": {} // output of the job as would have been returned if called synchronously + } + var resData rcloneStatusResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error decoding response data: %v", err) + break + } + + if resData.Error != "" { + logger.Error().Err(err).Msgf("rclone driver: rclone responded with error: %v", resData.Error) + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer complete + if resData.Finished && resData.Success { + logger.Info().Msg("rclone driver: transfer job finished") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_COMPLETE + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer completed unsuccessfully without error + if resData.Finished && !resData.Success { + logger.Info().Msgf("rclone driver: transfer job failed") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_FAILED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + break + } + + // transfer not yet finished: continue + if !resData.Finished { + logger.Info().Msgf("rclone driver: transfer job in progress") + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_IN_PROGRESS + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + logger.Error().Err(err).Msgf("rclone driver: error saving transfer: %v", err) + break + } + } + + <-time.After(time.Millisecond * time.Duration(driver.config.JobStatusCheckInterval)) + } + }() + + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: txID}, + Status: transferStatus, + Ctime: cTime, + }, nil +} + +// GetTransferStatus returns the status of the transfer with the specified job id +func (driver *rclone) GetTransferStatus(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferID) + if err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: transfer.TransferStatus, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, nil +} + +// CancelTransfer cancels the transfer with the specified transfer id +func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + transfer, err := driver.pDriver.model.getTransfer(transferID) + if err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: nil, + }, err + } + cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64) + _, endStatusFound := txEndStatuses[transfer.TransferStatus.String()] + if endStatusFound { + err := errors.New("rclone driver: transfer already in end state") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + // rcloneStop the rclone job/stop method json request + type rcloneStopRequest struct { + JobID int64 `json:"jobid"` + } + rcloneCancelTransferReq := &rcloneStopRequest{ + JobID: transfer.JobID, + } + + data, err := json.Marshal(rcloneCancelTransferReq) + if err != nil { + err = errors.Wrap(err, "rclone driver: error marshalling rclone req data") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + transferFileMethod := "/job/stop" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + err = errors.Wrap(err, "rclone driver: error parsing driver endpoint") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + u.Path = path.Join(u.Path, transferFileMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + err = errors.Wrap(err, "rclone driver: error framing post request") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + err = errors.Wrap(err, "rclone driver: error sending post request") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + var errorResData rcloneHTTPErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + err = errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + type rcloneCancelTransferResJSON struct { + Finished bool `json:"finished"` + Success bool `json:"success"` + ID int64 `json:"id"` + Error string `json:"error"` + Group string `json:"group"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Duration float64 `json:"duration"` + // think we don't need this + // "output": {} // output of the job as would have been returned if called synchronously + } + var resData rcloneCancelTransferResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + err = errors.Wrap(err, "rclone driver: error decoding response data") + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + if resData.Error != "" { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_TRANSFER_CANCEL_FAILED, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, errors.New(resData.Error) + } + + transfer.TransferStatus = datatx.Status_STATUS_TRANSFER_CANCELLED + if err := driver.pDriver.model.saveTransfer(nil); err != nil { + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_INVALID, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, err + } + + return &datatx.TxInfo{ + Id: &datatx.TxId{OpaqueId: transferID}, + Status: datatx.Status_STATUS_TRANSFER_CANCELLED, + Ctime: &typespb.Timestamp{Seconds: uint64(cTime)}, + }, nil +} + +// RetryTransfer retries the transfer with the specified transfer ID. +// Note that tokens must still be valid. +func (driver *rclone) RetryTransfer(ctx context.Context, transferID string) (*datatx.TxInfo, error) { + return driver.startJob(ctx, transferID, "", "", "", "", "", "") +} + +// getTransfer returns the transfer with the specified transfer ID +func (m *transferModel) getTransfer(transferID string) (*transfer, error) { + transfer, ok := m.Transfers[transferID] + if !ok { + return nil, errors.New("rclone driver: invalid transfer ID") + } + return transfer, nil +} + +func (driver *rclone) remotePathIsFolder(remote string, remotePath string, remoteToken string) (bool, error) { + type rcloneListReqJSON struct { + Fs string `json:"fs"` + Remote string `json:"remote"` + } + fs := fmt.Sprintf(":webdav,headers=\"x-access-token,%v\",url=\"%v\":", remoteToken, remote) + rcloneReq := &rcloneListReqJSON{ + Fs: fs, + Remote: remotePath, + } + data, err := json.Marshal(rcloneReq) + if err != nil { + return false, errors.Wrap(err, "rclone: error marshalling rclone req data") + } + + listMethod := "/operations/list" + + u, err := url.Parse(driver.config.Endpoint) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error parsing driver endpoint") + } + u.Path = path.Join(u.Path, listMethod) + requestURL := u.String() + + req, err := http.NewRequest("POST", requestURL, bytes.NewReader(data)) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error framing post request") + } + req.Header.Set("Content-Type", "application/json") + + req.SetBasicAuth(driver.config.AuthUser, driver.config.AuthPass) + + res, err := driver.client.Do(req) + if err != nil { + return false, errors.Wrap(err, "rclone driver: error sending post request") + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + var errorResData rcloneHTTPErrorRes + if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil { + return false, errors.Wrap(err, "rclone driver: error decoding response data") + } + return false, errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error") + } + + type item struct { + Path string `json:"Path"` + Name string `json:"Name"` + Size int64 `json:"Size"` + MimeType string `json:"MimeType"` + ModTime string `json:"ModTime"` + IsDir bool `json:"IsDir"` + } + type rcloneListResJSON struct { + List []*item `json:"list"` + } + + var resData rcloneListResJSON + if err = json.NewDecoder(res.Body).Decode(&resData); err != nil { + return false, errors.Wrap(err, "rclone driver: error decoding response data") + } + + // a file will return one single item, the file, with path being the remote path and IsDir will be false + if len(resData.List) == 1 && resData.List[0].Path == remotePath && !resData.List[0].IsDir { + return false, nil + } + + // in all other cases the remote path is a directory + return true, nil +} diff --git a/pkg/datatx/manager/registry/registry.go b/pkg/datatx/manager/registry/registry.go new file mode 100644 index 0000000000..95a4830b3c --- /dev/null +++ b/pkg/datatx/manager/registry/registry.go @@ -0,0 +1,36 @@ +// Copyright 2018-2020 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import ( + "github.com/cs3org/reva/pkg/datatx" +) + +// NewFunc is the function that datatx implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (datatx.Manager, error) + +// NewFuncs is a map containing all the registered datatx backends. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new datatx backend new function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/storage/utils/localfs/localfs.go b/pkg/storage/utils/localfs/localfs.go index 4c1a68f225..dce53038d8 100644 --- a/pkg/storage/utils/localfs/localfs.go +++ b/pkg/storage/utils/localfs/localfs.go @@ -76,7 +76,7 @@ func (c *Config) init() { } if c.DataTransfersFolder == "" { - c.DataTransfersFolder = "/Data-Transfers" + c.DataTransfersFolder = "/DataTransfers" } // ensure share folder always starts with slash