Flock is a cloud-native streaming query engine that leverages the on-demand elasticity of Function-as-a-Service (FaaS) platforms to perform real-time data analytics. Traditional server-centric deployments often suffer from resource under- or over-provisioning, leading to resource wastage or performance degradation. Flock addresses these issues by providing more fine-grained elasticity that can dynamically match the per-query basis with continuous scaling, and its billing methods are more fine-grained with millisecond granularity, making it a low-cost solution for stream processing. Our approach, payload invocation, eliminates the need for external storage services and eliminates the requirement for a query coordinator in the data architecture. Our evaluation shows that Flock significantly outperforms state-of-the-art systems in terms of cost, especially on ARM processors, making it a promising solution for real-time data analytics on FaaS platforms.
The generic lambda function code is built in advance and uploaded to AWS S3.
FaaS Service | AWS Lambda | GCP Functions | Azure Functions | Architectures | SIMD | YSB | NEXMark |
---|---|---|---|---|---|---|---|
Flock | 🏅🏅🏅🏅 | 👉 TBD | 👉 TBD | Arm, x86 | ✅ | ✅ | ✅ |
@misc{gang2023flock,
title={Flock: A Low-Cost Streaming Query Engine on FaaS Platforms},
author={Gang Liao and Amol Deshpande and Daniel J. Abadi},
year={2023},
eprint={2312.16735},
archivePrefix={arXiv},
primaryClass={cs.DB}
}
You can enable the features simd
(to use SIMD instructions) and/or mimalloc
or snmalloc
(to use either the mimalloc or snmalloc allocator) as features by passing them in as --features:
To build and deploy Flock to AWS Lambda in one step, you can use the following command:
$ ./configure -c -a x86_64
Output
============================================================
Compiling and Deploying Benchmarks
============================================================
Building x86_64-unknown-linux-gnu
[1/3] Compiling Flock Lambda Function...
[2/3] Compiling Flock CLI...
[3/3] Deploying Flock Lambda Function...
============================================================
Upload function code to S3
============================================================
Packaging code and uploading to S3...
[OK] Upload Succeed.
============================================================
If you prefer to use the cargo
command to build and deploy Flock, you can use the following commands:
Commands
-
Build Flock for x86_64
$ cargo +nightly build --target x86_64-unknown-linux-gnu --release --features "simd mimalloc"
-
Deploy Flock binary to AWS S3
$ cd ./target/x86_64-unknown-linux-gnu/release $ ./flock-cli s3 put --path ./flock --key flock_x86_64
These are multiple queries over a three entities model representing on online auction system:
- Person represents a person submitting an item for auction and/or making a bid on an auction.
- Auction represents an item under auction.
- Bid represents a bid for an item under auction.
Query | Name | Summary | Flock |
---|---|---|---|
q0 | Pass Through | Measures the monitoring overhead including the source generator. | ✅ |
q1 | Currency Conversion | Convert each bid value from dollars to euros. | ✅ |
q2 | Selection | Find bids with specific auction ids and show their bid price. | ✅ |
q3 | Local Item Suggestion | Who is selling in OR, ID or CA in category 10, and for what auction ids? | ✅ |
q4 | Average Price for a Category | Select the average of the wining bid prices for all auctions in each category. | ✅ |
q5 | Hot Items | Which auctions have seen the most bids in the last period? | ✅ |
q6 | Average Selling Price by Seller | What is the average selling price per seller for their last 10 closed auctions. | ✅ |
q7 | Highest Bid | Select the bids with the highest bid price in the last period. | ✅ |
q8 | Monitor New Users | Select people who have entered the system and created auctions in the last period. | ✅ |
q9 | Winning Bids | Find the winning bid for each auction. | ✅ |
q10 | Log to File System | Log all events to AWS S3, SQS, and DynamoDB. Illustrates windows streaming data into partitioned file system. | ✅ |
q11 | User Sessions | How many bids did a user make in each session they were active? Illustrates session windows. | ✅ |
q12 | Processing Time Windows | How many bids does a user make within a fixed processing time limit? Illustrates working in processing time window. | ✅ |
q13 | Bounded Side Input Join | Joins a stream to a bounded side input, modeling basic stream enrichment. | ✅ |
Note: q1 ~ q8 are from original NEXMark queries, q0 and q9 ~ q13 are from Apache Beam.
flock-cli
is a command line interface to Flock. It can be used to run queries or benchmarks against Flock. For example, to run the query 5 on x86_64 for 10 seconds with 1,000 events per second, and 1 generator, you can run:
$ ./flock-cli nexmark run -q 5 -s 10 -e 1000 -g 1 --arch x86_64
Client Output
/`
/:y/` `
`shdhso.
-yhddh+.
.yhhhy+-
.syyhs+/.
`+shhs++:.
`:syyyo++/.
.+ssys+++/-` `.----.`
./oyyyo+++/:.` `-/+++/-..`
-/osyso++++/:.` -/++/-`
.-/osssoo++++/:++++`
`.-/++osooo++++++++-
`-:/+oooo++++++o/
`-:/+o++++++oo- ````` `
`.-//++++++o/ `:++::::// .:++:` .:///////. .://///+- ./++:` .++/.
``..:+++++o+` os` -+ ss `/+-`//. `-+/` `+s:` `o: `so `:+-
:+++++/` os` -- ss /o` `+o` `++ +s: `` `so .+:`
`:+++++: os:::o/ ss o/ /+++` :s ss. `ss/so`
.:++++:.` os` -- ss ` /o``+/``o/:`++ +s: ` `so .oo.
`.:/++++/. os` ss :+ /+:-` .-:+/` `+s:` o/ `so `+s:
.-----:/++- `:++:- .:++::::+/ .:++//++:. ./++///+- .:o+:` :o/:
`.-:::-/:` ``
`--.``-/:`
.:-`
Flock: A Practical Serverless Streaming SQL Query Engine (https://github.com/flock-lab/flock)
Copyright (c) 2020-present, UMD Data System Group.
▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
This program is free software: you can use, redistribute, and/or modify it under the terms of the GNU Affero
General Public License, version 3 or later ("AGPL"), as published by the Free Software Foundation.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
You should have received a copy of the GNU Affero General Public License along with this program. If not, see
<http://www.gnu.org/licenses/>.
▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
================================================================
Running the benchmark
================================================================
[info] Running the NEXMark benchmark with the following options:
NexmarkBenchmarkOpt {
query_number: 5,
generators: 1,
seconds: 10,
events_per_second: 1000,
data_sink_type: "blackhole",
async_type: false,
memory_size: 128,
architecture: "x86_64",
}
[info] Creating lambda function: flock_datasource
[info] Creating lambda function group: Group(("q5-00", 8))
[info] Creating function member: q5-00-00
[info] Creating function member: q5-00-01
[info] Creating function member: q5-00-02
[info] Creating function member: q5-00-03
[info] Creating function member: q5-00-04
[info] Creating function member: q5-00-05
[info] Creating function member: q5-00-06
[info] Creating function member: q5-00-07
[info] [OK] Invoking NEXMark source function: flock_datasource by generator 0
[info] [OK] Received status from function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
[info] Waiting for the current invocations to be logged.
[2021-12-16T19:09:35Z INFO driver::logwatch::tail] Sending log request FilterLogEventsRequest { end_time: None, filter_pattern: None, limit: Some(100), log_group_name: "/aws/lambda/flock_datasource", log_stream_name_prefix: None, log_stream_names: None, next_token: None, start_time: Some(1639681715344) }
[2021-12-16T19:09:38Z INFO driver::logwatch::tail] [OK] Got response from AWS CloudWatch Logs.
2021-12-16 14:09:18 START RequestId: fd8ae2ad-b64a-4e02-88c8-c43f00974022 Version: $LATEST
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Generating events for 10s over 1 partitions.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO flock::nexmark::source] Nexmark Benchmark: Query 5
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO flock::nexmark::source] NEXMarkSource { config: Config { args: {"threads": "1", "events-per-second": "1000", "seconds": "10"} }, window: Hopping((10, 5)) }
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO flock::nexmark::source] [OK] Generate nexmark events.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 0: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 1: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 2: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 3: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 4: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:18 [2021-12-16T19:09:18Z INFO runtime::datasource::nexmark::nexmark] Epoch 5: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO runtime::datasource::nexmark::nexmark] Epoch 6: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO runtime::datasource::nexmark::nexmark] Epoch 7: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO runtime::datasource::nexmark::nexmark] Epoch 8: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO runtime::datasource::nexmark::nexmark] Epoch 9: 20 persons, 60 auctions, 920 bids.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Send 10 NexMark events from a window (epoch: 0-10) to function: q5-00-04.
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 0 - function payload bytes: 21078
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 1 - function payload bytes: 22419
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 2 - function payload bytes: 21598
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 3 - function payload bytes: 21997
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 4 - function payload bytes: 22364
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 5 - function payload bytes: 22185
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 6 - function payload bytes: 21767
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 7 - function payload bytes: 21436
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 8 - function payload bytes: 21806
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Event 9 - function payload bytes: 21829
2021-12-16 14:09:19 [2021-12-16T19:09:19Z INFO flock::window] [OK] Received status from async lambda function. InvocationResponse { executed_version: None, function_error: None, log_result: None, payload: Some(b""), status_code: Some(202) }
2021-12-16 14:09:19 END RequestId: fd8ae2ad-b64a-4e02-88c8-c43f00974022
2021-12-16 14:09:19 REPORT RequestId: fd8ae2ad-b64a-4e02-88c8-c43f00974022 Duration: 1177.85 ms Billed Duration: 1189 ms Memory Size: 128 MB Max Memory Used: 22 MB Init Duration: 10.88 ms
[2021-12-16T19:09:38Z INFO driver::deploy::common] Got a Token response
-------------------------------------------------------------
[OK] Nexmark Benchmark Complete
Function Output
TART RequestId: 78a68707-3f3d-4244-a51a-584f9432709d Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 0, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: 78a68707-3f3d-4244-a51a-584f9432709d
REPORT RequestId: 78a68707-3f3d-4244-a51a-584f9432709d Duration: 38.83 ms Billed Duration: 66 ms Memory Size: 128 MB Max Memory Used: 17 MB Init Duration: 26.32 ms
START RequestId: 23dae113-ccf3-449f-944f-116bb925daaf Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 5, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: 23dae113-ccf3-449f-944f-116bb925daaf
REPORT RequestId: 23dae113-ccf3-449f-944f-116bb925daaf Duration: 1.71 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 17 MB
START RequestId: e5e51594-5819-494c-a6d3-c9c9ed9ab865 Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 6, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: e5e51594-5819-494c-a6d3-c9c9ed9ab865
REPORT RequestId: e5e51594-5819-494c-a6d3-c9c9ed9ab865 Duration: 1.30 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: def2fc0b-61da-49f8-80b4-9e49f5f4a091 Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 7, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: def2fc0b-61da-49f8-80b4-9e49f5f4a091
REPORT RequestId: def2fc0b-61da-49f8-80b4-9e49f5f4a091 Duration: 6.89 ms Billed Duration: 7 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: a18c2e75-d1a4-4595-aa84-4cde90eecad4 Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 8, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: a18c2e75-d1a4-4595-aa84-4cde90eecad4
REPORT RequestId: a18c2e75-d1a4-4595-aa84-4cde90eecad4 Duration: 1.16 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: 01168950-7558-4af6-9e8c-8f71c4542149 Version: $LATEST
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 9, seq_len: 10 }
[2021-12-15T15:20:56Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: 01168950-7558-4af6-9e8c-8f71c4542149
REPORT RequestId: 01168950-7558-4af6-9e8c-8f71c4542149 Duration: 8.22 ms Billed Duration: 9 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: c28193bb-b818-4cf4-863c-2e9d34dd2398 Version: $LATEST
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 1, seq_len: 10 }
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: c28193bb-b818-4cf4-863c-2e9d34dd2398
REPORT RequestId: c28193bb-b818-4cf4-863c-2e9d34dd2398 Duration: 1.18 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: 46e54b2e-91da-4609-9f54-f152d38681c7 Version: $LATEST
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 2, seq_len: 10 }
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: 46e54b2e-91da-4609-9f54-f152d38681c7
REPORT RequestId: 46e54b2e-91da-4609-9f54-f152d38681c7 Duration: 1.15 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: 2b1c1fe0-9556-4849-8251-39ede796f0f0 Version: $LATEST
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 3, seq_len: 10 }
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Window data collection has not been completed.
END RequestId: 2b1c1fe0-9556-4849-8251-39ede796f0f0
REPORT RequestId: 2b1c1fe0-9556-4849-8251-39ede796f0f0 Duration: 1.08 ms Billed Duration: 2 ms Memory Size: 128 MB Max Memory Used: 18 MB
START RequestId: 78c64a1a-b312-4099-b596-541c078b04b7 Version: $LATEST
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Receiving a data packet: Uuid { tid: "q5-1639581654", seq_num: 4, seq_len: 10 }
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor] Received all data packets for the window: "q5-1639581654"
[2021-12-15T15:20:58Z INFO nexmark_lambda::actor]
+---------+-----+
| auction | num |
+---------+-----+
| 1500 | 841 |
+---------+-----+
flock-cli
has a number of advanced features that can be used to control and customize the behavior of Flock.
For example, to delete all functions, you can use the flock-cli lambda -D
command. Or use the flock-cli lambda -d <function pattern>
command to delete specific functions. To list all functions, use the flock-cli lambda -L
command.
To see the help for the nexmark run
command, issue the command: flock-cli nexmark run -h
Runs the NEXMark Benchmark
USAGE:
flock-cli nexmark run [OPTIONS]
OPTIONS:
-a, --async-type
Runs the NEXMark benchmark with async function invocations
-e, --events-per-second <events per second>
Runs the NEXMark benchmark with a number of events per second [default: 1000]
-g, --generators <data generators>
Runs the NEXMark benchmark with a number of data generators [default: 1]
-h, --help
Print help information
--log-level <log-level>
Log level [default: info] [possible values: error, warn, info, debug, trace, off]
-m, --memory-size <memory size>
Sets the memory size (MB) for the worker function [default: 128]
-q, --query <query number>
Sets the NEXMark benchmark query number [default: 3] [possible values: 0, 1, 2, 3, 4, 5,
6, 7, 8, 9, 10, 11, 12, 13]
-r, --arch <architecture>
Sets the architecture for the worker function [default: x86_64] [possible values:
x86_64, arm64]
-s, --seconds <duration>
Runs the NEXMark benchmark for a number of seconds [default: 20]
--silent
Suppress all output
-t, --data-sink-type <data sink type>
Runs the NEXMark benchmark with a data sink type [default: blackhole] [possible values:
sqs, s3, dynamodb, efs, blackhole]
--trace
Log ultra-verbose (trace level) information
Copyright (c) 2020-present UMD Database Group. The library, examples, and all source code are released under AGPL-3.0 License.