Skip to content

Commit

Permalink
Implemented ListExistingReceivedShares without caching
Browse files Browse the repository at this point in the history
  • Loading branch information
glpatcern committed Apr 25, 2024
1 parent 8d523d6 commit f4fec76
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 65 deletions.
54 changes: 52 additions & 2 deletions internal/grpc/services/gateway/usershareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"path"

"github.com/alitto/pond"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -253,8 +254,57 @@ func (s *svc) ListReceivedShares(ctx context.Context, req *collaboration.ListRec
return res, nil
}

func (s *svc) ListExistingReceivedShares(context.Context, *collaboration.ListReceivedSharesRequest) (*gateway.ListExistingReceivedSharesResponse, error) {
panic("ListExistingReceivedShares: not yet implemented")
func (s *svc) ListExistingReceivedShares(ctx context.Context, req *collaboration.ListReceivedSharesRequest) (*gateway.ListExistingReceivedSharesResponse, error) {
rshares, err := s.ListReceivedShares(ctx, req)
if err != nil {
return nil, err
}

sharesCh := make(chan *gateway.SharedResourceInfo, len(rshares.Shares))
pool := pond.New(50, len(rshares.Shares))
for _, rs := range rshares.Shares {
rs := rs
pool.Submit(func() {
if rs.State == collaboration.ShareState_SHARE_STATE_REJECTED || rs.State == collaboration.ShareState_SHARE_STATE_INVALID {
return
}

// TODO(lopresti) incorporate the cache layer from internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go
stat, err := s.Stat(ctx, &provider.StatRequest{
Ref: &provider.Reference{
ResourceId: rs.Share.ResourceId,
},
})
if err != nil {
return
}
if stat.Status.Code != rpc.Code_CODE_OK {
return
}

sharesCh <- &gateway.SharedResourceInfo{
ResourceInfo: stat.Info,
Share: rs,
}
})
}

sris := make([]*gateway.SharedResourceInfo, 0, len(rshares.Shares))
done := make(chan struct{})
go func() {
for s := range sharesCh {
sris = append(sris, s)
}
done <- struct{}{}
}()
pool.StopAndWait()
close(sharesCh)
<-done
close(done)

return &gateway.ListExistingReceivedSharesResponse{
Shares: sris,
}, nil
}

func (s *svc) GetReceivedShare(ctx context.Context, req *collaboration.GetReceivedShareRequest) (*collaboration.GetReceivedShareResponse, error) {
Expand Down
99 changes: 36 additions & 63 deletions internal/http/services/owncloud/ocgraph/drives.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"time"

"github.com/CiscoM31/godata"
"github.com/alitto/pond"
gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1"
rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
Expand Down Expand Up @@ -121,90 +120,47 @@ func isMountpointRequest(request *godata.GoDataRequest) bool {
const shareJailID = "a0ca6a90-a365-4782-871e-d44447bbc668"

func getDrivesForShares(ctx context.Context, gw gateway.GatewayAPIClient) ([]*libregraph.Drive, error) {
res, err := gw.ListReceivedShares(ctx, &collaborationv1beta1.ListReceivedSharesRequest{})
res, err := gw.ListExistingReceivedShares(ctx, &collaborationv1beta1.ListReceivedSharesRequest{})
if err != nil {
return nil, err
}

if res.Status.Code != rpcv1beta1.Code_CODE_OK {
return nil, errors.New(res.Status.Message)
}

pool := pond.New(50, len(res.Shares))
spaces := make(chan *libregraph.Drive, len(res.Shares))

spacesRes := make([]*libregraph.Drive, 0, len(res.Shares))
for _, s := range res.Shares {
s := s
pool.Submit(func() {
if s.State == collaborationv1beta1.ShareState_SHARE_STATE_REJECTED || s.State == collaborationv1beta1.ShareState_SHARE_STATE_INVALID {
return
}
space, err := convertShareToSpace(ctx, gw, s.Share)
if err != nil {
return
}
spaces <- space
})
spacesRes = append(spacesRes, convertShareToSpace(s))
}

done := make(chan struct{})
go func() {
for s := range spaces {
spacesRes = append(spacesRes, s)
}
done <- struct{}{}
}()

pool.StopAndWait()
close(spaces)
<-done
close(done)

return spacesRes, nil
}

func convertShareToSpace(ctx context.Context, gw gateway.GatewayAPIClient, share *collaborationv1beta1.Share) (*libregraph.Drive, error) {
stat, err := gw.Stat(ctx, &providerpb.StatRequest{
Ref: &providerpb.Reference{
ResourceId: share.ResourceId,
},
})
if err != nil {
return nil, err
}

if stat.Status.Code != rpcv1beta1.Code_CODE_OK {
return nil, errors.New(stat.Status.Message)
}

func convertShareToSpace(share *gateway.SharedResourceInfo) *libregraph.Drive {
// the prefix of the remote_item.id and rootid

space := &libregraph.Drive{
Id: libregraph.PtrString(fmt.Sprintf("%s$%s!%s", shareJailID, shareJailID, share.Id.OpaqueId)),
return &libregraph.Drive{
Id: libregraph.PtrString(fmt.Sprintf("%s$%s!%s", shareJailID, shareJailID, share.Share.Share.Id.OpaqueId)),
DriveType: libregraph.PtrString("mountpoint"),
DriveAlias: libregraph.PtrString(share.Id.OpaqueId), // this is not used, but must not be the same alias as the drive item
Name: filepath.Base(stat.Info.Path),
DriveAlias: libregraph.PtrString(share.Share.Share.Id.OpaqueId), // this is not used, but must not be the same alias as the drive item
Name: filepath.Base(share.ResourceInfo.Path),
Root: &libregraph.DriveItem{
Id: libregraph.PtrString(fmt.Sprintf("%s$%s!%s", shareJailID, shareJailID, share.Id.OpaqueId)),
Id: libregraph.PtrString(fmt.Sprintf("%s$%s!%s", shareJailID, shareJailID, share.Share.Share.Id.OpaqueId)),
RemoteItem: &libregraph.RemoteItem{
DriveAlias: libregraph.PtrString(strings.TrimSuffix(strings.TrimPrefix(stat.Info.Path, "/"), relativePathToSpaceID(stat.Info))), // the drive alias must not start with /
ETag: libregraph.PtrString(stat.Info.Etag),
DriveAlias: libregraph.PtrString(strings.TrimSuffix(strings.TrimPrefix(share.ResourceInfo.Path, "/"), relativePathToSpaceID(share.ResourceInfo))), // the drive alias must not start with /
ETag: libregraph.PtrString(share.ResourceInfo.Etag),
Folder: &libregraph.Folder{},
// The Id must correspond to the id in the OCS response, for the time being
// It is in the form <something>!<something-else>
Id: libregraph.PtrString(spaces.EncodeResourceID(stat.Info.Id)),
LastModifiedDateTime: libregraph.PtrTime(time.Unix(int64(stat.Info.Mtime.Seconds), int64(stat.Info.Mtime.Nanos))),
Name: libregraph.PtrString(filepath.Base(stat.Info.Path)),
Path: libregraph.PtrString(relativePathToSpaceID(stat.Info)),
Id: libregraph.PtrString(spaces.EncodeResourceID(share.ResourceInfo.Id)),
LastModifiedDateTime: libregraph.PtrTime(time.Unix(int64(share.ResourceInfo.Mtime.Seconds), int64(share.ResourceInfo.Mtime.Nanos))),
Name: libregraph.PtrString(filepath.Base(share.ResourceInfo.Path)),
Path: libregraph.PtrString(relativePathToSpaceID(share.ResourceInfo)),
// RootId must have the same token before ! as Id
// the second part for the time being is not used
RootId: libregraph.PtrString(fmt.Sprintf("%s!unused_root_id", spaces.EncodeSpaceID(stat.Info.Id.StorageId, stat.Info.Id.SpaceId))),
Size: libregraph.PtrInt64(int64(stat.Info.Size)),
RootId: libregraph.PtrString(fmt.Sprintf("%s!unused_root_id", spaces.EncodeSpaceID(share.ResourceInfo.Id.StorageId, share.ResourceInfo.Id.SpaceId))),
Size: libregraph.PtrInt64(int64(share.ResourceInfo.Size)),
},
},
}
return space, nil
}

func relativePathToSpaceID(info *providerpb.ResourceInfo) string {
Expand Down Expand Up @@ -297,11 +253,28 @@ func (s *svc) getSpace(w http.ResponseWriter, r *http.Request) {
return
}

space, err := convertShareToSpace(ctx, gw, shareRes.Share.Share)
if err == nil {
_ = json.NewEncoder(w).Encode(space)
stat, err := gw.Stat(ctx, &providerpb.StatRequest{
Ref: &providerpb.Reference{
ResourceId: shareRes.Share.Share.ResourceId,
},
})
if err != nil {
log.Error().Err(err).Msg("error statting received share")
w.WriteHeader(http.StatusInternalServerError)
return
}
if stat.Status.Code != rpcv1beta1.Code_CODE_OK {
log.Error().Interface("stat.Status", stat.Status).Msg("error statting received share")
w.WriteHeader(http.StatusInternalServerError)
return
}

space := convertShareToSpace(&gateway.SharedResourceInfo{
ResourceInfo: stat.Info,
Share: shareRes.Share,
})
_ = json.NewEncoder(w).Encode(space)
return
} else {
listRes, err := gw.ListStorageSpaces(ctx, &providerpb.ListStorageSpacesRequest{
Filters: []*providerpb.ListStorageSpacesRequest_Filter{
Expand Down

0 comments on commit f4fec76

Please sign in to comment.