Skip to content

Commit

Permalink
Merge branch 'main' into datatx-pull-model
Browse files Browse the repository at this point in the history
  • Loading branch information
Antoon Prins committed Nov 4, 2021
1 parent 93b3ae1 commit e15d83d
Showing 1 changed file with 64 additions and 174 deletions.
238 changes: 64 additions & 174 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,36 +207,6 @@ func (s *svc) ListReceivedOCMShares(ctx context.Context, req *ocm.ListReceivedOC

func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceivedOCMShareRequest) (*ocm.UpdateReceivedOCMShareResponse, error) {
log := appctx.GetLogger(ctx)

getShareReq := &ocm.GetReceivedOCMShareRequest{Ref: req.Ref}
getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq)
if err != nil {
log.Err(err).Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}

share := getShareRes.Share
if share == nil {
panic("gateway: error updating a received share: the share is nil")
}

// return early if transfer type share has already been accepted
if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER && share.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED {
log.Err(err).Msg("gateway: transfer type share already accepted")
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewOK(ctx),
}, nil
}

c, err := pool.GetOCMShareProviderClient(s.c.OCMShareProviderEndpoint)
if err != nil {
err = errors.Wrap(err, "gateway: error calling GetOCMShareProviderClient")
Expand Down Expand Up @@ -275,208 +245,128 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
},
}
getShareRes, err := s.GetReceivedOCMShare(ctx, getShareReq)
// we don't commit to storage invalid update fields or empty display names.
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_INVALID && req.Field.GetDisplayName() == "" {
log.Error().Msg("the update field is invalid, aborting reference manipulation")
return res, nil

}

// TODO(labkode): if update field is displayName we need to do a rename on the storage to align
// share display name and storage filename.
if req.Field.GetState() != ocm.ShareState_SHARE_STATE_INVALID {
if req.Field.GetState() == ocm.ShareState_SHARE_STATE_ACCEPTED {
if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
srcIdp := share.GetShare().GetOwner().GetIdp()
meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{
Domain: srcIdp,
})
if err != nil {
log.Err(err).Msg("gateway: error calling GetReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}

if getShareRes.Status.Code != rpc.Code_CODE_OK {
log.Error().Msg("gateway: error calling GetReceivedShare")
var srcEndpoint string
var srcEndpointBaseURI string
// target URI scheme will be the webdav endpoint scheme
var srcEndpointScheme string
for _, s := range meshProvider.ProviderInfo.Services {
if strings.ToLower(s.Endpoint.Type.Name) == "webdav" {
url, err := url.Parse(s.Endpoint.Path)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path)
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
srcEndpoint = url.Host
srcEndpointBaseURI = url.Path
srcEndpointScheme = url.Scheme
break
}
}

var srcToken string
srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"]
if !ok {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewNotFound(ctx, "token not found"),
}, nil
}
switch srcTokenOpaque.Decoder {
case "plain":
srcToken = string(srcTokenOpaque.Value)
default:
err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder)
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name)
srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath)

// get the webdav endpoint of the grantee's idp
var granteeIdp string
if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp
}
if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp
}
destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
url, err := url.Parse(destWebdavEndpoint)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint)
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
destEndpoint := url.Host
destEndpointBaseURI := url.Path
destEndpointScheme := url.Scheme
destToken := ctxpkg.ContextMustGetToken(ctx)
homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{})
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
}, nil
}
destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name))
destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath)

share := getShareRes.Share
if share == nil {
panic("gateway: error updating a received share: the share is nil")
opaqueObj := &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"shareId": {
Decoder: "plain",
Value: []byte(share.GetShare().GetId().OpaqueId),
},
},
}
req := &datatx.PullTransferRequest{
SrcTargetUri: srcTargetURI,
DestTargetUri: destTargetURI,
Opaque: opaqueObj,
}

if share.GetShare().ShareType == ocm.Share_SHARE_TYPE_TRANSFER {
idp := share.GetShare().GetOwner().GetIdp()
srcIdp := share.GetShare().GetOwner().GetIdp()
meshProvider, err := s.GetInfoByDomain(ctx, &ocmprovider.GetInfoByDomainRequest{
Domain: idp,
Domain: srcIdp,
})
if err != nil {
log.Err(err).Msg("gateway: error calling GetInfoByDomain")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
var endpoint string
var endpointScheme string
var srcEndpoint string
var srcEndpointBaseURI string
// target URI scheme will be the webdav endpoint scheme
var srcEndpointScheme string
for _, s := range meshProvider.ProviderInfo.Services {
fmt.Printf("provider info service: %v\n", s)
fmt.Printf(" endpoint type name: %v\n", s.Endpoint.Type.Name)
fmt.Printf(" endpoint Path: %v\n", s.Endpoint.Path)
fmt.Printf(" service host: %v\n", s.GetHost())
if strings.ToLower(s.Endpoint.Type.Name) == "webdav" {
url, _ := url.Parse(s.Endpoint.Path)
endpoint = url.Host + url.Path
endpointScheme = url.Scheme
url, err := url.Parse(s.Endpoint.Path)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + s.Endpoint.Path)
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
srcEndpoint = url.Host
srcEndpointBaseURI = url.Path
srcEndpointScheme = url.Scheme
break
}
}

var token string
tokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"]
var srcToken string
srcTokenOpaque, ok := share.GetShare().Grantee.Opaque.Map["token"]
if !ok {
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewNotFound(ctx, "token not found"),
}, nil
}
switch tokenOpaque.Decoder {
switch srcTokenOpaque.Decoder {
case "plain":
token = string(tokenOpaque.Value)
srcToken = string(srcTokenOpaque.Value)
default:
err := errtypes.NotSupported("opaque entry decoder not recognized: " + tokenOpaque.Decoder)
err := errtypes.NotSupported("opaque entry decoder not recognized: " + srcTokenOpaque.Decoder)
return &ocm.UpdateReceivedOCMShareResponse{
Status: status.NewInternal(ctx, err, "error updating received share"),
}, nil
}

// TODO we provide all necessary info with the targetURI
// either provide this in a 'proper' way or,
// inject the necessary services into datatx service and resolve everything there
targetURI := fmt.Sprintf("://%s@%s?name=%s&endpointscheme=%s", token, endpoint, share.GetShare().Name, endpointScheme)
// src: https(from src webdav endoint)://token@srcwebdavendpoint?name=path
// target idem taken from the grantee
// /home/DataTransfers/home/mytransfer/innerfolder/
fmt.Printf("idp: %v\n", idp)
fmt.Printf("endpoint: %v\n", endpoint)
fmt.Printf("token: %v\n", token)
fmt.Printf("targetURI: %v\n", targetURI)
srcPath := path.Join(srcEndpointBaseURI, share.GetShare().Name)
srcTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", srcEndpointScheme, srcToken, srcEndpoint, srcPath)

// get the webdav endpoint of the grantee's idp
// assume grantee is of type user
granteeIdpEndpoint, err := s.getWebdavEndpoint(ctx, share.GetShare().GetGrantee().GetUserId().Idp)
var granteeIdp string
if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
granteeIdp = share.GetShare().GetGrantee().GetUserId().Idp
}
if share.GetShare().GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
granteeIdp = share.GetShare().GetGrantee().GetGroupId().Idp
}
destWebdavEndpoint, err := s.getWebdavEndpoint(ctx, granteeIdp)
if err != nil {
log.Err(err).Msg("gateway: error calling PullTransfer")
log.Err(err).Msg("gateway: error calling UpdateReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_INTERNAL,
},
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
url, err := url.Parse(destWebdavEndpoint)
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare: unable to parse webdav endpoint " + destWebdavEndpoint)
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
destEndpoint := url.Host
destEndpointBaseURI := url.Path
destEndpointScheme := url.Scheme
destToken := ctxpkg.ContextMustGetToken(ctx)
homeRes, err := s.GetHome(ctx, &provider.GetHomeRequest{})
if err != nil {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{Code: rpc.Code_CODE_INTERNAL},
}, nil
}
destPath := path.Join(destEndpointBaseURI, homeRes.Path, s.c.DataTransfersFolder, path.Base(share.GetShare().Name))
destTargetURI := fmt.Sprintf("%s://%s@%s?name=%s", destEndpointScheme, destToken, destEndpoint, destPath)

opaqueObj := &types.Opaque{
Map: map[string]*types.OpaqueEntry{
"shareId": &types.OpaqueEntry{
"shareId": {
Decoder: "plain",
Value: []byte(share.GetShare().GetId().OpaqueId),
},
"endpoint": &types.OpaqueEntry{
Decoder: "plain",
Value: []byte(granteeIdpEndpoint),
},
},
}
req := &datatx.PullTransferRequest{
TargetUri: targetURI,
Opaque: opaqueObj,
SrcTargetUri: srcTargetURI,
DestTargetUri: destTargetURI,
Opaque: opaqueObj,
}
res, err := s.PullTransfer(ctx, req)
if err != nil {
Expand Down

0 comments on commit e15d83d

Please sign in to comment.