diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index a6f909aea1a6..ed8be0a42b25 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -34,7 +34,6 @@ import ( func init() { beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem()) - beam.RegisterFunction(sizeFn) beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem()) beam.RegisterFunction(expandFn) } @@ -82,8 +81,7 @@ func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { // into separate bundles. func read(s beam.Scope, col beam.PCollection) beam.PCollection { files := beam.ParDo(s, expandFn, col) - sized := beam.ParDo(s, sizeFn, files) - return beam.ParDo(s, &readFn{}, sized) + return beam.ParDo(s, &readFn{}, files) } // expandFn expands a glob pattern into all matching file names. @@ -108,36 +106,29 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error { return nil } -// sizeFn pairs a filename with the size of that file in bytes. -// TODO(https://github.com/apache/beam/issues/20607): Once CreateInitialRestriction supports Context params and -// error return values, this can be done in readSdfFn.CreateInitialRestriction. -func sizeFn(ctx context.Context, filename string) (string, int64, error) { +// readFn reads individual lines from a text file. Implemented as an SDF +// to allow splitting within a file. +type readFn struct { +} + +// CreateInitialRestriction creates an offset range restriction representing +// the file's size in bytes. +func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename string) (offsetrange.Restriction, error) { fs, err := filesystem.New(ctx, filename) if err != nil { - return "", -1, err + return offsetrange.Restriction{}, err } defer fs.Close() size, err := fs.Size(ctx, filename) if err != nil { - return "", -1, err + return offsetrange.Restriction{}, err } - return filename, size, nil -} -// readFn reads individual lines from a text file, given a filename and a -// size in bytes for that file. Implemented as an SDF to allow splitting -// within a file. -type readFn struct { -} - -// CreateInitialRestriction creates an offset range restriction representing -// the file, using the paired size rather than fetching the file's size. -func (fn *readFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { return offsetrange.Restriction{ Start: 0, End: size, - } + }, nil } const ( @@ -150,7 +141,7 @@ const ( // SplitRestriction splits each file restriction into blocks of a predeterined // size, with some checks to avoid having small remainders. -func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { +func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction { splits := rest.SizedSplits(blockSize) numSplits := len(splits) if numSplits > 1 { @@ -165,7 +156,7 @@ func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restricti } // Size returns the size of each restriction as its range. -func (fn *readFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { +func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 { return rest.Size() } @@ -183,7 +174,7 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker // begin within the restriction and past the restriction (those are entirely // output, including the portion outside the restriction). In some cases a // valid restriction might not output any lines. -func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { +func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, emit func(string)) error { log.Infof(ctx, "Reading from %v", filename) fs, err := filesystem.New(ctx, filename)