Skip to content

Commit

Permalink
Fix to issue #2231
Browse files Browse the repository at this point in the history
  • Loading branch information
lossyrob committed Jun 9, 2017
1 parent 86d7d9c commit 086cb16
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions s3/src/main/scala/geotrellis/spark/io/s3/S3InputFormat.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import geotrellis.proj4.CRS
import geotrellis.spark.io.hadoop._
import geotrellis.util.LazyLogging

import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}
import com.amazonaws.auth._
import com.amazonaws.regions._
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat, Job, JobContext}

Expand Down Expand Up @@ -103,6 +103,21 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {

var splits: Vector[S3InputSplit] = Vector(makeNewSplit)

val s3ObjectFilter: S3ObjectSummary => Boolean =
{ obj =>
val key = obj.getKey

val isDir = key.endsWith("/")
val isTiff =
if (extensions.isEmpty)
true
else {
extensions.map(key.endsWith).reduce(_ || _)
}

!isDir && isTiff
}

if (null == partitionCountConf) {
// By default attempt to make partitions the same size
val maxSplitBytes = if (null == partitionSizeConf) S3InputFormat.DEFAULT_PARTITION_BYTES else partitionSizeConf.toLong
Expand All @@ -125,15 +140,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {

s3client
.listObjectsIterator(request)
.filter(!_.getKey.endsWith("/"))
.filter { obj =>
if (extensions.isEmpty)
true
else {
val key = obj.getKey
extensions.map(key.endsWith).reduce(_ || _)
}
}
.filter(s3ObjectFilter)
.foreach { obj =>
val objSize = obj.getSize
val curSplit =
Expand All @@ -159,7 +166,7 @@ abstract class S3InputFormat[K, V] extends InputFormat[K,V] with LazyLogging {
val keys =
s3client
.listObjectsIterator(request)
.filter(! _.getKey.endsWith("/"))
.filter(s3ObjectFilter)
.toVector

val groupCount = math.max(1, keys.length / partitionCount)
Expand Down

0 comments on commit 086cb16

Please sign in to comment.