Commit

Merge remote-tracking branch 'samza/master'
zmyer committed Feb 5, 2017
2 parents 1e8f5ed + 1f4a593 commit 9d68ba6
Showing 86 changed files with 3,512 additions and 1,861 deletions.
58 changes: 58 additions & 0 deletions KEYS
Original file line number Diff line number Diff line change
@@ -480,3 +480,61 @@ Bc98//GzOjSf5nEiuy3CeeIh8uQzMqQbkUFZqSGuYsRsfTrEBod0yig47CRU6Tz1
KVv5oy0NGj1TibypUsM/QjkwhUlQBj9ctQ7e
=TtlL
-----END PGP PUBLIC KEY BLOCK-----
pub 4096R/AF81FFBF 2017-02-01 [expires: 2021-02-01]
uid [ultimate] Jagadish Venkatraman <[email protected]>
sig 3 AF81FFBF 2017-02-01 Jagadish Venkatraman <[email protected]>
sub 4096R/57C790D3 2017-02-01 [expires: 2021-02-01]
sig AF81FFBF 2017-02-01 Jagadish Venkatraman <[email protected]>

-----BEGIN PGP PUBLIC KEY BLOCK-----

mQINBFiSN2ABEADcwjhOzFtV+UQ9K2Qs9I+1hFjQGm+WDa9XymCt5IAkl82mMBpg
tnYr+M8jq5KA27BdKdbnwl1e1qCeco1XG0UBTSBRV0X6iKf0QlZQLLP4xz8leHMi
xr4DUQL7wPzfR/uLN2RV63BGy++dcXmSdOVzc/p39oQu6NP8mwLft5AWjFDECPht
ftszPLZUJa+IXPebTWWc8HQ4wfpat2zCpEd4zNsHXSOUW4gdbrtHV+ZHOIVqbsjY
R9rjMklcF/wM5uGt+QmySl0Wmgftf9yrkzgGo2PZtmeu0YZGuBuZ4V2YJh3FtZj/
wS3nL4WdR1wJOSlWzTO69GqJndqbdbwKmvtdGCbSiF7cn28jSkOS0fR4yIWrICy9
Lzxsgccj4449NVeAw85gUQVceunzytyLL+mvUCKvGYLUKcl7IVQt8JcdzQrB/stG
GsZv0oXNKvI9lQIR1RuxLLRnXgSoFPRByr0XZ/F7SDVTA/PBKpwQ1aQqV4zlYK8V
tuxwX827EEBgH3FyZ+wSXKRl9cq0z7U8ThYCJjEKVoNgSuOZNFZhk3YYsSr98uA5
3/5yOaBxYvBl2UqewwTGny8RmV7Ynhvo2Jprme7PkAk3T7oxgRISKTGqEyqtT36N
ESZFO7FaI9bMimFhVy0iaIts5v9oFdXujIcrbE4DB+ESrZ2m3TXjCx7ZLQARAQAB
tCpKYWdhZGlzaCBWZW5rYXRyYW1hbiA8amFnYWRpc2hAYXBhY2hlLm9yZz6JAj0E
EwEKACcFAliSN2ACGwMFCQeGH4AFCwkIBwMFFQoJCAsFFgIDAQACHgECF4AACgkQ
44gjGK+B/79odBAAjWEXMBLXQrWT6wSstHk108EyzSgm3SuL42E0Xto9Wo3+Lyku
tqQbibRkqLn7SqdinougyBaF2bX7jwImqKed6HDEnw3Q1KdXY7/8eK0yTu+dYDE+
rqmLyT9y4FDdutRTI2x1DsqB2kLL65rwDU+ex74WkEFc2+F0PFlA8RzxO8kmA/bi
n6AtSvf+0dsxjWq0DfGByk9dLP/EfFENiW1OB0h/becmqGXooV/QmZH9tXRRCVYC
qPac8sFv+nS30OD95qPHvrTjWHHFJRcx/7TgBFMMRE/Yxw/fBRit/Y60nh3GT1RL
Qoc5E19n/NKlHHMuM5uyyqh4lL2RoAo3wPLwwzan1X5MRYIY07cjuu0mdlo4RWxJ
zkNOoQEdDZjpNSCi/kNA4AZYLx/1HPC18YyozZVxbV2cKFGH0KmSwh3LlgozXAQi
7YQqdeaH4ciPh9yg/VeaEHJ4xzhYgFQTnacm6xQwB+qeM8WxbafnEV3MxITkoBtw
FPrxhH5MfLD36xhPtoAJd2yVDnopXp+fd/ELQSXzKAdWEQHlv97I8KGMTjb0JPNV
KbBfOKWac2y4kgyjtPmNYDNeIAYUuN9QopJXd7cpJy7EDmJA65ZMiYQKNmhOETss
Fkr4jkrRAyRg7NkQDZuS0PFoGt6RY96JA81jgtsjHBRTm8Z85Iu4nyYS2jC5Ag0E
WJI3YAEQAO60ZWkCeKwVSRBOT68pv9mTLjbXvdC7qNW2Oy1B3MELZJU8j3t4Zn+p
UCjo9nrckxXjo5PTe2OTAeZeORzEBQYEs7GxZhsYiHNbElNFjAAI4dMo4dGpHY86
+OGLn/HaYc/yF4Rf82LXNunm7O5bgErEkzZ8pzDE2OeI6CK1xnqH/jSBs1Dct4c0
R45Yx5+V9Z8PrUUgaERjJBzNm/vvkkh2PwSr7j3GXxBxNlMoDv6lX7a/c3BOaHKn
R7PXmzlR1er8sUkX5pEfUHfV0SD6QOFwIJ28x0Kk7d79LS+WMau1EjfVCQ7P+/qg
w8GYiGI2pE6Ql8TC74HVyxdoBcj9vgw+nU/6g07FnHe7jAgTS6tfmvE5DZOvOJZI
Q+/b+A0S9DqE9JRfPKSc376TxghZ8DKpr8ZzpppCLpI8vIqCQT+AOR5ggjmLRLE5
XcuIcn/CMSDhxaUE/WShN6LQK1CY7otGpiXSM4FQJ/kcW/D49pUknMFh3+zmgxSZ
xulUeqtDKln5U09u3IAXWcpSMU/zJgNLck62jqKk9SqAcWL1gxbvkHM2Zp1w1xPs
6p0uIviFOMgSEMkFP9NjMQlCRDOTgVJQWt3WPZfnsptrHSM+aPErefYI+txpV3bM
hs7ICGQTYDBOeCGXDOayNk/T71/b48Igx1GlqN6oTUIxvFdy1NcnABEBAAGJAiUE
GAEKAA8FAliSN2ACGwwFCQeGH4AACgkQ44gjGK+B/79UbQ//RnE49iepoknPS8Sp
P9Xg0oSH8HKgiKCEDo8vVzri0yEAJx9YxNXLC5kJCRvY5FsHwbRvC1OpXxqMuABt
UbAsWY1Ypm8jkGar0o7V50+Kb4SJdVpZw608kcfBYcSMIsqUw38ECz4OnsKW64IE
UikOxNfNai/7atxfLpIcAM2ZxQRIxA6bOXZU1GIwFqZlrGMt9nGadHT9LyZWBcUZ
aavTD8OPocCKf5+BehVp3fIV7Sv1HwMsa84uf0VuU+oP/w3D+ZrQCYlTbvqanaeu
yNr+WLm3yBRCTXUqgUBgqqTX3bCbMMf9EeXfIiVC9s7/lfvnEOZaBGk8zEYLnY21
v13KT7FU0zApGb5YVSNXOp7h2AJxbaRQ8T7cBYCgTrH4aadFEuTiZHesX86QRrft
LpJB3S9PC8ugNzgik7z14T9bjmGLLh0zKyzzfusfGaaiC/VdZX70OskgL5j0aG/t
eu6p3VUe3evD/YuMeoDgEigMyUkz4LINWIaI0b95T4xOOQIzcM1U/NKBe0XakiaN
01z7I4SFLyuYupnllWVUBTAMkujfKJ6ciizTtCqfsOQn7uGXdwdgX7wQwzuaObWU
uj6noCYOQVNMW+SqNxoWq8gJ+Kvgbk6U3FvuUVsOdl8Njrv2h8hBzW2cu/0UB+y5
iI+q9qfZFuNUNv6bhwFG77cTA6k=
=/o0l
-----END PGP PUBLIC KEY BLOCK-----

1 change: 1 addition & 0 deletions NOTICE
@@ -3,3 +3,4 @@ Copyright 2014 The Apache Software Foundation

This product includes software developed at The Apache Software
Foundation (http://www.apache.org/).

2 changes: 1 addition & 1 deletion README.md
@@ -12,7 +12,7 @@ Samza's key features include:
* **Pluggable:** Though Samza works out of the box with Kafka and YARN, Samza provides a pluggable API that lets you run Samza with other messaging systems and execution environments.
* **Processor isolation:** Samza works with Apache YARN, which supports Hadoop's security model, and resource isolation through Linux CGroups.

Check out [Hello Samza](https://samza.apache.org/startup/hello-samza/0.8/) to try Samza. Read the [Background](https://samza.apache.org/learn/documentation/0.8/introduction/background.html) page to learn more about Samza.
Check out [Hello Samza](https://samza.apache.org/startup/hello-samza/0.11/) to try Samza. Read the [Background](https://samza.apache.org/learn/documentation/0.8/introduction/background.html) page to learn more about Samza.

### Building Samza

110 changes: 110 additions & 0 deletions docs/learn/documentation/versioned/hdfs/consumer.md
@@ -0,0 +1,110 @@
---
layout: page
title: Reading from HDFS
---
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

You can configure your Samza job to read from HDFS files using the [HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java). Avro-encoded records are supported out of the box, and it is easy to extend support to other formats (plain text, csv, json, etc.); see the `Event format` section below.

### Environment

Your job needs to run on the same YARN cluster which hosts the HDFS you want to consume from.

### Partitioning

Partitioning works at the level of individual HDFS files. Each file is treated as a stream partition, while a directory that contains these files is a stream. For example, if you want to read from an HDFS path which contains 10 individual files, there will naturally be 10 partitions created. You can configure up to 10 Samza containers to process these partitions. If you want to read from a single HDFS file, there is currently no way to break down the consumption; you can only have one container process the file.
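
To make the file-per-partition model concrete, here is a standalone sketch in plain Java. The file names and the round-robin container assignment are hypothetical illustrations of the model, not Samza's internal grouping logic.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the file-per-partition model: each file in the stream
// directory becomes one partition, and a container can own one or more
// partitions (a single file is never split across containers).
public class FilePerPartition {
    // Hypothetical directory listing standing in for an HDFS path.
    static final List<String> FILES = List.of(
        "part-00.avro", "part-01.avro", "part-02.avro", "part-03.avro",
        "part-04.avro", "part-05.avro", "part-06.avro", "part-07.avro",
        "part-08.avro", "part-09.avro");

    // Round-robin assignment of partitions (files) to containers,
    // purely for illustration of "up to one container per file".
    static List<List<String>> assign(int numContainers) {
        List<List<String>> containers = new ArrayList<>();
        for (int i = 0; i < numContainers; i++) containers.add(new ArrayList<>());
        for (int p = 0; p < FILES.size(); p++) {
            containers.get(p % numContainers).add(FILES.get(p));
        }
        return containers;
    }

    public static void main(String[] args) {
        System.out.println("partitions=" + FILES.size());  // 10 files -> 10 partitions
        // With 10 files you could use up to 10 containers; here we use 4.
        System.out.println("container0=" + assign(4).get(0));
    }
}
```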

### Event format

[HdfsSystemConsumer](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java) currently supports reading from avro files. The received [IncomingMessageEnvelope](../api/javadocs/org/apache/samza/system/IncomingMessageEnvelope.html) contains three significant fields:

1. The key, which is empty
2. The message, which is set to the avro [GenericRecord](https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericRecord.html)
3. The stream partition, which is set to the name of the HDFS file

To extend the support beyond avro files (e.g. json, csv, etc.), you can implement the interface [SingleFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java) (take a look at the implementation of [AvroFileHdfsReader](https://github.com/apache/samza/blob/master/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java) as a sample).

### End of stream support

One major difference between HDFS data and Kafka data is that while a Kafka topic has an unbounded stream of messages, HDFS files are bounded and have a notion of EOF.

You can choose to implement [EndOfStreamListenerTask](../api/javadocs/org/apache/samza/task/EndOfStreamListenerTask.html) to receive a callback when all partitions are at end of stream. When all partitions being processed by the task are at end of stream (i.e. EOF has been reached for all files), the Samza job exits automatically.
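
The end-of-stream bookkeeping can be pictured with a small toy model. This is a sketch of the idea (track EOF per partition; signal shutdown once every partition is done), not Samza's actual `EndOfStreamListenerTask` API.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of end-of-stream tracking: mark each partition (HDFS file)
// as it reaches EOF, and report completion once all partitions are done.
public class EndOfStreamTracker {
    private final Set<String> pending;

    public EndOfStreamTracker(Set<String> partitions) {
        this.pending = new HashSet<>(partitions);
    }

    // Called when a partition hits EOF.
    // Returns true once all partitions are at end of stream.
    public boolean onEndOfStream(String partition) {
        pending.remove(partition);
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        EndOfStreamTracker t = new EndOfStreamTracker(
            new HashSet<>(List.of("file-a.avro", "file-b.avro")));
        System.out.println(t.onEndOfStream("file-a.avro")); // false: one file left
        System.out.println(t.onEndOfStream("file-b.avro")); // true: job can exit
    }
}
```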

### Basic Configuration

Here are a few of the basic configs to set up the HdfsSystemConsumer:

```
# The HDFS system consumer is implemented under the org.apache.samza.system.hdfs package,
# so use HdfsSystemFactory as the system factory for your system
systems.hdfs-clickstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
# You need to specify the path of files you want to consume in task.inputs
task.inputs=hdfs-clickstream.hdfs:/data/clickstream/2016/09/11
# You can specify a white list of files you want your job to process (in Java Pattern style)
systems.hdfs-clickstream.partitioner.defaultPartitioner.whitelist=.*avro
# You can specify a black list of files you don't want your job to process (in Java Pattern style),
# by default it's empty.
# Note that you can have both white list and black list, in which case both will be applied.
systems.hdfs-clickstream.partitioner.defaultPartitioner.blacklist=somefile.avro
```
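
To illustrate how the whitelist and blacklist interact, here is a standalone Java sketch. The selection rule (keep files matching the whitelist, then drop blacklist matches) and the file names are assumptions for illustration; this mirrors the configs above but is not Samza's actual partitioner code.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PartitionerFilter {
    // Mirrors the example configs: whitelist=.*avro, blacklist=somefile.avro
    // Both are Java Pattern strings matched against the whole file name.
    static List<String> select(List<String> files, String whitelist, String blacklist) {
        Pattern allow = Pattern.compile(whitelist);
        Pattern deny = Pattern.compile(blacklist);
        return files.stream()
            .filter(f -> allow.matcher(f).matches())   // must match whitelist
            .filter(f -> !deny.matcher(f).matches())   // must not match blacklist
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = List.of("clicks-01.avro", "somefile.avro", "README.txt");
        // somefile.avro matches the whitelist but is removed by the blacklist.
        System.out.println(select(files, ".*avro", "somefile.avro")); // [clicks-01.avro]
    }
}
```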

### Security Configuration

The following additional configs are required when accessing HDFS clusters that have Kerberos enabled:

```
# Use the SamzaYarnSecurityManagerFactory, which fetches and renews the Kerberos delegation tokens when the job is running in a secure environment.
job.security.manager.factory=org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory
# Kerberos principal
yarn.kerberos.principal=your-principal-name
# Path of the keytab file (local path)
yarn.kerberos.keytab=/tmp/keytab
```

### Advanced Configuration

Some of the advanced configuration you might need to set up:

```
# Specify the group pattern for advanced partitioning.
systems.hdfs-clickstream.partitioner.defaultPartitioner.groupPattern=part-[id]-.*
```

The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro] that you want to organize into three partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can then set this property to be "part-[id]-.*" (note that **[id]** is a reserved term here, i.e. you have to literally put it as **[id]**). The partitioner will apply this pattern to all file names and extract the "group identifier" ("[id]" in the pattern), then use the "group identifier" to group files into partitions.
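
A standalone sketch of this grouping logic in plain Java. Translating `[id]` into the capture group `([^-]+)` is an assumption made for illustration; it is not necessarily how the partitioner compiles the pattern internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupPatternDemo {
    // Turn the group pattern into a regex by replacing the reserved [id]
    // token with a capture group, then bucket files by the captured id.
    static Map<String, List<String>> group(List<String> files, String groupPattern) {
        String regex = groupPattern.replace("[id]", "([^-]+)");
        Pattern p = Pattern.compile(regex);
        Map<String, List<String>> partitions = new LinkedHashMap<>();
        for (String f : files) {
            Matcher m = p.matcher(f);
            if (m.matches()) {
                partitions.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(f);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> files = List.of("part-01-a.avro", "part-01-b.avro",
            "part-02-a.avro", "part-02-b.avro", "part-03-a.avro");
        // Five files collapse into three partitions keyed by the group identifier.
        System.out.println(group(files, "part-[id]-.*"));
        // {01=[part-01-a.avro, part-01-b.avro], 02=[part-02-a.avro, part-02-b.avro], 03=[part-03-a.avro]}
    }
}
```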

```
# Specify the type of files your job wants to process (only avro is supported for now)
systems.hdfs-clickstream.consumer.reader=avro
# Max number of retries (per partition) before the container fails.
systems.hdfs-clickstream.consumer.numMaxRetries=10
```

For the list of all configs, check out the configuration table page [here](../jobs/configuration-table.html).

### More Information
[HdfsSystemConsumer design doc](https://issues.apache.org/jira/secure/attachment/12827670/HDFSSystemConsumer.pdf)

## [Security &raquo;](../operations/security.html)
2 changes: 1 addition & 1 deletion docs/learn/documentation/versioned/hdfs/producer.md
@@ -67,4 +67,4 @@ systems.hdfs-clickstream.producer.hdfs.write.batch.size.bytes=134217728

The above configuration assumes that Metrics and Serde implementations have been properly configured against the `some-serde-impl` and `some-metrics-impl` labels somewhere else in the same `job.properties` file. Each of these properties has a reasonable default, so you can leave out the ones you don't need to customize for your job run.

## [Security &raquo;](../operations/security.html)
## [Reading from HDFS &raquo;](../hdfs/consumer.html)
1 change: 1 addition & 0 deletions docs/learn/documentation/versioned/index.html
@@ -82,6 +82,7 @@ <h4>YARN</h4>
<li><a href="yarn/isolation.html">Isolation</a></li>
<li><a href="yarn/yarn-host-affinity.html">Host Affinity & Yarn</a></li>
<li><a href="hdfs/producer.html">Writing to HDFS</a></li>
<li><a href="hdfs/consumer.html">Reading from HDFS</a></li>
<li><a href="hdfs/yarn-security.html">Yarn Security</a></li>
<!-- TODO write yarn pages
<li><a href="">Fault Tolerance</a></li>
55 changes: 55 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1422,6 +1422,21 @@ <h1>Samza Configuration Reference</h1>
</td>
</tr>

<tr>
<td class="property" id="stores-rocksdb-log-file-size">stores.<span class="store">store-name</span>.<br>rocksdb.max.log.file.size.bytes</td>
<td class="default">67108864</td>
<td class="description">
The maximum size in bytes of the RocksDB LOG file before it is rotated.
</td>
</tr>

<tr>
<td class="property" id="stores-rocksdb-num-log-files">stores.<span class="store">store-name</span>.<br>rocksdb.keep.log.file.num</td>
<td class="default">2</td>
<td class="description">
The number of RocksDB LOG files (including rotated LOG.old.* files) to keep.
</td>
</tr>

<tr>
<th colspan="3" class="section" id="cluster-manager">
@@ -1809,6 +1824,46 @@ <h1>Samza Configuration Reference</h1>
<td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.</td>
</tr>

<tr>
<th colspan="3" class="section" id="hdfs-system-consumer"><a href="../hdfs/consumer.html">Reading from HDFS</a></th>
</tr>

<tr>
<td class="property" id="hdfs-consumer-buffer-capacity">systems.*.consumer.bufferCapacity</td>
<td class="default">10</td>
<td class="description">Capacity of the HDFS consumer buffer (the blocking queue used for storing messages). A larger buffer capacity typically leads to better throughput but consumes more memory.</td>
</tr>
<tr>
<td class="property" id="hdfs-consumer-numMaxRetries">systems.*.consumer.numMaxRetries</td>
<td class="default">10</td>
<td class="description">The number of retry attempts when there is a failure to fetch messages from HDFS, before the container fails.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-whitelist">systems.*.partitioner.defaultPartitioner.whitelist</td>
<td class="default">.*</td>
<td class="description">Whitelist used by the directory partitioner to select files in an HDFS directory, in Java Pattern style.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-blacklist">systems.*.partitioner.defaultPartitioner.blacklist</td>
<td class="default"></td>
<td class="description">Blacklist used by the directory partitioner to filter out unwanted files in an HDFS directory, in Java Pattern style.</td>
</tr>
<tr>
<td class="property" id="hdfs-partitioner-group-pattern">systems.*.partitioner.defaultPartitioner.groupPattern</td>
<td class="default"></td>
<td class="description">Group pattern used by directory partitioner for advanced partitioning. The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro], and you want to organize the partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can then set this property to be "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to literally put it as "[id]"). The partitioner will apply this pattern to all file names and extract the "group identifier" ("[id]" in the pattern), then use the "group identifier" to group files into partitions. See more details in <a href="https://issues.apache.org/jira/secure/attachment/12827670/HDFSSystemConsumer.pdf">HdfsSystemConsumer design doc</a> </td>
</tr>
<tr>
<td class="property" id="hdfs-consumer-reader-type">systems.*.consumer.reader</td>
<td class="default">avro</td>
<td class="description">Type of the file reader for different event formats (avro, plain, json, etc.). "avro" is the only type supported for now.</td>
</tr>
<tr>
<td class="property" id="hdfs-staging-directory">systems.*.stagingDirectory</td>
<td class="default"></td>
<td class="description">Staging directory for storing partition descriptions. By default (if not set by the user) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you want to explicitly use a separate location.</td>
</tr>

<tr>
<th colspan="3" class="section" id="task-migration">
Migrating from Samza 0.9.1 to 0.10.0<br>
4 changes: 2 additions & 2 deletions docs/learn/tutorials/versioned/deploy-samza-to-CDH.md
@@ -26,7 +26,7 @@ The tutorial assumes you have successfully run [hello-samza](../../../startup/he
We need to use a specific compile option to build hello-samza package for CDH 5.4.0

{% highlight bash %}
mvn clean package -Denv=cdh5.4.0
mvn clean package -Dhadoop.version=cdh5.4.0
{% endhighlight %}

### Upload Package to Cluster
@@ -37,7 +37,7 @@ There are a few ways of uploading the package to the cluster's HDFS. If you do n
hadoop fs -put path/to/hello-samza-0.11.0-dist.tar.gz /path/for/tgz
{% endhighlight %}

### Get Deloying Scripts
### Get Deploying Scripts

Untar the job package (assume you will run from the current directory)

2 changes: 1 addition & 1 deletion gradle.properties
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
group=org.apache.samza
version=0.11.1-SNAPSHOT
version=0.12.0
scalaVersion=2.11

gradleVersion=2.8
2 changes: 1 addition & 1 deletion gradle/dependency-versions.gradle
@@ -30,7 +30,7 @@
metricsVersion = "2.2.0"
kafkaVersion = "0.10.0.1"
commonsHttpClientVersion = "3.1"
rocksdbVersion = "3.13.1"
rocksdbVersion = "5.0.1"
yarnVersion = "2.6.1"
slf4jVersion = "1.6.2"
log4jVersion = "1.2.17"