Add Spark to the integration test (apache#766)
Fokko authored Dec 9, 2024
1 parent 994ca96 commit acd52ee
Showing 6 changed files with 270 additions and 0 deletions.
15 changes: 15 additions & 0 deletions crates/integration_tests/testdata/docker-compose.yaml
@@ -70,3 +70,18 @@ services:
/bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null "
    networks:
      rest_bridge:

  spark-iceberg:
    build: spark/
    networks:
      rest_bridge:
    depends_on:
      - rest
      - minio
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    links:
      - rest:rest
      - minio:minio
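
A hedged aside for local runs: with this service in place, the whole stack can be started by hand before invoking the Rust tests. A minimal sketch, assuming Docker Compose v2 and the repository root as the working directory:

docker compose -f crates/integration_tests/testdata/docker-compose.yaml up -d --build
# The spark-iceberg container reports healthy only after provisioning finishes (see its HEALTHCHECK below).
docker compose -f crates/integration_tests/testdata/docker-compose.yaml ps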
59 changes: 59 additions & 0 deletions crates/integration_tests/testdata/spark/Dockerfile
@@ -0,0 +1,59 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.9-bullseye

RUN apt-get -qq update && \
    apt-get -qq install -y --no-install-recommends sudo curl openjdk-11-jdk && \
    apt-get -qq clean && \
    rm -rf /var/lib/apt/lists/*

ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH

RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
WORKDIR ${SPARK_HOME}

ENV SPARK_VERSION=3.5.3
ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
ENV ICEBERG_VERSION=1.6.0

RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
    && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
    && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz

# Download iceberg spark runtime
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
    && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars

# Download AWS bundle
RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar

COPY spark-defaults.conf /opt/spark/conf
ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"

RUN chmod u+x /opt/spark/sbin/* && \
    chmod u+x /opt/spark/bin/*

WORKDIR '/home/'

COPY entrypoint.sh .
COPY provision.py .

HEALTHCHECK --retries=120 --interval=1s \
    CMD ls /tmp/ready || exit 1

ENTRYPOINT ["./entrypoint.sh"]
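
The HEALTHCHECK passes only once entrypoint.sh has created /tmp/ready, i.e. after provision.py has finished seeding the tables. A sketch of how another Compose service could gate on that signal — the service_healthy condition is standard Compose syntax, but the test-runner service here is hypothetical and not part of this commit:

  test-runner:
    depends_on:
      spark-iceberg:
        condition: service_healthy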
29 changes: 29 additions & 0 deletions crates/integration_tests/testdata/spark/entrypoint.sh
@@ -0,0 +1,29 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

start-master.sh -p 7077
start-worker.sh spark://spark-iceberg:7077
start-history-server.sh

python3 ./provision.py

touch /tmp/ready

tail -f /dev/null
99 changes: 99 additions & 0 deletions crates/integration_tests/testdata/spark/provision.py
@@ -0,0 +1,99 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_add, expr

spark = SparkSession.builder.getOrCreate()

spark.sql(
    """
    CREATE OR REPLACE TABLE rest.default.test_positional_merge_on_read_deletes (
        dt     date,
        number integer,
        letter string
    )
    USING iceberg
    TBLPROPERTIES (
        'write.delete.mode'='merge-on-read',
        'write.update.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read',
        'format-version'='2'
    );
    """
)

spark.sql(
    """
    INSERT INTO rest.default.test_positional_merge_on_read_deletes
    VALUES
        (CAST('2023-03-01' AS date), 1, 'a'),
        (CAST('2023-03-02' AS date), 2, 'b'),
        (CAST('2023-03-03' AS date), 3, 'c'),
        (CAST('2023-03-04' AS date), 4, 'd'),
        (CAST('2023-03-05' AS date), 5, 'e'),
        (CAST('2023-03-06' AS date), 6, 'f'),
        (CAST('2023-03-07' AS date), 7, 'g'),
        (CAST('2023-03-08' AS date), 8, 'h'),
        (CAST('2023-03-09' AS date), 9, 'i'),
        (CAST('2023-03-10' AS date), 10, 'j'),
        (CAST('2023-03-11' AS date), 11, 'k'),
        (CAST('2023-03-12' AS date), 12, 'l');
    """
)

spark.sql(f"DELETE FROM rest.default.test_positional_merge_on_read_deletes WHERE number = 9")

spark.sql(
    """
    CREATE OR REPLACE TABLE rest.default.test_positional_merge_on_read_double_deletes (
        dt     date,
        number integer,
        letter string
    )
    USING iceberg
    TBLPROPERTIES (
        'write.delete.mode'='merge-on-read',
        'write.update.mode'='merge-on-read',
        'write.merge.mode'='merge-on-read',
        'format-version'='2'
    );
    """
)

spark.sql(
    """
    INSERT INTO rest.default.test_positional_merge_on_read_double_deletes
    VALUES
        (CAST('2023-03-01' AS date), 1, 'a'),
        (CAST('2023-03-02' AS date), 2, 'b'),
        (CAST('2023-03-03' AS date), 3, 'c'),
        (CAST('2023-03-04' AS date), 4, 'd'),
        (CAST('2023-03-05' AS date), 5, 'e'),
        (CAST('2023-03-06' AS date), 6, 'f'),
        (CAST('2023-03-07' AS date), 7, 'g'),
        (CAST('2023-03-08' AS date), 8, 'h'),
        (CAST('2023-03-09' AS date), 9, 'i'),
        (CAST('2023-03-10' AS date), 10, 'j'),
        (CAST('2023-03-11' AS date), 11, 'k'),
        (CAST('2023-03-12' AS date), 12, 'l');
    """
)

# Creates two positional deletes that should be merged
spark.sql("DELETE FROM rest.default.test_positional_merge_on_read_double_deletes WHERE number = 9")
spark.sql("DELETE FROM rest.default.test_positional_merge_on_read_double_deletes WHERE letter = 'f'")
29 changes: 29 additions & 0 deletions crates/integration_tests/testdata/spark/spark-defaults.conf
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest.type rest
spark.sql.catalog.rest.uri http://rest:8181
spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO
spark.sql.catalog.rest.warehouse s3://warehouse/rest/
spark.sql.catalog.rest.s3.endpoint http://minio:9000
spark.sql.defaultCatalog rest
spark.eventLog.enabled true
spark.eventLog.dir /home/iceberg/spark-events
spark.history.fs.logDirectory /home/iceberg/spark-events
spark.sql.catalogImplementation in-memory
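
For reference, the rest catalog lines above map one-to-one onto SparkSession builder options; a hedged pyspark equivalent (illustrative only — the containers rely on this conf file, not on builder configs):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.rest.type", "rest")
    .config("spark.sql.catalog.rest.uri", "http://rest:8181")
    .config("spark.sql.defaultCatalog", "rest")
    .getOrCreate()
)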
39 changes: 39 additions & 0 deletions crates/integration_tests/tests/read_positional_deletes.rs
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Integration tests for rest catalog.
use iceberg::{Catalog, TableIdent};
use iceberg_integration_tests::set_test_fixture;

#[tokio::test]
async fn test_read_table_with_positional_deletes() {
    let fixture = set_test_fixture("read_table_with_positional_deletes").await;

    let catalog = fixture.rest_catalog;

    let table = catalog
        .load_table(
            &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
                .unwrap(),
        )
        .await
        .unwrap();

    // 😱 If we don't support positional deletes, we should not be able to plan them
    println!("{:?}", table.scan().build().unwrap());
}
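
The test stops at plan construction. A hedged sketch of a stronger follow-up assertion, assuming the iceberg crate exposes TableScan::to_arrow and that the futures crate is available to the test (neither is exercised by this commit):

    // Hypothetical extension: materialize the scan and count surviving rows.
    use futures::TryStreamExt;

    let batches: Vec<_> = table
        .scan()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap()
        .try_collect()
        .await
        .unwrap();
    let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(rows, 10); // 12 inserted rows minus the two merge-on-read deletes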
