From 086cb169d9635d46a25824cde4e991d97c0a5745 Mon Sep 17 00:00:00 2001
From: lossyrob
Date: Fri, 9 Jun 2017 12:53:47 -0400
Subject: [PATCH] Fix to issue #2231

---
Reviewer notes (this section is not part of the commit message):
  * The object filter is defined once as `s3ObjectFilter` and applied in both
    places that list objects, so the partition-count path now honours the
    configured extensions as well.
  * The filter uses `exists` instead of `map(...).reduce(_ || _)`.
  * This patch was hand-edited after generation: hunk sizes and the diffstat
    were recomputed, and the post-image blob id on the `index` line is stale.

 .../spark/io/s3/S3InputFormat.scala           | 31 ++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)

diff --git a/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala b/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
index aeb221fd58..95660d6da1 100644
--- a/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
+++ b/s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
@@ -20,9 +20,9 @@ import geotrellis.proj4.CRS
 import geotrellis.spark.io.hadoop._
 import geotrellis.util.LazyLogging
 
-import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
 import com.amazonaws.auth._
 import com.amazonaws.regions._
+import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat, Job, JobContext}
@@ -103,6 +103,23 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
     var splits: Vector[S3InputSplit] = Vector(makeNewSplit)
 
+    // Accepts an object when its key does not end in "/" and, if any
+    // extensions are configured, the key ends with at least one of them.
+    // An empty extension list accepts every non-"/" key. Both listings
+    // below use this same predicate so they select the same objects.
+    val s3ObjectFilter: S3ObjectSummary => Boolean =
+      { obj =>
+        val key = obj.getKey
+
+        val isDir = key.endsWith("/")
+        // `exists` stops at the first matching extension and does not
+        // build an intermediate collection of booleans.
+        val hasMatchingExtension =
+          extensions.isEmpty || extensions.exists(key.endsWith)
+
+        !isDir && hasMatchingExtension
+      }
+
     if (null == partitionCountConf) {
       // By default attempt to make partitions the same size
       val maxSplitBytes = if (null == partitionSizeConf) S3InputFormat.DEFAULT_PARTITION_BYTES else partitionSizeConf.toLong
@@ -125,15 +142,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
 
       s3client
         .listObjectsIterator(request)
-        .filter(!_.getKey.endsWith("/"))
-        .filter { obj =>
-          if (extensions.isEmpty)
-            true
-          else {
-            val key = obj.getKey
-            extensions.map(key.endsWith).reduce(_ || _)
-          }
-        }
+        .filter(s3ObjectFilter)
         .foreach { obj =>
           val objSize = obj.getSize
           val curSplit =
@@ -159,7 +168,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
       val keys =
         s3client
           .listObjectsIterator(request)
-          .filter(! _.getKey.endsWith("/"))
+          .filter(s3ObjectFilter)
           .toVector
 
       val groupCount = math.max(1, keys.length / partitionCount)