Skip to content

Commit

Permalink
Enable fileio.MatchContinuously to emit duplicate file if modified (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
johannaojeling authored and cushon committed May 24, 2024
1 parent b234a96 commit f9b0333
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 29 deletions.
9 changes: 6 additions & 3 deletions sdks/go/pkg/beam/io/fileio/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"path/filepath"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
Expand All @@ -32,10 +33,12 @@ func init() {
beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem())
}

// FileMetadata contains metadata about a file, namely its path and size in bytes.
// FileMetadata contains metadata about a file, namely its path, size in bytes and last modified
// time.
type FileMetadata struct {
Path string
Size int64
Path string
Size int64
LastModified time.Time
}

// compressionType is the type of compression used to compress a file.
Expand Down
12 changes: 12 additions & 0 deletions sdks/go/pkg/beam/io/fileio/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
)

// openFile opens a file for reading.
Expand Down Expand Up @@ -86,3 +87,14 @@ func writeGzip(t *testing.T, path string, data []byte) {
t.Fatal(err)
}
}

func modTime(t *testing.T, path string) time.Time {
t.Helper()

info, err := os.Stat(path)
if err != nil {
t.Fatal(err)
}

return info.ModTime()
}
96 changes: 86 additions & 10 deletions sdks/go/pkg/beam/io/fileio/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
)
Expand All @@ -37,6 +38,9 @@ func init() {
register.DoFn4x1[state.Provider, string, FileMetadata, func(FileMetadata), error](
&dedupFn{},
)
register.DoFn4x1[state.Provider, string, FileMetadata, func(FileMetadata), error](
&dedupUnmodifiedFn{},
)
register.Emitter1[FileMetadata]()
register.Emitter1[string]()
register.Function1x2[FileMetadata, string, FileMetadata](keyByPath)
Expand Down Expand Up @@ -190,21 +194,45 @@ func metadataFromFiles(
return nil, err
}

mTime, err := lastModified(ctx, fs, path)
if err != nil {
return nil, err
}

metadata[i] = FileMetadata{
Path: path,
Size: size,
Path: path,
Size: size,
LastModified: mTime,
}
}

return metadata, nil
}

func lastModified(ctx context.Context, fs filesystem.Interface, path string) (time.Time, error) {
lmGetter, ok := fs.(filesystem.LastModifiedGetter)
if !ok {
log.Warnf(ctx, "Filesystem %T does not implement filesystem.LastModifiedGetter", fs)
return time.Time{}, nil
}

mTime, err := lmGetter.LastModified(ctx, path)
if err != nil {
return time.Time{}, fmt.Errorf("error getting last modified time for %q: %v", path, err)
}

return mTime, nil
}

// duplicateTreatment controls how duplicate matches are treated.
type duplicateTreatment int

const (
// duplicateAllow allows duplicate matches.
duplicateAllow duplicateTreatment = iota
// duplicateAllowIfModified allows duplicate matches only if the file has been modified since it
// was last observed.
duplicateAllowIfModified
// duplicateSkip skips duplicate matches.
duplicateSkip
)
Expand Down Expand Up @@ -241,6 +269,14 @@ func MatchDuplicateAllow() MatchContOptionFn {
}
}

// MatchDuplicateAllowIfModified specifies that file path matches will be deduplicated unless the
// file has been modified since it was last observed.
func MatchDuplicateAllowIfModified() MatchContOptionFn {
return func(o *matchContOption) {
o.DuplicateTreatment = duplicateAllowIfModified
}
}

// MatchDuplicateSkip specifies that file path matches will be deduplicated.
func MatchDuplicateSkip() MatchContOptionFn {
return func(o *matchContOption) {
Expand All @@ -262,6 +298,8 @@ func MatchApplyWindow() MatchContOptionFn {
// - Start: start time for matching files. Defaults to the current timestamp
// - End: end time for matching files. Defaults to the maximum timestamp
// - DuplicateAllow: allow emitting matches that have already been observed. Defaults to false
// - DuplicateAllowIfModified: allow emitting matches that have already been observed if the file
// has been modified since the last observation. Defaults to false
// - DuplicateSkip: skip emitting matches that have already been observed. Defaults to true
// - ApplyWindow: assign each element to an individual window with a fixed size equivalent to the
// interval. Defaults to false, i.e. all elements will reside in the global window
Expand Down Expand Up @@ -290,21 +328,32 @@ func MatchContinuously(
globs := beam.ParDo(s, &matchContFn{Glob: glob}, imp)
matches := MatchAll(s, globs, MatchEmptyAllow())

var out beam.PCollection

if option.DuplicateTreatment == duplicateAllow {
out = matches
} else {
keyed := beam.ParDo(s, keyByPath, matches)
out = beam.ParDo(s, &dedupFn{}, keyed)
}
out := dedupIfRequired(s, matches, option.DuplicateTreatment)

if option.ApplyWindow {
return beam.WindowInto(s, window.NewFixedWindows(interval), out)
}
return out
}

func dedupIfRequired(
s beam.Scope,
col beam.PCollection,
treatment duplicateTreatment,
) beam.PCollection {
if treatment == duplicateAllow {
return col
}

keyed := beam.ParDo(s, keyByPath, col)

if treatment == duplicateAllowIfModified {
return beam.ParDo(s, &dedupUnmodifiedFn{}, keyed)
}

return beam.ParDo(s, &dedupFn{}, keyed)
}

type matchContFn struct {
Glob string
}
Expand Down Expand Up @@ -341,3 +390,30 @@ func (fn *dedupFn) ProcessElement(

return nil
}

type dedupUnmodifiedFn struct {
State state.Value[int64]
}

func (fn *dedupUnmodifiedFn) ProcessElement(
sp state.Provider,
_ string,
md FileMetadata,
emit func(FileMetadata),
) error {
prevMTime, ok, err := fn.State.Read(sp)
if err != nil {
return fmt.Errorf("error reading state: %v", err)
}

mTime := md.LastModified.UnixMilli()

if !ok || mTime > prevMTime {
emit(md)
if err := fn.State.Write(sp, mTime); err != nil {
return fmt.Errorf("error writing state: %v", err)
}
}

return nil
}
51 changes: 35 additions & 16 deletions sdks/go/pkg/beam/io/fileio/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func TestMatchFiles(t *testing.T) {
write(t, filepath.Join(testDir, tf.filename), tf.data)
}

fp1 := filepath.Join(testDir, "file1.txt")
fp2 := filepath.Join(testDir, "file2.txt")

tests := []struct {
name string
glob string
Expand All @@ -66,12 +69,14 @@ func TestMatchFiles(t *testing.T) {
glob: filepath.Join(dir, "*", "file*.txt"),
want: []any{
FileMetadata{
Path: filepath.Join(testDir, "file1.txt"),
Size: 5,
Path: fp1,
Size: 5,
LastModified: modTime(t, fp1),
},
FileMetadata{
Path: filepath.Join(testDir, "file2.txt"),
Size: 0,
Path: fp2,
Size: 0,
LastModified: modTime(t, fp2),
},
},
},
Expand Down Expand Up @@ -104,6 +109,10 @@ func TestMatchAll(t *testing.T) {
write(t, filepath.Join(testDir, tf.filename), tf.data)
}

fp1 := filepath.Join(testDir, "file1.txt")
fp2 := filepath.Join(testDir, "file2.txt")
fp3 := filepath.Join(testDir, "file3.csv")

tests := []struct {
name string
opts []MatchOptionFn
Expand All @@ -119,16 +128,19 @@ func TestMatchAll(t *testing.T) {
},
want: []any{
FileMetadata{
Path: filepath.Join(testDir, "file1.txt"),
Size: 5,
Path: fp1,
Size: 5,
LastModified: modTime(t, fp1),
},
FileMetadata{
Path: filepath.Join(testDir, "file2.txt"),
Size: 0,
Path: fp2,
Size: 0,
LastModified: modTime(t, fp2),
},
FileMetadata{
Path: filepath.Join(testDir, "file3.csv"),
Size: 5,
Path: fp3,
Size: 5,
LastModified: modTime(t, fp3),
},
},
},
Expand Down Expand Up @@ -238,6 +250,10 @@ func Test_metadataFromFiles(t *testing.T) {
files[i] = file
}

fp1 := filepath.Join(dir, "file1.txt")
fp2 := filepath.Join(dir, "file2.txt")
fp3 := filepath.Join(dir, "file3.csv")

tests := []struct {
name string
files []string
Expand All @@ -248,16 +264,19 @@ func Test_metadataFromFiles(t *testing.T) {
files: files,
want: []FileMetadata{
{
Path: filepath.Join(dir, "file1.txt"),
Size: 5,
Path: fp1,
Size: 5,
LastModified: modTime(t, fp1),
},
{
Path: filepath.Join(dir, "file2.txt"),
Size: 0,
Path: fp2,
Size: 0,
LastModified: modTime(t, fp2),
},
{
Path: filepath.Join(dir, "file3.csv"),
Size: 5,
Path: fp3,
Size: 5,
LastModified: modTime(t, fp3),
},
},
},
Expand Down

0 comments on commit f9b0333

Please sign in to comment.