diff --git a/crates/integration_tests/testdata/docker-compose.yaml b/crates/integration_tests/testdata/docker-compose.yaml
index 490f4eb94..c56480509 100644
--- a/crates/integration_tests/testdata/docker-compose.yaml
+++ b/crates/integration_tests/testdata/docker-compose.yaml
@@ -70,3 +70,18 @@ services:
       /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null "
     networks:
       rest_bridge:
+
+  spark-iceberg:
+    build: spark/
+    networks:
+      rest_bridge:
+    depends_on:
+      - rest
+      - minio
+    environment:
+      - AWS_ACCESS_KEY_ID=admin
+      - AWS_SECRET_ACCESS_KEY=password
+      - AWS_REGION=us-east-1
+    links:
+      - rest:rest
+      - minio:minio
diff --git a/crates/integration_tests/testdata/spark/Dockerfile b/crates/integration_tests/testdata/spark/Dockerfile
new file mode 100644
index 000000000..0eb6244a2
--- /dev/null
+++ b/crates/integration_tests/testdata/spark/Dockerfile
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM python:3.9-bullseye
+
+RUN apt-get -qq update && \
+    apt-get -qq install -y --no-install-recommends sudo curl openjdk-11-jdk && \
+    apt-get -qq clean && \
+    rm -rf /var/lib/apt/lists/*
+
+ENV SPARK_HOME=${SPARK_HOME:-"/opt/spark"}
+ENV HADOOP_HOME=${HADOOP_HOME:-"/opt/hadoop"}
+ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
+
+RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events
+WORKDIR ${SPARK_HOME}
+
+ENV SPARK_VERSION=3.5.3
+ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
+ENV ICEBERG_VERSION=1.6.0
+
+RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
+    && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
+    && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz
+
+# Download iceberg spark runtime
+RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
+    && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
+
+# Download AWS bundle
+RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
+
+COPY spark-defaults.conf /opt/spark/conf
+ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
+
+RUN chmod u+x /opt/spark/sbin/* && \
+    chmod u+x /opt/spark/bin/*
+
+WORKDIR '/home/'
+
+COPY entrypoint.sh .
+COPY provision.py .
+
+HEALTHCHECK --retries=120 --interval=1s \
+    CMD ls /tmp/ready || exit 1
+
+ENTRYPOINT ["./entrypoint.sh"]
diff --git a/crates/integration_tests/testdata/spark/entrypoint.sh b/crates/integration_tests/testdata/spark/entrypoint.sh
new file mode 100755
index 000000000..abbcc9332
--- /dev/null
+++ b/crates/integration_tests/testdata/spark/entrypoint.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+start-master.sh -p 7077
+start-worker.sh spark://spark-iceberg:7077
+start-history-server.sh
+
+python3 ./provision.py
+
+touch /tmp/ready
+
+tail -f /dev/null
diff --git a/crates/integration_tests/testdata/spark/provision.py b/crates/integration_tests/testdata/spark/provision.py
new file mode 100755
index 000000000..6dd74eab3
--- /dev/null
+++ b/crates/integration_tests/testdata/spark/provision.py
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import current_date, date_add, expr
+
+spark = SparkSession.builder.getOrCreate()
+
+spark.sql(
+    f"""
+CREATE OR REPLACE TABLE rest.default.test_positional_merge_on_read_deletes (
+    dt date,
+    number integer,
+    letter string
+)
+USING iceberg
+TBLPROPERTIES (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read',
+    'write.merge.mode'='merge-on-read',
+    'format-version'='2'
+);
+"""
+)
+
+spark.sql(
+    f"""
+INSERT INTO rest.default.test_positional_merge_on_read_deletes
+VALUES
+    (CAST('2023-03-01' AS date), 1, 'a'),
+    (CAST('2023-03-02' AS date), 2, 'b'),
+    (CAST('2023-03-03' AS date), 3, 'c'),
+    (CAST('2023-03-04' AS date), 4, 'd'),
+    (CAST('2023-03-05' AS date), 5, 'e'),
+    (CAST('2023-03-06' AS date), 6, 'f'),
+    (CAST('2023-03-07' AS date), 7, 'g'),
+    (CAST('2023-03-08' AS date), 8, 'h'),
+    (CAST('2023-03-09' AS date), 9, 'i'),
+    (CAST('2023-03-10' AS date), 10, 'j'),
+    (CAST('2023-03-11' AS date), 11, 'k'),
+    (CAST('2023-03-12' AS date), 12, 'l');
+"""
+)
+
+spark.sql(f"DELETE FROM rest.default.test_positional_merge_on_read_deletes WHERE number = 9")
+
+spark.sql(
+    f"""
+  CREATE OR REPLACE TABLE rest.default.test_positional_merge_on_read_double_deletes (
+    dt date,
+    number integer,
+    letter string
+  )
+  USING iceberg
+  TBLPROPERTIES (
+    'write.delete.mode'='merge-on-read',
+    'write.update.mode'='merge-on-read',
+    'write.merge.mode'='merge-on-read',
+    'format-version'='2'
+  );
+"""
+)
+
+spark.sql(
+    f"""
+INSERT INTO rest.default.test_positional_merge_on_read_double_deletes
+VALUES
+    (CAST('2023-03-01' AS date), 1, 'a'),
+    (CAST('2023-03-02' AS date), 2, 'b'),
+    (CAST('2023-03-03' AS date), 3, 'c'),
+    (CAST('2023-03-04' AS date), 4, 'd'),
+    (CAST('2023-03-05' AS date), 5, 'e'),
+    (CAST('2023-03-06' AS date), 6, 'f'),
+    (CAST('2023-03-07' AS date), 7, 'g'),
+    (CAST('2023-03-08' AS date), 8, 'h'),
+    (CAST('2023-03-09' AS date), 9, 'i'),
+    (CAST('2023-03-10' AS date), 10, 'j'),
+    (CAST('2023-03-11' AS date), 11, 'k'),
+    (CAST('2023-03-12' AS date), 12, 'l');
+"""
+)
+
+# Creates two positional deletes that should be merged
+spark.sql(f"DELETE FROM rest.default.test_positional_merge_on_read_double_deletes WHERE number = 9")
+spark.sql(f"DELETE FROM rest.default.test_positional_merge_on_read_double_deletes WHERE letter == 'f'")
diff --git a/crates/integration_tests/testdata/spark/spark-defaults.conf b/crates/integration_tests/testdata/spark/spark-defaults.conf
new file mode 100644
index 000000000..9db0c7da9
--- /dev/null
+++ b/crates/integration_tests/testdata/spark/spark-defaults.conf
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+spark.sql.catalog.rest org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.rest.type rest
+spark.sql.catalog.rest.uri http://rest:8181
+spark.sql.catalog.rest.io-impl org.apache.iceberg.aws.s3.S3FileIO
+spark.sql.catalog.rest.warehouse s3://warehouse/rest/
+spark.sql.catalog.rest.s3.endpoint http://minio:9000
+spark.sql.defaultCatalog rest
+spark.eventLog.enabled true
+spark.eventLog.dir /home/iceberg/spark-events
+spark.history.fs.logDirectory /home/iceberg/spark-events
+spark.sql.catalogImplementation in-memory
diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs
new file mode 100644
index 000000000..ebd5ea67f
--- /dev/null
+++ b/crates/integration_tests/tests/read_positional_deletes.rs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for the REST catalog.
+
+use iceberg::{Catalog, TableIdent};
+use iceberg_integration_tests::set_test_fixture;
+
+#[tokio::test]
+async fn test_read_table_with_positional_deletes() {
+    let fixture = set_test_fixture("read_table_with_positional_deletes").await;
+
+    let catalog = fixture.rest_catalog;
+
+    let table = catalog
+        .load_table(
+            &TableIdent::from_strs(["default", "test_positional_merge_on_read_double_deletes"])
+                .unwrap(),
+        )
+        .await
+        .unwrap();
+
+    // 😱 If we don't support positional deletes, we should not be able to plan them
+    println!("{:?}", table.scan().build().unwrap());
+}
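
Note for reviewers who want to exercise this locally: docker-compose builds the spark-iceberg image, entrypoint.sh runs provision.py to create the two merge-on-read tables and their positional deletes, and the container only reports healthy once /tmp/ready exists, after which the Rust test loads test_positional_merge_on_read_double_deletes through the REST catalog and plans a scan. A rough sketch of a manual run follows; the compose file path is taken from this diff, but the cargo package name is an assumption inferred from the iceberg_integration_tests import, and the test fixture may already manage the containers on its own:

    # Sketch only: bring up MinIO, the REST catalog, and the Spark provisioning container.
    docker compose -f crates/integration_tests/testdata/docker-compose.yaml up -d --build
    # Once spark-iceberg reports healthy (provision.py finished and touched /tmp/ready),
    # run the new test. The package name below is an assumption, not part of this diff.
    cargo test -p iceberg-integration-tests --test read_positional_deletes -- --nocapture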