Decode strings in python2 prior to doing pandas to pyspark conversion #673

Closed
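Under Python 2, plain str values in a pandas DataFrame are byte strings, which pyarrow (0.10.0 and later, per the version gate in session.py below) maps to Arrow binary rather than string, so the Arrow-backed createDataFrame path could yield a binary column where the non-Arrow path yields string. This change casts such columns to unicode before handing the frame to Arrow. Below is a minimal sketch of the effect, assuming Python 2, a pandas version recent enough for the skipna keyword, and a local Spark session; the master URL and session setup are illustrative, not part of the patch.

# Python 2 sketch of the decode step added in _create_from_pandas_with_arrow.
import sys

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = pd.DataFrame([['a']], columns=['col1'])  # 'a' is a byte string under Python 2

if sys.version_info[0] < 3:
    # Columns pandas infers as byte strings would otherwise reach Arrow as binary.
    str_cols = [c for c in pdf.columns
                if pd.api.types.infer_dtype(pdf[c], skipna=True) == 'string']
    if str_cols:
        pdf = pdf.copy()
        for c in str_cols:
            pdf[c] = pdf[c].astype('unicode')  # decode to unicode before conversion

sdf = spark.createDataFrame(pdf)
print(sdf.dtypes)  # expected: [('col1', 'string')] rather than [('col1', 'binary')]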
14 changes: 7 additions & 7 deletions .circleci/config.yml
@@ -2,7 +2,7 @@ version: 2

defaults: &defaults
docker:
- image: palantirtechnologies/circle-spark-base:0.1.3
- image: palantirtechnologies/circle-spark-base:0.2.2
resource_class: xlarge
environment: &defaults-environment
TERM: dumb
@@ -129,7 +129,7 @@ jobs:
<<: *defaults
# Some part of the maven setup fails if there's no R, so we need to use the R image here
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
- image: palantirtechnologies/circle-spark-r:0.2.2
steps:
# Saves us from recompiling every time...
- restore_cache:
@@ -296,7 +296,7 @@ jobs:
# depends on build-sbt, but we only need the assembly jars
<<: *defaults
docker:
- image: palantirtechnologies/circle-spark-python:0.1.3
- image: palantirtechnologies/circle-spark-python:0.2.2
parallelism: 2
steps:
- *checkout-code
@@ -321,7 +321,7 @@
# depends on build-sbt, but we only need the assembly jars
<<: *defaults
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
- image: palantirtechnologies/circle-spark-r:0.2.2
steps:
- *checkout-code
- attach_workspace:
@@ -434,7 +434,7 @@ jobs:
<<: *defaults
# Some part of the maven setup fails if there's no R, so we need to use the R image here
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
- image: palantirtechnologies/circle-spark-r:0.2.2
steps:
- *checkout-code
- restore_cache:
@@ -454,7 +454,7 @@
deploy-gradle:
<<: *defaults
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
- image: palantirtechnologies/circle-spark-r:0.2.2
steps:
- *checkout-code
- *restore-gradle-wrapper-cache
@@ -466,7 +466,7 @@
<<: *defaults
# Some part of the maven setup fails if there's no R, so we need to use the R image here
docker:
- image: palantirtechnologies/circle-spark-r:0.1.3
- image: palantirtechnologies/circle-spark-r:0.2.2
steps:
# This cache contains the whole project after version was set and mvn package was called
# Restoring first (and instead of checkout) as mvn versions:set mutates real source code...
7 changes: 4 additions & 3 deletions dev/docker-images/Makefile
@@ -17,9 +17,10 @@

.PHONY: all publish base python r

BASE_IMAGE_NAME = palantirtechnologies/circle-spark-base:0.1.3
PYTHON_IMAGE_NAME = palantirtechnologies/circle-spark-python:0.1.3
R_IMAGE_NAME = palantirtechnologies/circle-spark-r:0.1.3
VERSION=0.2.2
BASE_IMAGE_NAME = "palantirtechnologies/circle-spark-base:${VERSION}"
PYTHON_IMAGE_NAME = "palantirtechnologies/circle-spark-python:${VERSION}"
R_IMAGE_NAME = "palantirtechnologies/circle-spark-r:${VERSION}"

all: base python r

16 changes: 10 additions & 6 deletions dev/docker-images/base/Dockerfile
@@ -15,7 +15,7 @@
# limitations under the License.
#

FROM buildpack-deps:cosmic
FROM buildpack-deps:20.04

# make Apt non-interactive
RUN echo 'APT::Get::Assume-Yes "true";' > /etc/apt/apt.conf.d/90circleci \
@@ -28,12 +28,15 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN mkdir -p /usr/share/man/man1 \
&& apt-get update \
&& apt-get install -y \
git \
git python2 \
locales sudo openssh-client ca-certificates tar gzip parallel \
net-tools netcat unzip zip bzip2 gnupg curl wget \
openjdk-8-jdk rsync pandoc pandoc-citeproc flake8 tzdata \
&& rm -rf /var/lib/apt/lists/*

# Make python command default to python2
RUN sudo update-alternatives --install /usr/bin/python python /usr/bin/python2 0

# If you update java, make sure this aligns
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

@@ -59,13 +62,13 @@ RUN JQ_URL="https://circle-downloads.s3.amazonaws.com/circleci-images/cache/linu
# The output looks like this:

#> # To install, run the following commands as root:
#> curl -fsSLO https://download.docker.com/linux/static/stable/x86_64/docker-17.05.0-ce.tgz && tar --strip-components=1 -xvzf docker-17.05.0-ce.tgz -C /usr/local/bin
#> curl -fsSLO https://download.docker.com/linux/static/stable/x86_64/docker-17.05.0.tgz && tar --strip-components=1 -xvzf docker-17.05.0.tgz -C /usr/local/bin
#>
#> # Then start docker in daemon mode:
#> /usr/local/bin/dockerd

RUN set -ex \
&& export DOCKER_VERSION=$(curl --silent --fail --retry 3 https://download.docker.com/linux/static/stable/x86_64/ | grep -o -e 'docker-[.0-9]*-ce\.tgz' | sort -r | head -n 1) \
&& export DOCKER_VERSION=$(curl --silent --fail --retry 3 https://download.docker.com/linux/static/stable/x86_64/ | grep -o -e 'docker-[.0-9]*\.tgz' | sort -r | head -n 1) \
&& DOCKER_URL="https://download.docker.com/linux/static/stable/x86_64/${DOCKER_VERSION}" \
&& echo Docker URL: $DOCKER_URL \
&& curl --silent --show-error --location --fail --retry 3 --output /tmp/docker.tgz "${DOCKER_URL}" \
@@ -109,9 +112,10 @@ WORKDIR $CIRCLE_HOME
ENV CONDA_ROOT=$CIRCLE_HOME/miniconda
ENV CONDA_BIN=$CIRCLE_HOME/miniconda/bin/conda
ENV MINICONDA2_VERSION=4.5.11
RUN curl -sO https://repo.continuum.io/miniconda/Miniconda2-${MINICONDA2_VERSION}-Linux-x86_64.sh \

RUN curl -sO https://repo.anaconda.com/miniconda/Miniconda2-${MINICONDA2_VERSION}-Linux-x86_64.sh \
&& bash Miniconda2-${MINICONDA2_VERSION}-Linux-x86_64.sh -b -p ${CONDA_ROOT} \
&& $CONDA_BIN clean --all \
&& $CONDA_BIN clean --all --yes \
&& sudo mkdir -m 777 /home/.conda \
&& rm -f Miniconda2-${MINICONDA2_VERSION}-Linux-x86_64.sh

8 changes: 4 additions & 4 deletions dev/docker-images/python/Dockerfile
@@ -27,8 +27,8 @@ RUN curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-instal
# A version I've tested earlier that I know it breaks with is 1.14.1
RUN mkdir -p $(pyenv root)/versions \
&& ln -s $CONDA_ROOT $(pyenv root)/versions/our-miniconda \
&& $CONDA_BIN create -y -n python2 -c anaconda -c conda-forge python==2.7.15 numpy=1.14.0 pyarrow==0.8.0 pandas nomkl \
&& $CONDA_BIN create -y -n python3 -c anaconda -c conda-forge python=3.6 numpy=1.14.0 pyarrow==0.8.0 pandas nomkl \
&& $CONDA_BIN create -y -n python2 -c anaconda -c conda-forge python==2.7.15 numpy=1.14.0 pyarrow==0.12.1 pandas nomkl \
&& $CONDA_BIN create -y -n python3 -c anaconda -c conda-forge python=3.6 numpy=1.14.0 pyarrow==0.12.1 pandas nomkl \
&& $CONDA_BIN clean --all

RUN pyenv global our-miniconda/envs/python2 our-miniconda/envs/python3 \
@@ -37,5 +37,5 @@ RUN pyenv global our-miniconda/envs/python2 our-miniconda/envs/python3 \
# Expose pyenv globally
ENV PATH=$CIRCLE_HOME/.pyenv/shims:$PATH

RUN PYENV_VERSION=our-miniconda/envs/python2 $CIRCLE_HOME/.pyenv/shims/pip install unishark unittest-xml-reporting \
&& PYENV_VERSION=our-miniconda/envs/python3 $CIRCLE_HOME/.pyenv/shims/pip install unishark unittest-xml-reporting
RUN PYENV_VERSION=our-miniconda/envs/python2 $CIRCLE_HOME/.pyenv/shims/pip install unishark "unittest-xml-reporting<3"
RUN PYENV_VERSION=our-miniconda/envs/python3 $CIRCLE_HOME/.pyenv/shims/pip install unishark unittest-xml-reporting
2 changes: 1 addition & 1 deletion dev/run-tests-jenkins.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
2 changes: 1 addition & 1 deletion dev/test_functions.py
@@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
12 changes: 12 additions & 0 deletions python/pyspark/sql/session.py
@@ -542,6 +542,13 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
import pyarrow as pa

if sys.version < '3' and LooseVersion(pa.__version__) >= LooseVersion("0.10.0"):
str_cols = self._get_str_columns(pdf)
if len(str_cols) > 0:
pdf = pdf.copy()
for str_col in str_cols:
pdf[str_col] = pdf[str_col].astype("unicode")

# Create the Spark schema from list of names passed in with Arrow types
if isinstance(schema, (list, tuple)):
if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
@@ -592,6 +599,11 @@ def create_RDD_server():
df._schema = schema
return df

def _get_str_columns(self, pandas_df):
import pandas as pd
return [col for col in pandas_df.columns
if pd.api.types.infer_dtype(pandas_df[col], skipna=True) == "string"]

@staticmethod
def _create_shell_session():
"""
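The helper _get_str_columns leans on pandas' dtype inference: under Python 2, infer_dtype reports byte-string columns as 'string' and unicode columns as 'unicode', so only the former get cast. A small illustration (Python 2; assumes a pandas version with the skipna keyword):

# Python 2 illustration of the inference _get_str_columns relies on.
import pandas as pd
from pandas.api.types import infer_dtype

bytes_col = pd.Series(['a', 'b'])      # plain str, i.e. bytes, under Python 2
unicode_col = pd.Series([u'a', u'b'])

print(infer_dtype(bytes_col, skipna=True))    # 'string'  -> will be cast
print(infer_dtype(unicode_col, skipna=True))  # 'unicode' -> left alone
print(infer_dtype(bytes_col.astype('unicode'), skipna=True))  # 'unicode'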
18 changes: 18 additions & 0 deletions python/pyspark/sql/tests/test_pandas_udf_scalar.py
@@ -212,6 +212,24 @@ def test_vectorized_udf_string_in_udf(self):
expected = df.select(col('id').cast('string'))
self.assertEquals(expected.collect(), actual.collect())

def test_pandas_python2_string(self):
import pandas as pd
self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")
pdf = pd.DataFrame([['a']], columns=["col1"])
sdf = self.spark.createDataFrame(pdf)
sdf2 = self.spark.createDataFrame([['a']], schema=['col1'])
self.assertEquals(sdf.dtypes, sdf2.dtypes)

def test_pandas_python2_nested_string(self):
import pandas as pd
self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
self.spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
pdf = pd.DataFrame([[['a']]], columns=["col1"])
sdf = self.spark.createDataFrame(pdf)
sdf2 = self.spark.createDataFrame([[['a']]], schema=['col1'])
self.assertEquals(sdf.dtypes, sdf2.dtypes)

def test_vectorized_udf_datatype_string(self):
df = self.spark.range(10).select(
col('id').cast('string').alias('str'),
4 changes: 2 additions & 2 deletions python/pyspark/sql/types.py
@@ -1616,7 +1616,7 @@ def to_arrow_type(dt):
# Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
arrow_type = pa.timestamp('us', tz='UTC')
elif type(dt) == ArrayType:
if type(dt.elementType) in [StructType, TimestampType]:
if type(dt.elementType) in [StructType, TimestampType, BinaryType]:
raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
arrow_type = pa.list_(to_arrow_type(dt.elementType))
elif type(dt) == StructType:
@@ -1674,7 +1674,7 @@ def from_arrow_type(at):
elif types.is_timestamp(at):
spark_type = TimestampType()
elif types.is_list(at):
if types.is_timestamp(at.value_type):
if types.is_timestamp(at.value_type) or types.is_binary(at.value_type):
raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
spark_type = ArrayType(from_arrow_type(at.value_type))
elif types.is_struct(at):
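The types.py change adds binary array elements to the unsupported types in both conversion directions, since a nested byte string under Python 2 would surface as array<binary>; with spark.sql.execution.arrow.fallback.enabled set to true such frames drop back to the non-Arrow path, which appears to be what test_pandas_python2_nested_string exercises. A short sketch of the Spark-to-Arrow side, assuming a pyspark build that includes this patch:

# Assumes a pyspark build containing this change; to_arrow_type is the
# module-level helper defined in pyspark/sql/types.py.
from pyspark.sql.types import ArrayType, BinaryType, to_arrow_type

try:
    to_arrow_type(ArrayType(BinaryType()))
except TypeError as e:
    print(e)  # e.g. "Unsupported type in conversion to Arrow: ArrayType(BinaryType,true)"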
@@ -688,12 +688,12 @@ class DateTimeUtilsSuite extends SparkFunSuite {

// There are some days are skipped entirely in some timezone, skip them here.
val skipped_days = Map[String, Set[Int]](
"Kwajalein" -> Set(8632),
"Kwajalein" -> Set(8632, 8633, 8634),
"Pacific/Apia" -> Set(15338),
"Pacific/Enderbury" -> Set(9130, 9131),
"Pacific/Fakaofo" -> Set(15338),
"Pacific/Kiritimati" -> Set(9130, 9131),
"Pacific/Kwajalein" -> Set(8632),
"Pacific/Kwajalein" -> Set(8632, 8633, 8634),
"MIT" -> Set(15338))
for (tz <- DateTimeTestUtils.ALL_TIMEZONES) {
val skipped = skipped_days.getOrElse(tz.getID, Set.empty)