Skip to content

Commit

Permalink
fix: update BaseStorageReadChannel to be left open unless explicitly …
Browse files Browse the repository at this point in the history
…closed (#1853)

Add two new tests to verify new expected close behavior surfaced from java-storage-nio.
  • Loading branch information
BenWhitehead authored Jan 13, 2023
1 parent 4491f73 commit 1425dd9
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageReadChannel<T> implements StorageReadChannel {

private boolean open;
private ByteRangeSpec byteRangeSpec;
private int chunkSize = _2MiB;
private BufferHandle bufferHandle;
Expand All @@ -34,6 +36,7 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
@Nullable private T resolvedObject;

protected BaseStorageReadChannel() {
this.open = true;
this.byteRangeSpec = ByteRangeSpec.nullRange();
}

Expand All @@ -45,16 +48,12 @@ public final synchronized void setChunkSize(int chunkSize) {

@Override
public final synchronized boolean isOpen() {
if (lazyReadChannel == null) {
return true;
} else {
LazyReadChannel<T> tmp = internalGetLazyChannel();
return tmp.isOpen();
}
return open;
}

@Override
public final synchronized void close() {
open = false;
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
}
Expand All @@ -75,17 +74,23 @@ public final ByteRangeSpec getByteRangeSpec() {

@Override
public final synchronized int read(ByteBuffer dst) throws IOException {
// BlobReadChannel only considered itself closed if close had been called on it.
if (!open) {
throw new ClosedChannelException();
}
long diff = byteRangeSpec.length();
if (diff <= 0) {
close();
return -1;
}
try {
int read = internalGetLazyChannel().getChannel().read(dst);
// trap if the fact that tmp is already closed, and instead return -1
BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
int read = tmp.read(dst);
if (read != -1) {
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
} else {
close();
}
return read;
} catch (StorageException e) {
Expand Down Expand Up @@ -128,15 +133,16 @@ protected void setResolvedObject(@Nullable T resolvedObject) {
protected abstract LazyReadChannel<T> newLazyReadChannel();

private void maybeResetChannel(boolean freeBuffer) throws IOException {
if (lazyReadChannel != null && lazyReadChannel.isOpen()) {
try (BufferedReadableByteChannel ignore = lazyReadChannel.getChannel()) {
if (bufferHandle != null && !freeBuffer) {
bufferHandle.get().clear();
} else if (freeBuffer) {
bufferHandle = null;
}
lazyReadChannel = null;
if (lazyReadChannel != null) {
if (lazyReadChannel.isOpen()) {
lazyReadChannel.getChannel().close();
}
if (bufferHandle != null && !freeBuffer) {
bufferHandle.get().clear();
} else if (freeBuffer) {
bufferHandle = null;
}
lazyReadChannel = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package com.google.cloud.storage.it;

import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import com.google.cloud.ReadChannel;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
Expand Down Expand Up @@ -372,6 +375,40 @@ public void seekAfterReadWorks() throws IOException {
}
}

@Test
public void seekBackToStartAfterReachingEndOfObjectWorks() throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
BlobInfo gen1 = obj512KiB.getInfo();
byte[] bytes = obj512KiB.getContent().getBytes();

int from = bytes.length - 5;
byte[] expected1 = Arrays.copyOfRange(bytes, from, bytes.length);

String xxdExpected1 = xxd(expected1);
String xxdExpected2 = xxd(bytes);
try (ReadChannel reader = storage.reader(gen1.getBlobId())) {
// seek forward to a new offset
reader.seek(from);

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel out = Channels.newChannel(baos)) {
ByteStreams.copy(reader, out);
String xxd = xxd(baos.toByteArray());
assertThat(xxd).isEqualTo(xxdExpected1);
}

// seek back to the beginning
reader.seek(0);
// read again
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel out = Channels.newChannel(baos)) {
ByteStreams.copy(reader, out);
String xxd = xxd(baos.toByteArray());
assertThat(xxd).isEqualTo(xxdExpected2);
}
}
}

@Test
public void limitAfterReadWorks() throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
Expand Down Expand Up @@ -469,6 +506,29 @@ public void responseWith416ReturnsZeroAndLeavesTheChannelOpen() throws IOExcepti
}
}

/** Read channel does not consider itself closed once it returns {@code -1} from read. */
@Test
public void readChannelIsAlwaysOpen_willReturnNegative1UntilExplicitlyClosed() throws Exception {
int length = 10;
byte[] bytes = DataGenerator.base64Characters().genBytes(length);

BlobInfo info1 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
Blob gen1 = storage.create(info1, bytes, BlobTargetOption.doesNotExist());

try (ReadChannel reader = storage.reader(gen1.getBlobId())) {
ByteBuffer buf = ByteBuffer.allocate(length * 2);
int read = reader.read(buf);
assertAll(
() -> assertThat(read).isEqualTo(length), () -> assertThat(reader.isOpen()).isTrue());
int read2 = reader.read(buf);
assertAll(() -> assertThat(read2).isEqualTo(-1), () -> assertThat(reader.isOpen()).isTrue());
int read3 = reader.read(buf);
assertAll(() -> assertThat(read3).isEqualTo(-1), () -> assertThat(reader.isOpen()).isTrue());
reader.close();
assertThrows(ClosedChannelException.class, () -> reader.read(buf));
}
}

private void captureAndRestoreTest(@Nullable Integer position, @Nullable Integer endOffset)
throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
Expand Down

0 comments on commit 1425dd9

Please sign in to comment.