[BACKPORT 1.1] Fixes to #2231 and #2232 (#2252)

Merged
merged 2 commits into from Jun 22, 2017
21 changes: 17 additions & 4 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala
@@ -22,7 +22,7 @@ import geotrellis.raster.io.geotiff.tags.TiffTags
 import geotrellis.spark._
 import geotrellis.spark.io.RasterReader
 import geotrellis.spark.io.s3.util.S3RangeReader
-import geotrellis.util.StreamingByteReader
+import geotrellis.util.{StreamingByteReader, LazyLogging}
 import geotrellis.vector._
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +36,7 @@ import java.nio.ByteBuffer
 /**
  * The S3GeoTiffRDD object allows for the creation of whole or windowed RDD[(K, V)]s from files on S3.
  */
-object S3GeoTiffRDD {
+object S3GeoTiffRDD extends LazyLogging {
   final val GEOTIFF_TIME_TAG_DEFAULT = "TIFFTAG_DATETIME"
   final val GEOTIFF_TIME_FORMAT_DEFAULT = "yyyy:MM:dd HH:mm:ss"
 
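A note on the import/extends pair above: `extends LazyLogging` is what brings the `logger` used by the new warning in the final hunk of this file into scope. As a rough sketch of the pattern (this trait is an illustrative stand-in, not the actual geotrellis.util.LazyLogging definition):

    import org.slf4j.{Logger, LoggerFactory}

    // Illustrative stand-in for geotrellis.util.LazyLogging: a lazily
    // initialized, non-serialized logger named after the concrete class.
    trait LazyLogging {
      @transient protected lazy val logger: Logger =
        LoggerFactory.getLogger(getClass)
    }

    object LoggingExample extends LazyLogging {
      def run(): Unit = logger.warn("option ignored")
    }

Because the field is `lazy` and `@transient`, the logger is created on first use and is not dragged into Spark closures during serialization.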
@@ -51,7 +51,8 @@ object S3GeoTiffRDD {
    *                       May result in one input GeoTiff being split amongst multiple records if it exceeds this size.
    *                       If no maximum tile size is specified, then each file is read fully.
    * @param numPartitions  How many partitions Spark should create when it repartitions the data.
-   * @param partitionBytes Desired partition size in bytes, at least one item per partition will be assigned
+   * @param partitionBytes Desired partition size in bytes, at least one item per partition will be assigned.
+   *                       This option is incompatible with the maxTileSize option.
    * @param chunkSize      How many bytes should be read in at a time.
    * @param delimiter      Delimiter to use for S3 object listings. See
    * @param getS3Client    A function to instantiate an S3Client. Must be serializable.
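From a caller's perspective, the documented interaction looks like this. A minimal sketch, assuming GeoTrellis 1.1 with an implicit SparkContext; the bucket and prefix are hypothetical. With maxTileSize set, numPartitions is the option that actually takes effect, and partitionBytes is left unset because it would be ignored (see the warning added in the next hunk):

    import geotrellis.raster.Tile
    import geotrellis.spark.io.s3.S3GeoTiffRDD
    import geotrellis.vector.ProjectedExtent
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    implicit val sc: SparkContext =
      new SparkContext(new SparkConf().setAppName("s3-geotiff-sketch").setMaster("local[*]"))

    // Windowed read: each GeoTiff is split into 256x256 windows and the
    // result is explicitly repartitioned into 64 partitions.
    val windowed: RDD[(ProjectedExtent, Tile)] =
      S3GeoTiffRDD.spatial(
        "my-bucket",   // hypothetical bucket
        "scenes/",     // hypothetical prefix
        S3GeoTiffRDD.Options(
          maxTileSize   = Some(256),
          numPartitions = Some(64)))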
@@ -163,7 +164,19 @@
     val repartitioned =
       options.numPartitions match {
         case Some(p) => windows.repartition(p)
-        case None => windows
+        case None =>
+          options.partitionBytes match {
+            case Some(byteCount) =>
+              // Because we do not have cell type information, we cannot
+              // perform the necessary estimates for the partition bytes.
+              logger.warn(
+                s"${classOf[Options].getName}.partitionBytes set with maxTileSize, " +
+                "cannot perform partitioning based on byte count. Option ignored. " +
+                "Use numPartitions instead.")
+              windows
+            case None =>
+              windows
+          }
       }
 
     repartitioned.map { case (objectRequest: GetObjectRequest, pixelWindow: GridBounds) =>
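The decision order introduced here can be distilled into a tiny standalone function; this is a sketch only, since the real code pattern-matches around an RDD of windows rather than returning a count:

    // Mirrors the match above: an explicit partition count always wins; a byte
    // budget cannot be honored without cell type information, so it is warned
    // about and ignored; otherwise the existing windows are left as they are.
    def resolvePartitions(numPartitions: Option[Int], partitionBytes: Option[Long]): Option[Int] =
      (numPartitions, partitionBytes) match {
        case (Some(p), _) => Some(p)
        case (None, Some(_)) =>
          println("warn: partitionBytes set with maxTileSize; option ignored, use numPartitions instead")
          None
        case (None, None) => None
      }

    assert(resolvePartitions(Some(64), Some(1L << 27)) == Some(64))
    assert(resolvePartitions(None, Some(1L << 27)).isEmpty)  // prints the warning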
29 changes: 18 additions & 11 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
@@ -20,9 +20,9 @@ import geotrellis.proj4.CRS
 import geotrellis.spark.io.hadoop._
 import geotrellis.util.LazyLogging
 
-import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
 import com.amazonaws.auth._
 import com.amazonaws.regions._
+import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat, Job, JobContext}
 
@@ -103,6 +103,21 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
     var splits: Vector[S3InputSplit] = Vector(makeNewSplit)
 
+    val s3ObjectFilter: S3ObjectSummary => Boolean =
+      { obj =>
+        val key = obj.getKey
+
+        val isDir = key.endsWith("/")
+        val isTiff =
+          if (extensions.isEmpty)
+            true
+          else {
+            extensions.map(key.endsWith).reduce(_ || _)
+          }
+
+        !isDir && isTiff
+      }
+
     if (null == partitionCountConf) {
       // By default attempt to make partitions the same size
       val maxSplitBytes = if (null == partitionSizeConf) S3InputFormat.DEFAULT_PARTITION_BYTES else partitionSizeConf.toLong
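Hoisting the filter into a single `s3ObjectFilter` value is the heart of this file's change: both listing paths below now apply identical rules, dropping directory placeholder keys and keeping only accepted extensions. The logic can be exercised standalone; in this sketch the extension list is hypothetical, and `exists` stands in for the `map(...).reduce(_ || _)` above (equivalent for non-empty lists, and also safe on an empty one):

    val extensions = Seq(".tif", ".tiff")  // hypothetical accepted extensions

    def keep(key: String): Boolean = {
      val isDir  = key.endsWith("/")
      val isTiff = extensions.isEmpty || extensions.exists(key.endsWith)
      !isDir && isTiff
    }

    Seq("scenes/a.tif", "scenes/", "scenes/readme.txt").filter(keep)
    // -> List("scenes/a.tif"); matching is case-sensitive, so "B.TIF" would be dropped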
@@ -125,15 +140,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
       s3client
         .listObjectsIterator(request)
-        .filter(!_.getKey.endsWith("/"))
-        .filter { obj =>
-          if (extensions.isEmpty)
-            true
-          else {
-            val key = obj.getKey
-            extensions.map(key.endsWith).reduce(_ || _)
-          }
-        }
+        .filter(s3ObjectFilter)
         .foreach { obj =>
           val objSize = obj.getSize
           val curSplit =
@@ -159,7 +166,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
       val keys =
         s3client
           .listObjectsIterator(request)
-          .filter(! _.getKey.endsWith("/"))
+          .filter(s3ObjectFilter)
           .toVector
 
       val groupCount = math.max(1, keys.length / partitionCount)
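For contrast, a quick before/after at this second call site, reusing the hypothetical `keep` predicate sketched earlier. Previously this path only dropped directory placeholders, so objects without an accepted extension could end up in partition groups; both paths now share the same predicate:

    val keys = Seq("scenes/a.tif", "scenes/", "scenes/readme.txt")

    keys.filter(!_.endsWith("/"))  // before: List(scenes/a.tif, scenes/readme.txt)
    keys.filter(keep)              // after:  List(scenes/a.tif)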