diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala
index 328ca4065d..945de2fa2a 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3GeoTiffRDD.scala
@@ -22,7 +22,7 @@ import geotrellis.raster.io.geotiff.tags.TiffTags
 import geotrellis.spark._
 import geotrellis.spark.io.RasterReader
 import geotrellis.spark.io.s3.util.S3RangeReader
-import geotrellis.util.StreamingByteReader
+import geotrellis.util.{StreamingByteReader, LazyLogging}
 import geotrellis.vector._
 
 import org.apache.hadoop.conf.Configuration
@@ -36,7 +36,7 @@ import java.nio.ByteBuffer
 /**
  * The S3GeoTiffRDD object allows for the creation of whole or windowed RDD[(K, V)]s from files on S3.
  */
-object S3GeoTiffRDD {
+object S3GeoTiffRDD extends LazyLogging {
 
   final val GEOTIFF_TIME_TAG_DEFAULT = "TIFFTAG_DATETIME"
   final val GEOTIFF_TIME_FORMAT_DEFAULT = "yyyy:MM:dd HH:mm:ss"
@@ -51,7 +51,8 @@ object S3GeoTiffRDD {
    * May result in a one input GeoTiff being split amongst multiple records if it exceeds this size.
    * If no maximum tile size is specific, then each file file is read fully.
    * @param numPartitions How many partitions Spark should create when it repartitions the data.
-   * @param partitionBytes Desired partition size in bytes, at least one item per partition will be assigned
+   * @param partitionBytes Desired partition size in bytes, at least one item per partition will be assigned.
+                           This option is incompatible with the maxTileSize option.
    * @param chunkSize How many bytes should be read in at a time.
    * @param delimiter Delimiter to use for S3 objet listings. See
    * @param getS3Client A function to instantiate an S3Client. Must be serializable.
@@ -163,7 +164,19 @@ object S3GeoTiffRDD {
 
       val repartitioned = options.numPartitions match {
         case Some(p) => windows.repartition(p)
-        case None => windows
+        case None =>
+          options.partitionBytes match {
+            case Some(byteCount) =>
+              // Because we do not have cell type information, we cannot
+              // perform the necessary estimates for the partition bytes.
+              logger.warn(
+                s"${classOf[Options].getName}.partitionBytes set with maxTileSize, " +
+                  "cannot perform partitioning based on byte count. Option ignored. " +
+                  "Use numPartitions instead.")
+              windows
+            case None =>
+              windows
+          }
       }
 
       repartitioned.map { case (objectRequest: GetObjectRequest, pixelWindow: GridBounds) =>
diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
index aeb221fd58..95660d6da1 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
@@ -20,9 +20,9 @@ import geotrellis.proj4.CRS
 import geotrellis.spark.io.hadoop._
 import geotrellis.util.LazyLogging
 
-import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
 import com.amazonaws.auth._
 import com.amazonaws.regions._
+import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat, Job, JobContext}
@@ -103,6 +103,21 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
     var splits: Vector[S3InputSplit] = Vector(makeNewSplit)
 
+    val s3ObjectFilter: S3ObjectSummary => Boolean =
+      { obj =>
+        val key = obj.getKey
+
+        val isDir = key.endsWith("/")
+        val isTiff =
+          if (extensions.isEmpty)
+            true
+          else {
+            extensions.map(key.endsWith).reduce(_ || _)
+          }
+
+        !isDir && isTiff
+      }
+
     if (null == partitionCountConf) {
       // By default attempt to make partitions the same size
       val maxSplitBytes = if (null == partitionSizeConf) S3InputFormat.DEFAULT_PARTITION_BYTES else partitionSizeConf.toLong
@@ -125,15 +140,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
       s3client
         .listObjectsIterator(request)
-        .filter(!_.getKey.endsWith("/"))
-        .filter { obj =>
-          if (extensions.isEmpty)
-            true
-          else {
-            val key = obj.getKey
-            extensions.map(key.endsWith).reduce(_ || _)
-          }
-        }
+        .filter(s3ObjectFilter)
        .foreach { obj =>
          val objSize = obj.getSize
          val curSplit =
@@ -159,7 +166,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
       val keys =
         s3client
           .listObjectsIterator(request)
-          .filter(! _.getKey.endsWith("/"))
+          .filter(s3ObjectFilter)
           .toVector
 
       val groupCount = math.max(1, keys.length / partitionCount)