Skip to content

Commit

Permalink
[Python] Get available python version and use it for Python SDK harne…
Browse files Browse the repository at this point in the history
…ss boot entry point (#28046)

* add env variable config, use GetPythonVersion

* fix error message

* add spaces
  • Loading branch information
riteshghorse authored Aug 23, 2023
1 parent cfd9662 commit 395c4d1
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 12 deletions.
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,29 @@ func jarExists(jarPath string) bool {
return err == nil
}

func getPythonVersion() (string, error) {
// GetPythonVersion returns the Python version to use. It checks for
// env variable PYTHON_PATH and returns it if set.
// If no PYTHON_PATH is defined then it checks for python or python3
// and returns that. Otherwise it returns an error.
func GetPythonVersion() (string, error) {
if pythonPath := os.Getenv("PYTHON_PATH"); pythonPath != "" {
return pythonPath, nil
}
for _, v := range []string{"python", "python3"} {
cmd := exec.Command(v, "--version")
if err := cmd.Run(); err == nil {
return v, nil
}
}
return "", fmt.Errorf("no python installation found")
return "", errors.New("no python installation found. If you use a " +
"custom container image, please check if python/python3 is available or specify the " +
"full path to the python interpreter in PYTHON_PATH environment variable")
}

// SetUpPythonEnvironment sets up the virtual environment required for the
// Apache Beam Python SDK to run an expansion service module.
func SetUpPythonEnvironment(extraPackage string) (string, error) {
py, err := getPythonVersion()
py, err := GetPythonVersion()
if err != nil {
return "", fmt.Errorf("no python installation found: %v", err)
}
Expand Down
32 changes: 32 additions & 0 deletions sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,35 @@ func TestGetJar_dev(t *testing.T) {
t.Errorf("error message does not contain gradle command %v for user, got message: %v", gradleTarget, err)
}
}

// TestGetPythonVersion checks that GetPythonVersion returns the value of the
// PYTHON_PATH environment variable when it is set, and that it still returns
// without error when the test case does not set PYTHON_PATH.
func TestGetPythonVersion(t *testing.T) {
	tests := []struct {
		name       string
		pythonPath string
	}{
		{
			name:       "PYTHON_PATH set",
			pythonPath: "/bin/python",
		},
		{
			name:       "PYTHON_PATH not set",
			pythonPath: "",
		},
	}

	for _, test := range tests {
		if test.pythonPath != "" {
			os.Setenv("PYTHON_PATH", test.pythonPath)
		}
		pythonVersion, err := GetPythonVersion()
		if err != nil {
			t.Errorf("python installation not found: %v, when PYTHON_PATH=%v", err, test.pythonPath)
		}
		if test.pythonPath != "" && pythonVersion != test.pythonPath {
			t.Errorf("incorrect PYTHON_PATH, want: %v, got: %v", test.pythonPath, pythonVersion)
		}
		if test.pythonPath != "" {
			// Unset the variable by its name. Passing the value here would
			// leave PYTHON_PATH set and leak it into the following test cases.
			os.Unsetenv("PYTHON_PATH")
		}
	}
}
13 changes: 11 additions & 2 deletions sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
Expand Down Expand Up @@ -91,7 +92,11 @@ func main() {
"--container_executable=/opt/apache/beam/boot",
}
log.Printf("Starting worker pool %v: python %v", workerPoolId, strings.Join(args, " "))
if err := execx.Execute("python", args...); err != nil {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
log.Fatalf("Python SDK worker pool exited with error: %v", err)
}
if err := execx.Execute(pythonVersion, args...); err != nil {
log.Fatalf("Python SDK worker pool exited with error: %v", err)
}
log.Print("Python SDK worker pool exited.")
Expand Down Expand Up @@ -336,7 +341,11 @@ func setupVenv(ctx context.Context, logger *tools.Logger, baseDir, workerId stri
if err := os.MkdirAll(dir, 0750); err != nil {
return "", fmt.Errorf("failed to create Python venv directory: %s", err)
}
if err := execx.Execute("python", "-m", "venv", "--system-site-packages", dir); err != nil {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return "", err
}
if err := execx.Execute(pythonVersion, "-m", "venv", "--system-site-packages", dir); err != nil {
return "", fmt.Errorf("python venv initialization failed: %s", err)
}

Expand Down
19 changes: 14 additions & 5 deletions sdks/python/container/piputil.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,31 @@ import (
"path/filepath"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)

// pipInstallRequirements installs the given requirement, if present.
func pipInstallRequirements(files []string, dir, name string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
for _, file := range files {
if file == name {
// We run the install process in two rounds in order to avoid as much
// as possible PyPI downloads. In the first round the --find-links
// option will make sure that only things staged in the worker will be
// used without following their dependencies.
args := []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--no-index", "--no-deps", "--find-links", dir}
if err := execx.Execute("python", args...); err != nil {
if err := execx.Execute(pythonVersion, args...); err != nil {
fmt.Println("Some packages could not be installed solely from the requirements cache. Installing packages from PyPI.")
}
// The second install round opens up the search for packages on PyPI and
// also installs dependencies. The key is that if all the packages have
// been installed in the first round then this command will be a no-op.
args = []string{"-m", "pip", "install", "-q", "-r", filepath.Join(dir, name), "--no-cache-dir", "--disable-pip-version-check", "--find-links", dir}
return execx.Execute("python", args...)
return execx.Execute(pythonVersion, args...)
}
}
return nil
Expand All @@ -65,6 +70,10 @@ func isPackageInstalled(pkgName string) bool {

// pipInstallPackage installs the given package, if present.
func pipInstallPackage(files []string, dir, name string, force, optional bool, extras []string) error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
for _, file := range files {
if file == name {
var packageSpec = name
Expand All @@ -90,17 +99,17 @@ func pipInstallPackage(files []string, dir, name string, force, optional bool, e
// installed if necessary. This achieves our goal outlined above.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", "--upgrade", "--force-reinstall", "--no-deps",
filepath.Join(dir, packageSpec)}
err := execx.Execute("python", args...)
err := execx.Execute(pythonVersion, args...)
if err != nil {
return err
}
args = []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute("python", args...)
return execx.Execute(pythonVersion, args...)
}

// Case when we do not perform a forced reinstall.
args := []string{"-m", "pip", "install", "-q", "--no-cache-dir", "--disable-pip-version-check", filepath.Join(dir, packageSpec)}
return execx.Execute("python", args...)
return execx.Execute(pythonVersion, args...)
}
}
if optional {
Expand Down
9 changes: 7 additions & 2 deletions sdks/python/expansion-service-container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
)

Expand Down Expand Up @@ -58,15 +59,19 @@ func main() {
}

func launchExpansionServiceProcess() error {
pythonVersion, err := expansionx.GetPythonVersion()
if err != nil {
return err
}
log.Printf("Starting Python expansion service ...")

dir := filepath.Join("/opt/apache/beam", venvDirectory)
os.Setenv("VIRTUAL_ENV", dir)
os.Setenv("PATH", strings.Join([]string{filepath.Join(dir, "bin"), os.Getenv("PATH")}, ":"))

args := []string{"-m", expansionServiceEntrypoint, "-p", strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
if err := execx.Execute("python", args...); err != nil {
return fmt.Errorf("Could not start the expansion service: %s", err)
if err := execx.Execute(pythonVersion, args...); err != nil {
return fmt.Errorf("could not start the expansion service: %s", err)
}

return nil
Expand Down

0 comments on commit 395c4d1

Please sign in to comment.