Skip to content

Commit

Permalink
[Go SDK]: Retrieve file size in CreateInitialRestriction in textio.Re…
Browse files Browse the repository at this point in the history
…ad (#25535)

* Retrieve file size in CreateInitialRestriction in textio.Read

* Update readFn doc comment
  • Loading branch information
johannaojeling authored Feb 19, 2023
1 parent c160a08 commit d652d05
Showing 1 changed file with 15 additions and 24 deletions.
39 changes: 15 additions & 24 deletions sdks/go/pkg/beam/io/textio/textio.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (

func init() {
beam.RegisterType(reflect.TypeOf((*readFn)(nil)).Elem())
beam.RegisterFunction(sizeFn)
beam.RegisterType(reflect.TypeOf((*writeFileFn)(nil)).Elem())
beam.RegisterFunction(expandFn)
}
Expand Down Expand Up @@ -82,8 +81,7 @@ func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection {
// into separate bundles.
func read(s beam.Scope, col beam.PCollection) beam.PCollection {
files := beam.ParDo(s, expandFn, col)
sized := beam.ParDo(s, sizeFn, files)
return beam.ParDo(s, &readFn{}, sized)
return beam.ParDo(s, &readFn{}, files)
}

// expandFn expands a glob pattern into all matching file names.
Expand All @@ -108,36 +106,29 @@ func expandFn(ctx context.Context, glob string, emit func(string)) error {
return nil
}

// sizeFn pairs a filename with the size of that file in bytes.
// TODO(https://github.com/apache/beam/issues/20607): Once CreateInitialRestriction supports Context params and
// error return values, this can be done in readSdfFn.CreateInitialRestriction.
func sizeFn(ctx context.Context, filename string) (string, int64, error) {
// readFn reads individual lines from a text file. Implemented as an SDF
// to allow splitting within a file.
type readFn struct {
}

// CreateInitialRestriction creates an offset range restriction representing
// the file's size in bytes.
func (fn *readFn) CreateInitialRestriction(ctx context.Context, filename string) (offsetrange.Restriction, error) {
fs, err := filesystem.New(ctx, filename)
if err != nil {
return "", -1, err
return offsetrange.Restriction{}, err
}
defer fs.Close()

size, err := fs.Size(ctx, filename)
if err != nil {
return "", -1, err
return offsetrange.Restriction{}, err
}
return filename, size, nil
}

// readFn reads individual lines from a text file, given a filename and a
// size in bytes for that file. Implemented as an SDF to allow splitting
// within a file.
type readFn struct {
}

// CreateInitialRestriction creates an offset range restriction representing
// the file, using the paired size rather than fetching the file's size.
func (fn *readFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction {
return offsetrange.Restriction{
Start: 0,
End: size,
}
}, nil
}

const (
Expand All @@ -150,7 +141,7 @@ const (

// SplitRestriction splits each file restriction into blocks of a predeterined
// size, with some checks to avoid having small remainders.
func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction {
func (fn *readFn) SplitRestriction(_ string, rest offsetrange.Restriction) []offsetrange.Restriction {
splits := rest.SizedSplits(blockSize)
numSplits := len(splits)
if numSplits > 1 {
Expand All @@ -165,7 +156,7 @@ func (fn *readFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restricti
}

// Size returns the size of each restriction as its range.
func (fn *readFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 {
func (fn *readFn) RestrictionSize(_ string, rest offsetrange.Restriction) float64 {
return rest.Size()
}

Expand All @@ -183,7 +174,7 @@ func (fn *readFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker
// begin within the restriction and past the restriction (those are entirely
// output, including the portion outside the restriction). In some cases a
// valid restriction might not output any lines.
func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error {
func (fn *readFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, emit func(string)) error {
log.Infof(ctx, "Reading from %v", filename)

fs, err := filesystem.New(ctx, filename)
Expand Down

0 comments on commit d652d05

Please sign in to comment.