Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add skip header lines in TextIO #28502

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class TextIO {

private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;

/**
Expand All @@ -192,6 +193,7 @@ public static Read read() {
.setCompression(Compression.AUTO)
.setHintMatchesManyFiles(false)
.setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
.setSkipHeaderLines(0)
.build();
}

Expand Down Expand Up @@ -283,13 +285,16 @@ public abstract static class Read extends PTransform<PBegin, PCollection<String>

abstract Compression getCompression();

abstract int getSkipHeaderLines();

@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
abstract byte @Nullable [] getDelimiter();

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {

abstract Builder setFilepattern(ValueProvider<String> filepattern);

abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);
Expand All @@ -298,6 +303,8 @@ abstract static class Builder {

abstract Builder setCompression(Compression compression);

abstract Builder setSkipHeaderLines(int skipHeaderLines);

abstract Builder setDelimiter(byte @Nullable [] delimiter);

abstract Read build();
Expand Down Expand Up @@ -389,6 +396,17 @@ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
}

/**
* Sets the number of lines to skip from the beginning of the file.
*
* <p>This disables split file reading and may cause performance degradation.
*/
public Read withSkipHeaderLines(int skipHeaderLines) {
checkArgument(
skipHeaderLines > 0, "skipHeaderLines should be > 0, but was %s", skipHeaderLines);
return toBuilder().setSkipHeaderLines(skipHeaderLines).build();
}

/** Set the custom delimiter to be used in place of the default ones ('\r', '\n' or '\r\n'). */
public Read withDelimiter(byte[] delimiter) {
checkArgument(delimiter != null, "delimiter can not be null");
Expand Down Expand Up @@ -431,6 +449,7 @@ protected FileBasedSource<String> getSource() {
new TextSource(
getFilepattern(),
getMatchConfiguration().getEmptyMatchTreatment(),
getSkipHeaderLines(),
getDelimiter()))
.withCompression(getCompression());
}
Expand All @@ -444,6 +463,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Compression Type"))
.addIfNotNull(DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern"))
.include("matchConfiguration", getMatchConfiguration())
.addIfNotDefault(
DisplayData.item("skipHeaderLines", getSkipHeaderLines())
.withLabel("Skip Header Lines"),
0)
.addIfNotNull(
DisplayData.item("delimiter", Arrays.toString(getDelimiter()))
.withLabel("Custom delimiter to split records"));
Expand All @@ -461,6 +484,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
@AutoValue
public abstract static class ReadAll
extends PTransform<PCollection<String>, PCollection<String>> {

abstract MatchConfiguration getMatchConfiguration();

abstract Compression getCompression();
Expand All @@ -472,6 +496,7 @@ public abstract static class ReadAll

@AutoValue.Builder
abstract static class Builder {

abstract Builder setMatchConfiguration(MatchConfiguration matchConfiguration);

abstract Builder setCompression(Compression compression);
Expand Down Expand Up @@ -555,6 +580,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
@AutoValue
public abstract static class ReadFiles
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<String>> {

abstract long getDesiredBundleSizeBytes();

@SuppressWarnings("mutable") // this returns an array that can be mutated by the caller
Expand All @@ -564,6 +590,7 @@ public abstract static class ReadFiles

@AutoValue.Builder
abstract static class Builder {

abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);

abstract Builder setDelimiter(byte @Nullable [] delimiter);
Expand Down Expand Up @@ -601,6 +628,7 @@ public void populateDisplayData(DisplayData.Builder builder) {

private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {

private byte[] delimiter;

private CreateTextSourceFn(byte[] delimiter) {
Expand Down Expand Up @@ -681,6 +709,7 @@ public abstract static class TypedWrite<UserT, DestinationT>

@AutoValue.Builder
abstract static class Builder<UserT, DestinationT> {

abstract Builder<UserT, DestinationT> setFilenamePrefix(
@Nullable ValueProvider<ResourceId> filenamePrefix);

Expand Down Expand Up @@ -1085,6 +1114,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
* This class exists for backwards compatibility, and will be removed in Beam 3.0.
*/
public static class Write extends PTransform<PCollection<String>, PDone> {

@VisibleForTesting TypedWrite<String, ?> inner;

Write() {
Expand Down Expand Up @@ -1308,6 +1338,7 @@ public abstract static class Sink implements FileIO.Sink<String> {

@AutoValue.Builder
abstract static class Builder {

abstract Builder setHeader(String header);

abstract Builder setFooter(String footer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class TextSource extends FileBasedSource<String> {

int skipHeaderLines;
byte[] delimiter;

public TextSource(
Expand All @@ -62,27 +64,48 @@ public TextSource(
this.delimiter = delimiter;
}

public TextSource(MatchResult.Metadata metadata, long start, long end, byte[] delimiter) {
public TextSource(
ValueProvider<String> fileSpec,
EmptyMatchTreatment emptyMatchTreatment,
int skipHeaderLines,
byte[] delimiter) {
super(fileSpec, emptyMatchTreatment, 1L);
this.skipHeaderLines = skipHeaderLines;
this.delimiter = delimiter;
}

public TextSource(
MatchResult.Metadata metadata, long start, long end, int skipHeaderLines, byte[] delimiter) {
super(metadata, 1L, start, end);
this.skipHeaderLines = skipHeaderLines;
this.delimiter = delimiter;
}

@Override
protected FileBasedSource<String> createForSubrangeOfFile(
MatchResult.Metadata metadata, long start, long end) {
return new TextSource(metadata, start, end, delimiter);
return new TextSource(metadata, start, end, skipHeaderLines, delimiter);
}

@Override
protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
return new TextBasedReader(this, delimiter);
return new TextBasedReader(this, skipHeaderLines, delimiter);
}

@Override
public Coder<String> getOutputCoder() {
return StringUtf8Coder.of();
}

@Override
public boolean isSplittable() throws Exception {
if (skipHeaderLines > 0) {
return false;
}

return super.isSplittable();
}

/**
* A {@link FileBasedReader FileBasedReader} which can decode records delimited by delimiter
* characters.
Expand All @@ -91,12 +114,14 @@ public Coder<String> getOutputCoder() {
*/
@VisibleForTesting
static class TextBasedReader extends FileBasedReader<String> {

private static final int READ_BUFFER_SIZE = 8192;
private static final ByteString UTF8_BOM =
ByteString.copyFrom(new byte[] {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF});
private static final byte CR = '\r';
private static final byte LF = '\n';

private final int skipHeaderLines;
private final byte @Nullable [] delimiter;
private final ByteArrayOutputStream str;
private final byte[] buffer;
Expand All @@ -111,11 +136,12 @@ static class TextBasedReader extends FileBasedReader<String> {
private int bufferPosn = 0; // the current position in the buffer
private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer

private TextBasedReader(TextSource source, byte[] delimiter) {
private TextBasedReader(TextSource source, int skipHeaderLines, byte[] delimiter) {
super(source);
this.buffer = new byte[READ_BUFFER_SIZE];
this.str = new ByteArrayOutputStream();
this.byteBuffer = ByteBuffer.wrap(buffer);
this.skipHeaderLines = skipHeaderLines;
this.delimiter = delimiter;
}

Expand Down Expand Up @@ -185,6 +211,10 @@ protected void startReading(ReadableByteChannel channel) throws IOException {
if (fileStartsWithBom()) {
startOfNextRecord = bufferPosn = UTF8_BOM.size();
}

for (int i = 0; i < skipHeaderLines; i++) {
readNextRecord();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
/** Tests for {@link TextIO.Read}. */
@RunWith(Enclosed.class)
public class TextIOReadTest {

private static final int LINES_NUMBER_FOR_LARGE = 1000;
private static final List<String> EMPTY = Collections.emptyList();
private static final List<String> TINY =
Expand Down Expand Up @@ -276,6 +277,7 @@ private static String getFileSuffix(Compression compression) {
/** Tests for reading from different size of files with various Compression. */
@RunWith(Parameterized.class)
public static class CompressedReadTest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@Rule public TestPipeline p = TestPipeline.create();

Expand Down Expand Up @@ -345,9 +347,56 @@ public void testReadWithAuto() throws Exception {
}
}

@RunWith(Parameterized.class)
public static class ReadWithSkipHeaderLinesTest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@Rule public TestPipeline p = TestPipeline.create();

@Parameterized.Parameters(name = "{index}: {1}")
public static Iterable<Object[]> data() {
return ImmutableList.<Object[]>builder()
.add(new Object[] {TINY, 1, TINY.subList(1, TINY.size())})
.add(new Object[] {TINY, 2, TINY.subList(2, TINY.size())})
.add(new Object[] {TINY, 3, TINY.subList(3, TINY.size())})
.build();
}

@Parameterized.Parameter(0)
public List<String> lines;

@Parameterized.Parameter(1)
public int skipHeaderLines;

@Parameterized.Parameter(2)
public List<String> expected;

@Test
@Category(NeedsRunner.class)
public void testReadWithSkipHeaderLines() throws Exception {
File tmpFile = tempFolder.newFile();
String filename = tmpFile.getPath();

try (PrintStream writer = new PrintStream(new FileOutputStream(tmpFile))) {
for (String elem : lines) {
byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
String line = new String(encodedElem, Charsets.UTF_8);
writer.println(line);
}
}

TextIO.Read read = TextIO.read().from(filename).withSkipHeaderLines(skipHeaderLines);
PCollection<String> output = p.apply(read);

PAssert.that(output).containsInAnyOrder(expected);
p.run();
}
}

/** Tests for reading files with various delimiters. */
@RunWith(Parameterized.class)
public static class ReadWithDefaultDelimiterTest {

private static final ImmutableList<String> EXPECTED = ImmutableList.of("asdf", "hjkl", "xyz");
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

Expand Down Expand Up @@ -447,6 +496,7 @@ private void runTestReadWithData(byte[] data, List<String> expectedResults) thro
/** Tests for reading files with various delimiters. */
@RunWith(Parameterized.class)
public static class ReadWithCustomDelimiterTest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();

@Parameterized.Parameters(name = "{index}: {0}")
Expand Down Expand Up @@ -537,6 +587,7 @@ public void close() throws IOException {
/** Tests for some basic operations in {@link TextIO.Read}. */
@RunWith(JUnit4.class)
public static class BasicIOTest {

@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
@Rule public TestPipeline p = TestPipeline.create();

Expand Down Expand Up @@ -633,6 +684,7 @@ public void testReadDisplayData() {

/** Options for testing. */
public interface RuntimeTestOptions extends PipelineOptions {

ValueProvider<String> getInput();

void setInput(ValueProvider<String> value);
Expand Down Expand Up @@ -1035,6 +1087,7 @@ public void testReadWatchForNewFiles() throws IOException, InterruptedException
/** Tests for TextSource class. */
@RunWith(JUnit4.class)
public static class TextSourceTest {

@Rule public transient TestPipeline pipeline = TestPipeline.create();

@Test
Expand Down Expand Up @@ -1121,6 +1174,7 @@ public void processElement(ProcessContext c) {
/** A transform that reads CSV file records. */
private static class TextFileReadTransform
extends PTransform<PCollection<String>, PCollection<String>> {

public TextFileReadTransform() {}

@Override
Expand Down
Loading