Skip to content

Commit

Permalink
GEOMESA-3299 Fix push-down aggregation for Arrow and BIN queries (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz committed Sep 19, 2023
1 parent 92752fd commit 3f90bc9
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.locationtech.geomesa.arrow.ArrowProperties
import org.locationtech.geomesa.arrow.io.FormatVersion
import org.locationtech.geomesa.arrow.vector.SimpleFeatureVector.SimpleFeatureEncoding
import org.locationtech.geomesa.index.conf.QueryHints._
import org.locationtech.geomesa.index.planning.QueryPlanner
import org.locationtech.geomesa.process.transform.ArrowConversionProcess.ArrowVisitor
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.io.WithClose
Expand All @@ -43,7 +42,8 @@ import scala.collection.JavaConverters._
* @param geoServer geoserver
*/
class ArrowOutputFormat(geoServer: GeoServer)
extends WFSGetFeatureOutputFormat(geoServer, Set("arrow", ArrowOutputFormat.MimeType).asJava) with LazyLogging {
extends WFSGetFeatureOutputFormat(geoServer, Set("arrow", ArrowOutputFormat.MimeType).asJava)
with FormatOptionsCallback with LazyLogging {

override def getMimeType(value: AnyRef, operation: Operation): String = ArrowOutputFormat.MimeType

Expand All @@ -57,92 +57,85 @@ class ArrowOutputFormat(geoServer: GeoServer)
s"$name.${ArrowOutputFormat.FileExtension}"
}

override def write(featureCollections: FeatureCollectionResponse,
output: OutputStream,
getFeature: Operation): Unit = {
override def write(
featureCollections: FeatureCollectionResponse,
output: OutputStream,
getFeature: Operation): Unit = {

import org.locationtech.geomesa.index.conf.QueryHints.RichHints

// format_options flags for customizing the request
val request = GetFeatureRequest.adapt(getFeature.getParameters()(0))
WithClose(new BufferedOutputStream(output)) { bos =>
var i = -1
featureCollections.getFeatures.asScala.foreach { fc =>
i += 1
WithClose(CloseableIterator(fc.asInstanceOf[SimpleFeatureCollection].features())) { iter =>
val aggregated = fc.getSchema == org.locationtech.geomesa.arrow.ArrowEncodedSft
if (aggregated) {
// with distributed processing, encodings have already been computed in the servers
iter.map(_.getAttribute(0).asInstanceOf[Array[Byte]]).foreach(bos.write)
} else {
// for non-encoded fs we do the encoding here
logger.warn(s"Server side arrow aggregation is not enabled for feature collection '${fc.getClass}'")
val request = GetFeatureRequest.adapt(getFeature.getParameters()(0))
val hints = new Hints()
populateFormatOptions(request, hints)

val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid)
val dictionaries = hints.getArrowDictionaryFields
val version = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)
val sortField = hints.getArrowSort.map(_._1)
val sortReverse = hints.getArrowSort.map(_._2)
val batchSize = hints.getArrowBatchSize.getOrElse(ArrowProperties.BatchSize.get.toInt)

val preSorted = for (field <- sortField; reverse <- sortReverse.orElse(Some(false))) yield {
request.getQueries.get(i).getSortBy match {
case list if list.size == 1 =>
val sort = list.get(0)
Option(sort.getPropertyName).exists(_.getPropertyName == field) &&
(sort.getSortOrder == SortOrder.DESCENDING) == reverse
case _ => false
}
}

val hints = new Hints()
hints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
val visitor =
new ArrowVisitor(fc.getSchema.asInstanceOf[SimpleFeatureType], encoding, version,
dictionaries, sortField, sortReverse, preSorted.getOrElse(false), batchSize)

iter.foreach(visitor.visit)

visitor.getResult().results.asScala.foreach(bos.write)
}
}
}
}
}

val options = request.getFormatOptions.asInstanceOf[java.util.Map[String, String]]
override protected def populateFormatOptions(request: GetFeatureRequest, hints: Hints): Unit = {
val options = request.getFormatOptions
hints.put(ARROW_ENCODE, java.lang.Boolean.TRUE)
Option(options.get(Fields.IncludeFids)).foreach { option =>
hints.put(ARROW_INCLUDE_FID, java.lang.Boolean.valueOf(option))
hints.put(ARROW_INCLUDE_FID, java.lang.Boolean.valueOf(option.toString))
}
Option(options.get(Fields.ProxyFids)).foreach { option =>
hints.put(ARROW_PROXY_FID, java.lang.Boolean.valueOf(option))
hints.put(ARROW_PROXY_FID, java.lang.Boolean.valueOf(option.toString))
}
Option(options.get(Fields.DictionaryFields)).foreach { option =>
hints.put(ARROW_DICTIONARY_FIELDS, option)
hints.put(ARROW_DICTIONARY_FIELDS, option.toString)
}
Option(options.get(Fields.FormatVersion)).foreach { option =>
hints.put(ARROW_FORMAT_VERSION, option)
hints.put(ARROW_FORMAT_VERSION, option.toString)
}
Option(options.get(Fields.SortField)).foreach { option =>
hints.put(ARROW_SORT_FIELD, option)
hints.put(ARROW_SORT_FIELD, option.toString)
}
Option(options.get(Fields.SortReverse)).foreach { option =>
hints.put(ARROW_SORT_REVERSE, java.lang.Boolean.valueOf(option))
hints.put(ARROW_SORT_REVERSE, java.lang.Boolean.valueOf(option.toString))
}
Option(options.get(Fields.BatchSize)).foreach { option =>
hints.put(ARROW_BATCH_SIZE, java.lang.Integer.valueOf(option))
hints.put(ARROW_BATCH_SIZE, java.lang.Integer.valueOf(option.toString))
}
Option(options.get(Fields.ProcessDeltas)).foreach { option =>
hints.put(ARROW_PROCESS_DELTAS, java.lang.Boolean.valueOf(option))
}

// set hints into thread local state - this prevents any wrapping feature collections from messing with
// the aggregation
QueryPlanner.setPerThreadQueryHints(hints.asScala.toMap)

try {
WithClose(new BufferedOutputStream(output)) { bos =>
var i = -1
featureCollections.getFeatures.asScala.foreach { fc =>
i += 1
WithClose(CloseableIterator(fc.asInstanceOf[SimpleFeatureCollection].features())) { iter =>
// this check needs to be done *after* getting the feature iterator so that the return sft will be set
val aggregated = fc.getSchema == org.locationtech.geomesa.arrow.ArrowEncodedSft
if (aggregated) {
// with distributed processing, encodings have already been computed in the servers
iter.map(_.getAttribute(0).asInstanceOf[Array[Byte]]).foreach(bos.write)
} else {
// for non-encoded fs we do the encoding here
logger.warn(s"Server side arrow aggregation is not enabled for feature collection '${fc.getClass}'")

val encoding = SimpleFeatureEncoding.min(hints.isArrowIncludeFid, hints.isArrowProxyFid)
val dictionaries = hints.getArrowDictionaryFields
val version = hints.getArrowFormatVersion.getOrElse(FormatVersion.ArrowFormatVersion.get)
val sortField = hints.getArrowSort.map(_._1)
val sortReverse = hints.getArrowSort.map(_._2)
val batchSize = hints.getArrowBatchSize.getOrElse(ArrowProperties.BatchSize.get.toInt)

val preSorted = for (field <- sortField; reverse <- sortReverse.orElse(Some(false))) yield {
request.getQueries.get(i).getSortBy match {
case list if list.size == 1 =>
val sort = list.get(0)
Option(sort.getPropertyName).exists(_.getPropertyName == field) &&
(sort.getSortOrder == SortOrder.DESCENDING) == reverse
case _ => false
}
}

val visitor = new ArrowVisitor(fc.getSchema.asInstanceOf[SimpleFeatureType], encoding, version,
dictionaries, sortField, sortReverse, preSorted.getOrElse(false), batchSize)

iter.foreach(visitor.visit)

visitor.getResult().results.asScala.foreach(bos.write)
}
}
}
}
} finally {
QueryPlanner.clearPerThreadQueryHints()
hints.put(ARROW_PROCESS_DELTAS, java.lang.Boolean.valueOf(option.toString))
}
}
}
Expand Down
Loading

0 comments on commit 3f90bc9

Please sign in to comment.