Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13872] [Playground] Increase test coverage for the code_processing package #16891

Merged
merged 6 commits into from
Mar 3, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 150 additions & 21 deletions playground/backend/internal/code_processing/code_processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
javaConfig = "{\n \"compile_cmd\": \"javac\",\n \"run_cmd\": \"java\",\n \"test_cmd\": \"java\",\n \"compile_args\": [\n \"-d\",\n \"bin\",\n \"-classpath\"\n ],\n \"run_args\": [\n \"-cp\",\n \"bin:\"\n ],\n \"test_args\": [\n \"-cp\",\n \"bin:\",\n \"JUnit\"\n ]\n}"
pythonConfig = "{\n \"compile_cmd\": \"\",\n \"run_cmd\": \"python3\",\n \"compile_args\": [],\n \"run_args\": []\n}"
goConfig = "{\n \"compile_cmd\": \"go\",\n \"run_cmd\": \"\",\n \"compile_args\": [\n \"build\",\n \"-o\",\n \"bin\"\n ],\n \"run_args\": [\n ]\n}"
fileName = "fakeFileName"
pipelinesFolder = "executable_files"
configFolder = "configs"
)
Expand Down Expand Up @@ -99,11 +98,12 @@ func Test_Process(t *testing.T) {
if err != nil {
panic(err)
}
sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
sdkJavaEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
pavel-avilov marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
panic(err)
}

sdkGoEnv := *sdkJavaEnv
sdkGoEnv.ApacheBeamSdk = pb.Sdk_SDK_GO
type args struct {
ctx context.Context
appEnv *environment.ApplicationEnvs
Expand Down Expand Up @@ -136,7 +136,7 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: &environment.ApplicationEnvs{},
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
Expand All @@ -155,7 +155,7 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
Expand All @@ -174,7 +174,7 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
Expand All @@ -193,7 +193,7 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
Expand All @@ -212,15 +212,15 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
},
{
// Test case with calling processCode without any error cases.
// As a result status into cache should be set as Status_STATUS_FINISHED.
name: "processing complete successfully",
name: "processing complete successfully on java sdk",
createExecFile: true,
cancelFunc: false,
code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}",
Expand All @@ -231,7 +231,26 @@ func Test_Process(t *testing.T) {
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: sdkEnv,
sdkEnv: sdkJavaEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
},
{
// Test case with calling processCode method with incorrect go code.
// As a result status into cache should be set as Status_STATUS_PREPARATION_ERROR.
name: "prepare step failed",
createExecFile: true,
code: "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\").\n}\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code -> constant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to a variable, not to constant

cancelFunc: false,
expectedStatus: pb.Status_STATUS_PREPARATION_ERROR,
expectedCompileOutput: nil,
expectedRunOutput: nil,
expectedRunError: nil,
args: args{
ctx: context.Background(),
appEnv: appEnvs,
sdkEnv: &sdkGoEnv,
pipelineId: uuid.New(),
pipelineOptions: "",
},
Expand Down Expand Up @@ -683,7 +702,7 @@ func Benchmark_ProcessPython(b *testing.B) {
}

ctx := context.Background()
code := "if __name__ == \"__main__\":\n print(\"Hello world!\")\n"
code := "import argparse\nimport logging\nimport re\n\nimport apache_beam as beam\nfrom apache_beam.io import ReadFromText\nfrom apache_beam.io import WriteToText\nfrom apache_beam.options.pipeline_options import PipelineOptions\nfrom apache_beam.options.pipeline_options import SetupOptions\n\n\nclass WordExtractingDoFn(beam.DoFn):\n \"\"\"Parse each line of input text into words.\"\"\"\n def process(self, element):\n \"\"\"Returns an iterator over the words of this element.\n\n The element is a line of text. If the line is blank, note that, too.\n\n Args:\n element: the element being processed\n\n Returns:\n The processed element.\n \"\"\"\n return re.findall(r'[\\w\\']+', element, re.UNICODE)\n\n\ndef run(argv=None, save_main_session=True):\n \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\n parser = argparse.ArgumentParser()\n parser.add_argument(\n '--input',\n dest='input',\n default='gs://dataflow-samples/shakespeare/kinglear.txt',\n help='Input file to process.')\n parser.add_argument(\n '--output',\n dest='output',\n required=True,\n help='Output file to write results to.')\n known_args, pipeline_args = parser.parse_known_args(argv)\n\n # We use the save_main_session option because one or more DoFn's in this\n # workflow rely on global context (e.g., a module imported at module level).\n pipeline_options = PipelineOptions(pipeline_args)\n pipeline_options.view_as(SetupOptions).save_main_session = save_main_session\n\n # The pipeline will be run on exiting the with block.\n with beam.Pipeline(options=pipeline_options) as p:\n\n # Read the text file[pattern] into a PCollection.\n lines = p | 'Read' >> ReadFromText(known_args.input)\n\n counts = (\n lines\n | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))\n | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\n | 'GroupAndSum' >> beam.CombinePerKey(sum))\n\n # Format the counts into a PCollection of strings.\n def format_result(word, count):\n return '%s: %d' % (word, 
count)\n\n output = counts | 'Format' >> beam.MapTuple(format_result)\n\n # Write the output using a \"Write\" transform that has side effects.\n # pylint: disable=expression-not-assigned\n output | 'Write' >> WriteToText(known_args.output)\n\n\nif __name__ == '__main__':\n logging.getLogger().setLevel(logging.INFO)\n run()"

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -695,7 +714,7 @@ func Benchmark_ProcessPython(b *testing.B) {
}
b.StartTimer()

Process(ctx, cacheService, lc, pipelineId, appEnv, sdkEnv, "")
Process(ctx, cacheService, lc, pipelineId, appEnv, sdkEnv, "--output t.txt")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move it to a constant as well, as was done with the others.

}
}

Expand Down Expand Up @@ -948,11 +967,14 @@ func Test_runStep(t *testing.T) {
if err != nil {
panic(err)
}
sdkEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
sdkJavaEnv, err := environment.ConfigureBeamEnvs(appEnvs.WorkingDir())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add test cases with Go SDK (unit tests and examples) that contain errors? Our tests don't cover these cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see that we don't have test cases for the compileStep() method:

  • case with Python SDK/SCIO SDK or Go SDK unit tests -> cache should contain status Status_STATUS_EXECUTING and empty CompileOutput, RunOutput, RunError, Logs, Graph
  • case when reconcileBackgroundTask() method returns err (expired context for example) -> cache should contain the same status as it had before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prepareStep() method:

  • test case with an error during builder.Preparer() -> cache should contain status Status_STATUS_ERROR
  • case when reconcileBackgroundTask() method returns err (expired context for example) -> cache should contain the same status as it had before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateStep() method:

  • test case with an error during builder.Preparer() -> cache should contain status Status_STATUS_ERROR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processSetupError() method:

  • test case with an error during utils.SetToCache() -> return err

GetLastIndex() method:

  • test case with an error during converting the value to float64 -> return err

readGraphFile() method:

  • test case with reading an existing graph file -> cache should contain Graph
  • test case with ctx.Done() and an existing graph file -> cache should contain Graph

processErrorWithSavingOutput method:

  • test case with an error during utils.SetToCache method (maybe use redis-mock) -> return err

processRunError method:

  • ditto

processCompileSuccess method:

  • test cases with an error during utils.SetToCache() for each subkey (CompileOutput/RunOutput/RunError/Logs/Graph/Status) (maybe use redis-mock) -> return err

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if err != nil {
panic(err)
}
sdkEnv.ApacheBeamSdk = pb.Sdk_SDK_PYTHON
sdkPythonEnv := *sdkJavaEnv
sdkPythonEnv.ApacheBeamSdk = pb.Sdk_SDK_PYTHON
sdkGoEnv := *sdkJavaEnv
sdkGoEnv.ApacheBeamSdk = pb.Sdk_SDK_GO
type args struct {
ctx context.Context
cacheService cache.Cache
Expand All @@ -962,35 +984,69 @@ func Test_runStep(t *testing.T) {
pipelineOptions string
pipelineLifeCycleCtx context.Context
cancelChannel chan bool
createExecFile bool
}
tests := []struct {
name string
args args
code string
}{
{
name: "Test run step working without an error",
name: "Test run step working on python sdk without an error",
args: args{
ctx: context.Background(),
cacheService: cacheService,
pipelineId: uuid.UUID{},
isUnitTest: false,
sdkEnv: sdkEnv,
sdkEnv: &sdkPythonEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
cancelChannel: make(chan bool, 1),
createExecFile: true,
},
code: "if __name__ == \"__main__\":\n print(\"Hello world!\")\n",
},
{
name: "Test run step working on go sdk",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments

args: args{
ctx: context.Background(),
cacheService: cacheService,
pipelineId: uuid.New(),
isUnitTest: true,
sdkEnv: &sdkGoEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
cancelChannel: make(chan bool, 1),
createExecFile: true,
},
code: "package main\nimport \"fmt\"\nfunc main() {\n fmt.Println(\"hello world\")\n}\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest extracting the code into variables with useful names

},
{
name: "Test run step without preparing files with code",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

success or fail?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fail, expected to get Status_STATUS_RUN_ERROR pipeline status

args: args{
ctx: context.Background(),
cacheService: cacheService,
pipelineId: uuid.UUID{},
isUnitTest: true,
sdkEnv: sdkJavaEnv,
pipelineOptions: "",
pipelineLifeCycleCtx: context.Background(),
cancelChannel: make(chan bool, 1),
createExecFile: false,
},
code: "class HelloWorld {\n public static void main(String[] args) {\n System.out.println(\"Hello world!\");\n }\n}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
lc, _ := fs_tool.NewLifeCycle(pb.Sdk_SDK_PYTHON, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
err := lc.CreateFolders()
if err != nil {
t.Fatalf("error during prepare folders: %s", err.Error())
lc, _ := fs_tool.NewLifeCycle(tt.args.sdkEnv.ApacheBeamSdk, tt.args.pipelineId, filepath.Join(os.Getenv("APP_WORK_DIR"), pipelinesFolder))
if tt.args.createExecFile {
err := lc.CreateFolders()
if err != nil {
t.Fatalf("error during prepare folders: %s", err.Error())
}
_ = lc.CreateSourceCodeFile(tt.code)
}
_ = lc.CreateSourceCodeFile(tt.code)
runStep(tt.args.ctx, tt.args.cacheService, &lc.Paths, tt.args.pipelineId, tt.args.isUnitTest, tt.args.sdkEnv, tt.args.pipelineOptions, tt.args.pipelineLifeCycleCtx, tt.args.cancelChannel)
})
}
Expand All @@ -1004,3 +1060,76 @@ func syncMapLen(syncMap *sync.Map) int {
})
return length
}

// TestGetGraph exercises GetGraph against three cache states:
// a key whose Graph sub-key holds a string, a key that was never written,
// and a key whose Graph sub-key holds a non-string value.
func TestGetGraph(t *testing.T) {
	ctx := context.Background()

	// Seed a pipeline whose graph is a valid string.
	pipelineId1 := uuid.New()
	graph := "GRAPH"
	// A setup failure must fail the test: returning here would silently
	// report the test as passed without running a single assertion.
	if err := cacheService.SetValue(ctx, pipelineId1, cache.Graph, graph); err != nil {
		t.Fatalf("error during set graph to cache: %s", err.Error())
	}

	// Seed a pipeline whose graph value is an int rather than a string.
	pipelineId2 := uuid.New()
	if err := cacheService.SetValue(ctx, pipelineId2, cache.Graph, 1); err != nil {
		t.Fatalf("error during set non-string graph to cache: %s", err.Error())
	}

	type args struct {
		ctx          context.Context
		cacheService cache.Cache
		key          uuid.UUID
		errorTitle   string
	}
	tests := []struct {
		name    string
		args    args
		want    string
		wantErr bool
	}{
		{
			name: "get graph when key exist in cache",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          pipelineId1,
				errorTitle:   "error",
			},
			want:    graph,
			wantErr: false,
		},
		{
			name: "get graph when key doesn't exist in cache",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          uuid.New(),
				errorTitle:   "error",
			},
			want:    "",
			wantErr: true,
		},
		{
			name: "get graph when value from cache by key couldn't be converted to a string",
			args: args{
				ctx:          context.Background(),
				cacheService: cacheService,
				key:          pipelineId2,
				errorTitle:   "error",
			},
			want:    "",
			wantErr: true,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := GetGraph(tt.args.ctx, tt.args.cacheService, tt.args.key, tt.args.errorTitle)
			if (err != nil) != tt.wantErr {
				t.Errorf("GetGraph() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			if got != tt.want {
				t.Errorf("GetGraph() got = %v, want %v", got, tt.want)
			}
		})
	}
}