Skip to content

Commit

Permalink
Add system fields to input sources. (#15276)
Browse files Browse the repository at this point in the history
* Add system fields to input sources.

Main changes:

1) The SystemField enum defines system fields "__file_uri", "__file_path",
   and "__file_bucket". They are associated with each input entity.

2) The SystemFieldInputSource interface can be added to any InputSource
   to make it system-field-capable. It sets up serialization of a list
   of configured "systemFields" in the JSON form of the input source, and
   provides a method getSystemFieldValue for computing the value of each
   system field. Cloud object, HDFS, HTTP, and Local now have this.

* Fix various LocalInputSource calls.

* Fix style stuff.

* Fixups.

* Fix tests and coverage.
  • Loading branch information
gianm authored Nov 2, 2023
1 parent dc3213b commit d87d92b
Show file tree
Hide file tree
Showing 56 changed files with 1,649 additions and 422 deletions.
6 changes: 6 additions & 0 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Sample specs:
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the`*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (S3 URI starting with `s3://`), `__file_bucket` (S3 bucket), and `__file_path` (S3 object key).|None|no|
| endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
| clientConfig |S3 client properties for the overridden s3 endpoint. This is used in conjunction with `endPointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
| proxyConfig |Properties for specifying proxy information for the overridden s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
Expand Down Expand Up @@ -297,6 +298,7 @@ Google Cloud Storage object:
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud Storage bucket|None|yes|
|path|The path where data is located.|None|yes|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Google Cloud Storage URI starting with `gs://`), `__file_bucket` (GCS bucket), and `__file_path` (GCS key).|None|no|

## Azure input source

Expand Down Expand Up @@ -371,6 +373,7 @@ Sample specs:
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form `azure://<container>/<prefix>`. Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the`*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (Azure blob URI starting with `azure://`), `__file_bucket` (Azure bucket), and `__file_path` (Azure object path).|None|no|

Note that the Azure input source skips all empty objects only when `prefixes` is specified.

Expand Down Expand Up @@ -462,6 +465,7 @@ Sample specs:
|--------|-----------|-------|---------|
|type|Set the value to `hdfs`.|None|yes|
|paths|HDFS paths. Can be either a JSON array or comma-separated string of paths. Wildcards like `*` are supported in these paths. Empty files located under one of the given paths will be skipped.|None|yes|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (URI) and `__file_path` (path component of URI).|None|no|

You can also ingest from other storage using the HDFS input source if the HDFS client supports that storage.
However, if you want to ingest from cloud storage, consider using the service-specific input source for your data storage.
Expand Down Expand Up @@ -551,6 +555,7 @@ You can also use the other existing Druid PasswordProviders. Here is an example
|uris|URIs of the input files. See below for the protocols allowed for URIs.|None|yes|
|httpAuthenticationUsername|Username to use for authentication with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
|httpAuthenticationPassword|PasswordProvider to use with specified URIs. Can be optionally used if the URIs specified in the spec require a Basic Authentication Header.|None|no|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (URI including scheme) and `__file_path` (path component of URI).|None|no|

You can only use protocols listed in the `druid.ingestion.http.allowedProtocols` property as HTTP input sources.
The `http` and `https` protocols are allowed by default. See [HTTP input source security configuration](../configuration/index.md#http-input-source) for more details.
Expand Down Expand Up @@ -616,6 +621,7 @@ Sample spec:
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|yes if `baseDir` is specified|
|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|
|systemFields|JSON array of system fields to return as part of input rows. Possible values: `__file_uri` (File URI starting with `file:`) and `__file_path` (file path).|None|no|

## Druid input source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ public Predicate<Throwable> getRetryCondition()
{
return OssUtils.RETRYABLE;
}

CloudObjectLocation getObject()
{
return object;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.data.input.aliyun;

import com.aliyun.oss.OSS;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -31,23 +30,21 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CloudObjectSplitWidget;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemField;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.storage.aliyun.OssInputDataConfig;
import org.apache.druid.storage.aliyun.OssStorageDruidModule;
import org.apache.druid.storage.aliyun.OssUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -80,10 +77,11 @@ public OssInputSource(
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("objectGlob") @Nullable String objectGlob,
@JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
)
{
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob, systemFields);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig;
Expand Down Expand Up @@ -164,10 +162,28 @@ public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<Lis
null,
split.get(),
getObjectGlob(),
systemFields,
getOssInputSourceConfig()
);
}

@Override
public Object getSystemFieldValue(InputEntity entity, SystemField field)
{
final OssEntity s3Entity = (OssEntity) entity;

switch (field) {
case URI:
return s3Entity.getUri().toString();
case BUCKET:
return s3Entity.getObject().getBucket();
case PATH:
return s3Entity.getObject().getPath();
default:
return null;
}
}

@Override
public int hashCode()
{
Expand Down Expand Up @@ -198,30 +214,8 @@ public String toString()
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", objectGlob=" + getObjectGlob() +
(systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
", ossInputSourceConfig=" + getOssInputSourceConfig() +
'}';
}

private Iterable<OSSObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<OSSObjectSummary> iterator = OssUtils.objectSummaryIterator(
clientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength()
);

// Skip files that didn't match glob filter.
if (StringUtils.isNotBlank(getObjectGlob())) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());

iterator = Iterators.filter(
iterator,
object -> m.matches(Paths.get(object.getKey()))
);
}

return iterator;
};
}
}
Loading

0 comments on commit d87d92b

Please sign in to comment.