diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go index 4ae7b3d3d074..fc7f64ff4f7d 100644 --- a/sdks/go/pkg/beam/io/fileio/file.go +++ b/sdks/go/pkg/beam/io/fileio/file.go @@ -21,6 +21,7 @@ import ( "io" "path/filepath" "reflect" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" @@ -32,10 +33,12 @@ func init() { beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem()) } -// FileMetadata contains metadata about a file, namely its path and size in bytes. +// FileMetadata contains metadata about a file, namely its path, size in bytes and last modified +// time. type FileMetadata struct { - Path string - Size int64 + Path string + Size int64 + LastModified time.Time } // compressionType is the type of compression used to compress a file. diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go b/sdks/go/pkg/beam/io/fileio/helper_test.go index e8b61ef067fd..08029759593d 100644 --- a/sdks/go/pkg/beam/io/fileio/helper_test.go +++ b/sdks/go/pkg/beam/io/fileio/helper_test.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "testing" + "time" ) // openFile opens a file for reading. @@ -86,3 +87,14 @@ func writeGzip(t *testing.T, path string, data []byte) { t.Fatal(err) } } + +func modTime(t *testing.T, path string) time.Time { + t.Helper() + + info, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + + return info.ModTime() +} diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go index aeee887f7a1b..dbd4b2150257 100644 --- a/sdks/go/pkg/beam/io/fileio/match.go +++ b/sdks/go/pkg/beam/io/fileio/match.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" ) @@ -37,6 +38,9 @@ func init() { register.DoFn4x1[state.Provider, string, FileMetadata, func(FileMetadata), error]( &dedupFn{}, ) + register.DoFn4x1[state.Provider, string, FileMetadata, func(FileMetadata), error]( + &dedupUnmodifiedFn{}, + ) register.Emitter1[FileMetadata]() register.Emitter1[string]() register.Function1x2[FileMetadata, string, FileMetadata](keyByPath) @@ -190,21 +194,45 @@ func metadataFromFiles( return nil, err } + mTime, err := lastModified(ctx, fs, path) + if err != nil { + return nil, err + } + metadata[i] = FileMetadata{ - Path: path, - Size: size, + Path: path, + Size: size, + LastModified: mTime, } } return metadata, nil } +func lastModified(ctx context.Context, fs filesystem.Interface, path string) (time.Time, error) { + lmGetter, ok := fs.(filesystem.LastModifiedGetter) + if !ok { + log.Warnf(ctx, "Filesystem %T does not implement filesystem.LastModifiedGetter", fs) + return time.Time{}, nil + } + + mTime, err := lmGetter.LastModified(ctx, path) + if err != nil { + return time.Time{}, fmt.Errorf("error getting last modified time for %q: %v", path, err) + } + + return mTime, nil +} + // duplicateTreatment controls how duplicate matches are treated. type duplicateTreatment int const ( // duplicateAllow allows duplicate matches. duplicateAllow duplicateTreatment = iota + // duplicateAllowIfModified allows duplicate matches only if the file has been modified since it + // was last observed. + duplicateAllowIfModified // duplicateSkip skips duplicate matches. duplicateSkip ) @@ -241,6 +269,14 @@ func MatchDuplicateAllow() MatchContOptionFn { } } +// MatchDuplicateAllowIfModified specifies that file path matches will be deduplicated unless the +// file has been modified since it was last observed. +func MatchDuplicateAllowIfModified() MatchContOptionFn { + return func(o *matchContOption) { + o.DuplicateTreatment = duplicateAllowIfModified + } +} + // MatchDuplicateSkip specifies that file path matches will be deduplicated. func MatchDuplicateSkip() MatchContOptionFn { return func(o *matchContOption) { @@ -262,6 +298,8 @@ func MatchApplyWindow() MatchContOptionFn { // - Start: start time for matching files. Defaults to the current timestamp // - End: end time for matching files. Defaults to the maximum timestamp // - DuplicateAllow: allow emitting matches that have already been observed. Defaults to false +// - DuplicateAllowIfModified: allow emitting matches that have already been observed if the file +// has been modified since the last observation. Defaults to false // - DuplicateSkip: skip emitting matches that have already been observed. Defaults to true // - ApplyWindow: assign each element to an individual window with a fixed size equivalent to the // interval. Defaults to false, i.e. all elements will reside in the global window @@ -290,14 +328,7 @@ func MatchContinuously( globs := beam.ParDo(s, &matchContFn{Glob: glob}, imp) matches := MatchAll(s, globs, MatchEmptyAllow()) - var out beam.PCollection - - if option.DuplicateTreatment == duplicateAllow { - out = matches - } else { - keyed := beam.ParDo(s, keyByPath, matches) - out = beam.ParDo(s, &dedupFn{}, keyed) - } + out := dedupIfRequired(s, matches, option.DuplicateTreatment) if option.ApplyWindow { return beam.WindowInto(s, window.NewFixedWindows(interval), out) @@ -305,6 +336,24 @@ func MatchContinuously( return out } +func dedupIfRequired( + s beam.Scope, + col beam.PCollection, + treatment duplicateTreatment, +) beam.PCollection { + if treatment == duplicateAllow { + return col + } + + keyed := beam.ParDo(s, keyByPath, col) + + if treatment == duplicateAllowIfModified { + return beam.ParDo(s, &dedupUnmodifiedFn{}, keyed) + } + + return beam.ParDo(s, &dedupFn{}, keyed) +} + type matchContFn struct { Glob string } @@ -341,3 +390,30 @@ func (fn *dedupFn) ProcessElement( return nil } + +type dedupUnmodifiedFn struct { + State state.Value[int64] +} + +func (fn *dedupUnmodifiedFn) ProcessElement( + sp state.Provider, + _ string, + md FileMetadata, + emit func(FileMetadata), +) error { + prevMTime, ok, err := fn.State.Read(sp) + if err != nil { + return fmt.Errorf("error reading state: %v", err) + } + + mTime := md.LastModified.UnixMilli() + + if !ok || mTime > prevMTime { + emit(md) + if err := fn.State.Write(sp, mTime); err != nil { + return fmt.Errorf("error writing state: %v", err) + } + } + + return nil +} diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go index 57b2d8cfe1c5..5bc849e5057e 100644 --- a/sdks/go/pkg/beam/io/fileio/match_test.go +++ b/sdks/go/pkg/beam/io/fileio/match_test.go @@ -55,6 +55,9 @@ func TestMatchFiles(t *testing.T) { write(t, filepath.Join(testDir, tf.filename), tf.data) } + fp1 := filepath.Join(testDir, "file1.txt") + fp2 := filepath.Join(testDir, "file2.txt") + tests := []struct { name string glob string @@ -66,12 +69,14 @@ func TestMatchFiles(t *testing.T) { glob: filepath.Join(dir, "*", "file*.txt"), want: []any{ FileMetadata{ - Path: filepath.Join(testDir, "file1.txt"), - Size: 5, + Path: fp1, + Size: 5, + LastModified: modTime(t, fp1), }, FileMetadata{ - Path: filepath.Join(testDir, "file2.txt"), - Size: 0, + Path: fp2, + Size: 0, + LastModified: modTime(t, fp2), }, }, }, @@ -104,6 +109,10 @@ func TestMatchAll(t *testing.T) { write(t, filepath.Join(testDir, tf.filename), tf.data) } + fp1 := filepath.Join(testDir, "file1.txt") + fp2 := filepath.Join(testDir, "file2.txt") + fp3 := filepath.Join(testDir, "file3.csv") + tests := []struct { name string opts []MatchOptionFn @@ -119,16 +128,19 @@ func TestMatchAll(t *testing.T) { }, want: []any{ FileMetadata{ - Path: filepath.Join(testDir, "file1.txt"), - Size: 5, + Path: fp1, + Size: 5, + LastModified: modTime(t, fp1), }, FileMetadata{ - Path: filepath.Join(testDir, "file2.txt"), - Size: 0, + Path: fp2, + Size: 0, + LastModified: modTime(t, fp2), }, FileMetadata{ - Path: filepath.Join(testDir, "file3.csv"), - Size: 5, + Path: fp3, + Size: 5, + LastModified: modTime(t, fp3), }, }, }, @@ -238,6 +250,10 @@ func Test_metadataFromFiles(t *testing.T) { files[i] = file } + fp1 := filepath.Join(dir, "file1.txt") + fp2 := filepath.Join(dir, "file2.txt") + fp3 := filepath.Join(dir, "file3.csv") + tests := []struct { name string files []string @@ -248,16 +264,19 @@ func Test_metadataFromFiles(t *testing.T) { files: files, want: []FileMetadata{ { - Path: filepath.Join(dir, "file1.txt"), - Size: 5, + Path: fp1, + Size: 5, + LastModified: modTime(t, fp1), }, { - Path: filepath.Join(dir, "file2.txt"), - Size: 0, + Path: fp2, + Size: 0, + LastModified: modTime(t, fp2), }, { - Path: filepath.Join(dir, "file3.csv"), - Size: 5, + Path: fp3, + Size: 5, + LastModified: modTime(t, fp3), }, }, },