MR from dotnet spark #5

Merged: 9 commits, Jun 22, 2020
2 changes: 2 additions & 0 deletions NuGet.config
@@ -6,5 +6,7 @@
<add key="dotnet-core" value="https://dotnetfeed.blob.core.windows.net/dotnet-core/index.json" />
<add key="dotnet-tools" value="https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet-tools/nuget/v3/index.json" />
<add key="dotnet-eng" value="https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet-eng/nuget/v3/index.json" />
<add key="dotnet5" value="https://pkgs.dev.azure.com/dnceng/public/_packaging/dotnet5/nuget/v3/index.json" />
<add key="dotnet-try" value="https://dotnet.myget.org/F/dotnet-try/api/v3/index.json" />
</packageSources>
</configuration>
895 changes: 485 additions & 410 deletions azure-pipelines.yml


92 changes: 92 additions & 0 deletions docs/broadcast-guide.md
@@ -0,0 +1,92 @@
# Guide to using Broadcast Variables

This is a guide to show how to use broadcast variables in .NET for Apache Spark.

## What are Broadcast Variables

[Broadcast variables in Apache Spark](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) are a mechanism for sharing read-only variables across executors. They allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

### How to use broadcast variables in .NET for Apache Spark

Broadcast variables are created from a variable `v` by calling `SparkContext.Broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `Value()` method.

Example:

```csharp
string v = "Variable to be broadcasted";
Broadcast<string> bv = SparkContext.Broadcast(v);

// Using the broadcast variable in a UDF:
Func<Column, Column> udf = Udf<string, string>(
    str => $"{str}: {bv.Value()}");
```

The type parameter for `Broadcast` should be the type of the variable being broadcast.

### Deleting broadcast variables

The broadcast variable can be deleted from all executors by calling the `Destroy()` method on it.

```csharp
// Destroying the broadcast variable bv:
bv.Destroy();
```

> Note: `Destroy()` deletes all data and metadata related to the broadcast variable. Use this with caution - once a broadcast variable has been destroyed, it cannot be used again.

#### Caveat of using Destroy

One important thing to keep in mind while using broadcast variables in UDFs is to limit the scope of each variable to the UDF that references it. The [guide to using UDFs](udf-guide.md) describes this phenomenon in detail. This is especially crucial when calling `Destroy` on the broadcast variable. If a destroyed broadcast variable is visible to or accessible from other UDFs, it gets picked up for serialization by all of those UDFs, even if they never reference it. This throws an error, as .NET for Apache Spark is not able to serialize the destroyed broadcast variable.

An example to demonstrate this:

```csharp
string v = "Variable to be broadcasted";
Broadcast<string> bv = SparkContext.Broadcast(v);

// Using the broadcast variable in a UDF:
Func<Column, Column> udf1 = Udf<string, string>(
    str => $"{str}: {bv.Value()}");

// Destroying bv
bv.Destroy();

// Calling udf1 after destroying bv throws the following expected exception:
// org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed
df.Select(udf1(df["_1"])).Show();

// Different UDF udf2 that is not referencing bv
Func<Column, Column> udf2 = Udf<string, string>(
    str => $"{str}: not referencing broadcast variable");

// Calling udf2 throws the following (unexpected) exception:
// [Error] [JvmBridge] org.apache.spark.SparkException: Task not serializable
df.Select(udf2(df["_1"])).Show();
```

The recommended way of implementing the desired behavior above:

```csharp
string v = "Variable to be broadcasted";
// Restricting the visibility of bv to only the UDF referencing it
{
    Broadcast<string> bv = SparkContext.Broadcast(v);

    // Using the broadcast variable in a UDF:
    Func<Column, Column> udf1 = Udf<string, string>(
        str => $"{str}: {bv.Value()}");

    // Destroying bv
    bv.Destroy();
}

// Different UDF udf2 that is not referencing bv
Func<Column, Column> udf2 = Udf<string, string>(
    str => $"{str}: not referencing broadcast variable");

// Calling udf2 works fine as expected
df.Select(udf2(df["_1"])).Show();
```
This ensures that destroying `bv` doesn't affect calls to `udf2` through unexpected serialization behavior.

Broadcast variables are useful for transmitting read-only data to all executors: the data is sent to each executor only once, which can give performance benefits compared with using local variables that get shipped to the executors with each task, as the sketch below illustrates. Please refer to the [official documentation](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables) to get a deeper understanding of broadcast variables and why they are used.
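
As a minimal sketch of that difference (in the same snippet style as above, with imports omitted; the lookup table and the `SparkContext` in scope are assumptions for illustration):

```csharp
// Hypothetical large lookup table (illustrative only).
Dictionary<int, string> lookup = Enumerable.Range(0, 1000000)
    .ToDictionary(i => i, i => $"value{i}");

// Local variable: lookup is captured in the UDF's closure, so it is
// serialized and shipped to the executors with every task.
Func<Column, Column> udfLocal = Udf<int, string>(id => lookup[id]);

// Broadcast variable: lookup is sent to each executor once and cached there.
Broadcast<Dictionary<int, string>> bLookup = SparkContext.Broadcast(lookup);
Func<Column, Column> udfBroadcast = Udf<int, string>(
    id => bLookup.Value()[id]);
```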
18 changes: 9 additions & 9 deletions docs/building/ubuntu-instructions.md
@@ -35,14 +35,14 @@ If you already have all the pre-requisites, skip to the [build](ubuntu-instructi
```bash
sudo update-alternatives --config java
```
3. Install **[Apache Maven 3.6.0+](https://maven.apache.org/download.cgi)**
3. Install **[Apache Maven 3.6.3+](https://maven.apache.org/download.cgi)**
- Run the following command:
```bash
mkdir -p ~/bin/maven
cd ~/bin/maven
wget https://www-us.apache.org/dist/maven/maven-3/3.6.0/binaries/apache-maven-3.6.0-bin.tar.gz
tar -xvzf apache-maven-3.6.0-bin.tar.gz
ln -s apache-maven-3.6.0 current
wget https://www-us.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar -xvzf apache-maven-3.6.3-bin.tar.gz
ln -s apache-maven-3.6.3 current
export M2_HOME=~/bin/maven/current
export PATH=${M2_HOME}/bin:${PATH}
source ~/.bashrc
@@ -54,11 +54,11 @@ If you already have all the pre-requisites, skip to the [build](ubuntu-instructi
<summary>&#x1F4D9; Click to see sample mvn -version output</summary>

```
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-24T18:41:47Z)
Maven home: ~/bin/apache-maven-3.6.0
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-17763-microsoft", arch: "amd64", family: "unix"
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: ~/bin/apache-maven-3.6.3
Java version: 1.8.0_242, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: en_US, platform encoding: ANSI_X3.4-1968
OS name: "linux", version: "4.4.0-142-generic", arch: "amd64", family: "unix"
```
4. Install **[Apache Spark 2.3+](https://spark.apache.org/downloads.html)**
- Download [Apache Spark 2.3+](https://spark.apache.org/downloads.html) and extract it into a local folder (e.g., `~/bin/spark-2.3.2-bin-hadoop2.7`)
171 changes: 171 additions & 0 deletions docs/udf-guide.md
@@ -0,0 +1,171 @@
# Guide to User-Defined Functions (UDFs)

This is a guide to show how to use UDFs in .NET for Apache Spark.

## What are UDFs

[User-Defined Functions (UDFs)](https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html) are a feature of Spark that allows developers to use custom functions to extend the system's built-in functionality. They transform values from a single row within a table to produce a single corresponding output value per row, based on the logic defined in the UDF.

Let's take the following UDF definition as an example:

```csharp
string s1 = "hello";
Func<Column, Column> udf = Udf<string, string>(
    str => $"{s1} {str}");
```
The UDF defined above takes a `string` as input (in the form of a [Column](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Column.cs#L14) of a [DataFrame](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/DataFrame.cs#L24)) and returns a `string` with `hello` prepended to the input.

As a sample, let's take the following DataFrame `df`:

```text
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
```

Now let's apply the above defined `udf` to the DataFrame `df`:

```csharp
DataFrame udfResult = df.Select(udf(df["name"]));
```

This returns the following as the DataFrame `udfResult`:

```text
+-------------+
| name|
+-------------+
|hello Michael|
| hello Andy|
| hello Justin|
+-------------+
```
To get a better understanding of how to implement UDFs, please take a look at the [UDF helper functions](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Sql/Functions.cs#L3616) and some [test examples](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs#L49).
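
A UDF can also be registered by name and invoked from Spark SQL. Here is a short sketch (the UDF name `AddHello` and view name `people` are illustrative, and a `SparkSession` named `spark` is assumed):

```csharp
// Register the UDF under a name that Spark SQL queries can call.
spark.Udf().Register<string, string>("AddHello", str => $"hello {str}");

// Expose df as a temporary view and invoke the registered UDF from SQL.
df.CreateOrReplaceTempView("people");
spark.Sql("SELECT AddHello(name) FROM people").Show();
```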

## UDF serialization

Since UDFs are functions that need to be executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the [delegate](https://docs.microsoft.com/en-us/dotnet/csharp/programming-guide/delegates/), which is a reference to the method, along with its [target](https://docs.microsoft.com/en-us/dotnet/api/system.delegate.target?view=netframework-4.8), which is the class instance on which the delegate invokes the instance method. Please take a look at this [code](https://github.com/dotnet/spark/blob/master/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs#L149) to get a better understanding of how UDF serialization is done.
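
As a standalone illustration of what has to be shipped (this is not the .NET for Apache Spark serialization code itself, just a plain C# sketch), a delegate exposes both pieces through `Method` and `Target`:

```csharp
using System;

class DelegateTargetDemo // illustrative name
{
    static void Main()
    {
        string greeting = "hello";
        Func<string, string> udf = name => $"{greeting} {name}";

        // Method is the compiler-generated method the delegate invokes;
        // Target is the closure instance holding the captured `greeting`.
        // Both would need to be serialized to run the delegate elsewhere.
        Console.WriteLine(udf.Method);
        Console.WriteLine(udf.Target?.GetType());
    }
}
```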

## Good to know while implementing UDFs

One behavior to be aware of while implementing UDFs in .NET for Apache Spark is how the target of the UDF gets serialized. .NET for Apache Spark is built on .NET Core, which does not support serializing delegates, so serialization is instead done by using reflection on the target in which the delegate is defined. When multiple delegates are defined in a common scope, they share a single closure, and that closure becomes the target that reflection serializes. Let's take an example to illustrate what that means.

The following code snippet defines two string variables that are referenced by two function delegates, each of which simply returns the respective string:

```csharp
using System;

public class C {
    public void M() {
        string s1 = "s1";
        string s2 = "s2";
        Func<string, string> a = str => s1;
        Func<string, string> b = str => s2;
    }
}
```

The compiler generates the following C# disassembly for the above code (credit: [sharplab.io](https://sharplab.io)):

```csharp
public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        public string s2;

        internal string <M>b__0(string str)
        {
            return s1;
        }

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        <>c__DisplayClass0_.s2 = "s2";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
    }
}
```
As can be seen in the decompiled code above, both `func` and `func2` share the same closure `<>c__DisplayClass0_0`, which is the target that is serialized when serializing the delegates `func` and `func2`. Hence, even though `Func<string, string> a` only references `s1`, `s2` also gets serialized when the bytes are sent over to the workers.

This can lead to some unexpected behaviors at runtime (like in the case of using [broadcast variables](broadcast-guide.md)), which is why we recommend restricting the visibility of the variables used in a function to that function's scope.

Going back to the above example, the following is the recommended way to implement the desired behavior of the previous code snippet:

```csharp
using System;

public class C {
    public void M() {
        {
            string s1 = "s1";
            Func<string, string> a = str => s1;
        }
        {
            string s2 = "s2";
            Func<string, string> b = str => s2;
        }
    }
}
```

The compiler generates the following C# disassembly for the above code (credit: [sharplab.io](https://sharplab.io)):

```csharp
public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        internal string <M>b__0(string str)
        {
            return s1;
        }
    }

    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_1
    {
        public string s2;

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
        <>c__DisplayClass0_2.s2 = "s2";
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
    }
}
```

Here we see that `func` and `func2` no longer share a closure and have their own separate closures `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When used as the target for serialization, nothing other than the referenced variables will get serialized for the delegate.

This behavior is important to keep in mind while implementing multiple UDFs in a common scope.
To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: [UDFs in Databricks (Scala)](https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html) and [Spark UDFs and some gotchas](https://medium.com/@achilleus/spark-udfs-we-can-use-them-but-should-we-use-them-2c5a561fde6d).
@@ -27,7 +27,7 @@ public void Run(string[] args)

SparkSession spark = SparkSession
.Builder()
.AppName(".NET for Apache Spark Sentiment Analysis")
.AppName("Sentiment Analysis using .NET for Apache Spark")
.GetOrCreate();

// Read in and display Yelp reviews
@@ -26,7 +26,7 @@ public void Run(string[] args)

SparkSession spark = SparkSession
.Builder()
.AppName(".NET Spark SQL basic example")
.AppName("SQL basic example using .NET for Apache Spark")
.Config("spark.some.config.option", "some-value")
.GetOrCreate();

@@ -32,7 +32,7 @@ public void Run(string[] args)

SparkSession spark = SparkSession
.Builder()
.AppName(".NET Spark SQL Datasource example")
.AppName("SQL Datasource example using .NET for Apache Spark")
.Config("spark.some.config.option", "some-value")
.GetOrCreate();

@@ -31,7 +31,7 @@ public void Run(string[] args)
.Builder()
// Lower the shuffle partitions to speed up groupBy() operations.
.Config("spark.sql.shuffle.partitions", "3")
.AppName(".NET Spark SQL VectorUdfs example")
.AppName("SQL VectorUdfs example using .NET for Apache Spark")
.GetOrCreate();

DataFrame df = spark.Read().Schema("age INT, name STRING").Json(args[0]);
@@ -29,7 +29,7 @@ public void Run(string[] args)
.Builder()
// Lower the shuffle partitions to speed up groupBy() operations.
.Config("spark.sql.shuffle.partitions", "3")
.AppName(".NET Spark SQL VectorUdfs example")
.AppName("SQL VectorUdfs example using .NET for Apache Spark")
.GetOrCreate();

DataFrame df = spark.Read().Schema("age INT, name STRING").Json(args[0]);
4 changes: 3 additions & 1 deletion script/download-spark-distros.cmd
@@ -23,5 +23,7 @@ curl -k -L -o spark-2.4.1.tgz https://archive.apache.org/dist/spark/spark-2.4.1/
curl -k -L -o spark-2.4.3.tgz https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz && tar xzvf spark-2.4.3.tgz
curl -k -L -o spark-2.4.4.tgz https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz && tar xzvf spark-2.4.4.tgz
curl -k -L -o spark-2.4.5.tgz https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz && tar xzvf spark-2.4.5.tgz
curl -k -L -o spark-2.4.6.tgz https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz && tar xzvf spark-2.4.6.tgz

endlocal

endlocal
@@ -11,6 +11,7 @@
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
using Microsoft.Spark.Sql.Types;
using Microsoft.Spark.UnitTest.TestUtils;
using Xunit;

namespace Microsoft.Spark.Extensions.Delta.E2ETest
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <RootNamespace>Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest</RootNamespace>
    <IsPackable>false</IsPackable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Moq" Version="4.10.0" />
    <PackageReference Include="Microsoft.DotNet.Interactive" Version="1.0.0-beta.20319.1" />
  </ItemGroup>

  <ItemGroup>
    <ProjectReference Include="..\Microsoft.Spark.Extensions.DotNet.Interactive\Microsoft.Spark.Extensions.DotNet.Interactive.csproj" />
    <ProjectReference Include="..\..\Microsoft.Spark\Microsoft.Spark.csproj" />
  </ItemGroup>

  <ItemGroup>
    <Compile Include="..\..\Microsoft.Spark.UnitTest\TestUtils\TemporaryDirectory.cs" />
  </ItemGroup>

</Project>