Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete shares from deleted spaces #4975

Merged
merged 6 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/delete-stale-shares.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Delete stale shares in the jsoncs3 share manager

The jsoncs3 share manager now properly deletes all references to removed shares and shares that belong to a space that was deleted

https://github.com/cs3org/reva/pull/4975
100 changes: 85 additions & 15 deletions pkg/share/manager/jsoncs3/jsoncs3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/events/stream"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/share"
"github.com/cs3org/reva/v2/pkg/share/manager/jsoncs3/providercache"
Expand Down Expand Up @@ -114,6 +115,12 @@ func init() {
registry.Register("jsoncs3", NewDefault)
}

var (
_registeredEvents = []events.Unmarshaller{
events.SpaceDeleted{},
}
)

type config struct {
GatewayAddr string `mapstructure:"gateway_addr"`
MaxConcurrency int `mapstructure:"max_concurrency"`
Expand Down Expand Up @@ -188,7 +195,8 @@ func NewDefault(m map[string]interface{}) (share.Manager, error) {
// New returns a new manager instance.
func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.GatewayAPIClient], ttlSeconds int, es events.Stream, maxconcurrency int) (*Manager, error) {
ttl := time.Duration(ttlSeconds) * time.Second
return &Manager{

m := &Manager{
Cache: providercache.New(s, ttl),
CreatedCache: sharecache.New(s, "users", "created.json", ttl),
UserReceivedStates: receivedsharecache.New(s, ttl),
Expand All @@ -197,7 +205,18 @@ func New(s metadata.Storage, gatewaySelector pool.Selectable[gatewayv1beta1.Gate
gatewaySelector: gatewaySelector,
eventStream: es,
MaxConcurrency: maxconcurrency,
}, nil
}

// listen for events
if m.eventStream != nil {
ch, err := events.Consume(m.eventStream, "jsoncs3sharemanager", _registeredEvents...)
if err != nil {
appctx.GetLogger(context.Background()).Error().Err(err).Msg("error consuming events")
}
go m.ProcessEvents(ch)
}

return m, nil
}

func (m *Manager) initialize(ctx context.Context) error {
Expand Down Expand Up @@ -248,6 +267,22 @@ func (m *Manager) initialize(ctx context.Context) error {
return nil
}

func (m *Manager) ProcessEvents(ch <-chan events.Event) {
log := logger.New()
for event := range ch {
ctx := context.Background()

if err := m.initialize(ctx); err != nil {
log.Error().Err(err).Msg("error initializing manager")
}

if ev, ok := event.Event.(events.SpaceDeleted); ok {
log.Debug().Msgf("space deleted event: %v", ev)
go func() { m.purgeSpace(ctx, ev.ID) }()
}
}
}

// Share creates a new share
func (m *Manager) Share(ctx context.Context, md *provider.ResourceInfo, g *collaboration.ShareGrant) (*collaboration.Share, error) {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Share")
Expand Down Expand Up @@ -420,7 +455,7 @@ func (m *Manager) GetShare(ctx context.Context, ref *collaboration.ShareReferenc
return nil, err
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -485,7 +520,7 @@ func (m *Manager) Unshare(ctx context.Context, ref *collaboration.ShareReference
return errtypes.NotFound(ref.String())
}

return m.removeShare(ctx, s)
return m.removeShare(ctx, s, false)
}

// UpdateShare updates the mode of the given share.
Expand Down Expand Up @@ -622,7 +657,7 @@ func (m *Manager) listSharesByIDs(ctx context.Context, user *userv1beta1.User, f
resourceID := s.GetResourceId()
sublog = sublog.With().Str("storageid", resourceID.GetStorageId()).Str("spaceid", resourceID.GetSpaceId()).Str("opaqueid", resourceID.GetOpaqueId()).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -740,7 +775,7 @@ func (m *Manager) listCreatedShares(ctx context.Context, user *userv1beta1.User,
continue
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -906,7 +941,7 @@ func (m *Manager) ListReceivedShares(ctx context.Context, filters []*collaborati
}
sublogr = sublogr.With().Str("shareid", shareID).Logger()
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublogr.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -1009,7 +1044,7 @@ func (m *Manager) getReceived(ctx context.Context, ref *collaboration.ShareRefer
return nil, errtypes.NotFound(ref.String())
}
if share.IsExpired(s) {
if err := m.removeShare(ctx, s); err != nil {
if err := m.removeShare(ctx, s, false); err != nil {
sublog.Error().Err(err).
Msg("failed to unshare expired share")
}
Expand Down Expand Up @@ -1136,24 +1171,59 @@ func (m *Manager) Load(ctx context.Context, shareChan <-chan *collaboration.Shar
return nil
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share) error {
func (m *Manager) purgeSpace(ctx context.Context, id *provider.StorageSpaceId) {
log := appctx.GetLogger(ctx)
storageID, spaceID := storagespace.SplitStorageID(id.OpaqueId)

shares, err := m.Cache.ListSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error listing shares in space")
return
}

// iterate over all shares in the space and remove them
for _, share := range shares.Shares {
err := m.removeShare(ctx, share, true)
if err != nil {
log.Error().Err(err).Msg("error removing share")
}
}

// remove all shares in the space
err = m.Cache.PurgeSpace(ctx, storageID, spaceID)
if err != nil {
log.Error().Err(err).Msg("error purging space")
}
}

func (m *Manager) removeShare(ctx context.Context, s *collaboration.Share, skipSpaceCache bool) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "removeShare")
defer span.End()

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)
if !skipSpaceCache {
eg.Go(func() error {
storageID, spaceID, _ := shareid.Decode(s.Id.OpaqueId)
err := m.Cache.Remove(ctx, storageID, spaceID, s.Id.OpaqueId)

return err
})
return err
})
}

eg.Go(func() error {
// remove from created cache
return m.CreatedCache.Remove(ctx, s.GetCreator().GetOpaqueId(), s.Id.OpaqueId)
})

// TODO remove from grantee cache
eg.Go(func() error {
// remove from user received states
if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_USER {
return m.UserReceivedStates.Remove(ctx, s.GetGrantee().GetUserId().GetOpaqueId(), s.GetResourceId().GetStorageId()+shareid.IDDelimiter+s.GetResourceId().GetSpaceId(), s.Id.OpaqueId)
} else if s.GetGrantee().Type == provider.GranteeType_GRANTEE_TYPE_GROUP {
return m.GroupReceivedCache.Remove(ctx, s.GetGrantee().GetGroupId().GetOpaqueId(), s.Id.OpaqueId)
}
return nil
})

return eg.Wait()
}
25 changes: 25 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,31 @@ func (c *Cache) Persist(ctx context.Context, storageID, spaceID string) error {
return nil
}

// PurgeSpace removes a space from the cache
func (c *Cache) PurgeSpace(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "PurgeSpace")
defer span.End()

unlock := c.LockSpace(spaceID)
defer unlock()
span.AddEvent("got lock")

if !c.isSpaceCached(storageID, spaceID) {
err := c.syncWithLock(ctx, storageID, spaceID)
if err != nil {
return err
}
}

spaces, ok := c.Providers.Load(storageID)
if !ok {
return nil
}
spaces.Spaces.Store(spaceID, &Shares{})

return c.Persist(ctx, storageID, spaceID)
}

func (c *Cache) syncWithLock(ctx context.Context, storageID, spaceID string) error {
ctx, span := tracer.Start(ctx, "syncWithLock")
defer span.End()
Expand Down
10 changes: 10 additions & 0 deletions pkg/share/manager/jsoncs3/providercache/providercache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,5 +181,15 @@ var _ = Describe("Cache", func() {
Expect(c.Persist(ctx, storageID, spaceID)).ToNot(Succeed())
})
})

Describe("PurgeSpace", func() {
It("removes the entry", func() {
Expect(c.PurgeSpace(ctx, storageID, spaceID)).To(Succeed())

s, err := c.Get(ctx, storageID, spaceID, shareID, false)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,74 @@ func (c *Cache) Get(ctx context.Context, userID, spaceID, shareID string) (*Stat
return rss.Spaces[spaceID].States[shareID], nil
}

// Remove removes an entry from the cache
func (c *Cache) Remove(ctx context.Context, userID, spaceID, shareID string) error {
ctx, span := appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Grab lock")
unlock := c.lockUser(userID)
span.End()
span.SetAttributes(attribute.String("cs3.userid", userID))
defer unlock()

ctx, span = appctx.GetTracerProvider(ctx).Tracer(tracerName).Start(ctx, "Add")
defer span.End()
span.SetAttributes(attribute.String("cs3.userid", userID), attribute.String("cs3.spaceid", spaceID))

persistFunc := func() error {
c.initializeIfNeeded(userID, spaceID)

rss, _ := c.ReceivedSpaces.Load(userID)
receivedSpace := rss.Spaces[spaceID]
if receivedSpace.States == nil {
receivedSpace.States = map[string]*State{}
}
delete(receivedSpace.States, shareID)
if len(receivedSpace.States) == 0 {
delete(rss.Spaces, spaceID)
}

return c.persist(ctx, userID)
}

log := appctx.GetLogger(ctx).With().
Str("hostname", os.Getenv("HOSTNAME")).
Str("userID", userID).
Str("spaceID", spaceID).Logger()

var err error
for retries := 100; retries > 0; retries-- {
err = persistFunc()
switch err.(type) {
case nil:
span.SetStatus(codes.Ok, "")
return nil
case errtypes.Aborted:
log.Debug().Msg("aborted when persisting added received share: etag changed. retrying...")
// this is the expected status code from the server when the if-match etag check fails
// continue with sync below
case errtypes.PreconditionFailed:
log.Debug().Msg("precondition failed when persisting added received share: etag changed. retrying...")
// actually, this is the wrong status code and we treat it like errtypes.Aborted because of inconsistencies on the server side
// continue with sync below
case errtypes.AlreadyExists:
log.Debug().Msg("already exists when persisting added received share. retrying...")
// CS3 uses an already exists error instead of precondition failed when using an If-None-Match=* header / IfExists flag in the InitiateFileUpload call.
// Thas happens when the cache thinks there is no file.
// continue with sync below
default:
span.SetStatus(codes.Error, fmt.Sprintf("persisting added received share failed. giving up: %s", err.Error()))
log.Error().Err(err).Msg("persisting added received share failed")
return err
}
if err := c.syncWithLock(ctx, userID); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
log.Error().Err(err).Msg("persisting added received share failed. giving up.")
return err
}
}
return err
}

// List returns a list of received shares for a given user
// The return list is guaranteed to be thread-safe
func (c *Cache) List(ctx context.Context, userID string) (map[string]*Space, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,26 @@ var _ = Describe("Cache", func() {
Expect(s).ToNot(BeNil())
})
})

Describe("Remove", func() {
It("removes the entry", func() {
err := c.Remove(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())

s, err := c.Get(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})

It("persists the removal", func() {
err := c.Remove(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())

c = receivedsharecache.New(storage, 0*time.Second)
s, err := c.Get(ctx, userID, spaceID, shareID)
Expect(err).ToNot(HaveOccurred())
Expect(s).To(BeNil())
})
})
})
})
Loading