Skip to content

Commit

Permalink
Fix #21977: Add Search transform to Go FhirIO (#21979)
Browse files Browse the repository at this point in the history
* parent 2574f91
author Lucas Nogueira <[email protected]> 1655155614 +0000
committer Lucas Nogueira <[email protected]> 1655882203 +0000

parent 2574f91
author Lucas Nogueira <[email protected]> 1655155614 +0000
committer Lucas Nogueira <[email protected]> 1655882058 +0000

squashed rebase conflict commits

add execute bundles transform with integration test

adjust import to follow convention

fix variable scope

adjust read unit tests

add comment to execute bundles transform

include error reason

make variable exported to fix integration test

adjust integration tests after merge

update license comment

add comment explaining the purpose of unexported transform function

remove unnecessary generic usage

remove coded mistakenly added

improve transaction vs batch bundle comment

use net/http constants instead of hardcoded values

adjust import spacing

improve error message

* add search transform

* extract content of lambda to function

* add integration tests

* improve naming and fix IDE warnings

* use more appropriate function to check for status

* add documentation and improve member variable name

* add unit tests

* use while loop style to shorten line

* run integration tests in same pipeline

* fix integration test on dataflow and make it cleaner

* add pagination test and improve test utils

* fix search request when resourceType is empty

* improve readability of pagination logic

* log upon nondetrimental errors

* add retry mechanism to prevent flaky test results

* remove backoff retry on read transform test since it should never be flaky

* add dummy content to resource to improve readability

* adjust spacing for consistency
  • Loading branch information
lnogueir authored Jul 1, 2022
1 parent 6e6e115 commit 32efddc
Show file tree
Hide file tree
Showing 10 changed files with 526 additions and 90 deletions.
86 changes: 77 additions & 9 deletions sdks/go/data/fhir_bundles/transaction-success.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,48 @@
{
"request": {
"method": "POST",
"url": "Patient"
"url": "Organization"
},
"resource": {
"name": [
"resourceType": "Organization",
"id": "b0e04623-b02c-3f8b-92ea-943fc4db60da",
"identifier": [
{
"use": "official",
"given": [
"John"
]
"system": "https://github.com/synthetichealth/synthea",
"value": "b0e04623-b02c-3f8b-92ea-943fc4db60da"
}
],
"gender": "male",
"birthDate": "1973-01-21",
"resourceType": "Patient"
"active": true,
"type": [
{
"coding": [
{
"system": "http://terminology.hl7.org/CodeSystem/organization-type",
"code": "prov",
"display": "Healthcare Provider"
}
],
"text": "Healthcare Provider"
}
],
"name": "LOWELL GENERAL HOSPITAL",
"telecom": [
{
"system": "phone",
"value": "9789376000"
}
],
"address": [
{
"line": [
"295 VARNUM AVENUE"
],
"city": "LOWELL",
"state": "MA",
"postalCode": "01854",
"country": "US"
}
]
}
},
{
Expand All @@ -30,6 +58,7 @@
"name": [
{
"use": "official",
"family": "Smith",
"given": [
"Alice"
]
Expand All @@ -39,6 +68,45 @@
"birthDate": "1970-01-01",
"resourceType": "Patient"
}
},
{
"request": {
"method": "POST",
"url": "Practitioner"
},
"resource": {
"resourceType": "Practitioner",
"name": [
{
"family": "Tillman293",
"given": [
"Franklin857"
],
"prefix": [
"Dr."
]
}
],
"telecom": [
{
"system": "email",
"value": "[email protected]",
"use": "work"
}
],
"address": [
{
"line": [
"295 VARNUM AVENUE"
],
"city": "LOWELL",
"state": "MA",
"postalCode": "01854",
"country": "US"
}
],
"gender": "male"
}
}
]
}
85 changes: 49 additions & 36 deletions sdks/go/pkg/beam/io/fhirio/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,35 @@ import (
"context"
"io"
"net/http"
"regexp"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"google.golang.org/api/googleapi"
"google.golang.org/api/healthcare/v1"
"google.golang.org/api/option"
)

const (
UserAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
baseMetricPrefix = "fhirio/"
UserAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
baseMetricPrefix = "fhirio/"
errorCounterName = baseMetricPrefix + "resource_error_count"
successCounterName = baseMetricPrefix + "resource_success_count"
pageTokenParameterKey = "_page_token"
)

func executeRequestAndRecordLatency(ctx context.Context, latencyMs *beam.Distribution, requestSupplier func() (*http.Response, error)) (*http.Response, error) {
func executeAndRecordLatency[T any](ctx context.Context, latencyMs *beam.Distribution, executionSupplier func() (T, error)) (T, error) {
timeBeforeReadRequest := time.Now()
response, err := requestSupplier()
result, err := executionSupplier()
latencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
return response, err
return result, err
}

func extractBodyFrom(response *http.Response) (string, error) {
if isBadStatusCode(response.Status) {
return "", errors.Errorf("response contains bad status: [%v]", response.Status)
err := googleapi.CheckResponse(response)
if err != nil {
return "", errors.Wrapf(err, "response contains bad status: [%v]", response.Status)
}

bodyBytes, err := io.ReadAll(response.Body)
Expand All @@ -58,38 +62,39 @@ func extractBodyFrom(response *http.Response) (string, error) {
return string(bodyBytes), nil
}

func isBadStatusCode(status string) bool {
// 2XXs are successes, otherwise failure.
isMatch, err := regexp.MatchString("^2\\d{2}", status)
if err != nil {
return true
}
return !isMatch
type fhirStoreClient interface {
readResource(resourcePath string) (*http.Response, error)
executeBundle(storePath string, bundle []byte) (*http.Response, error)
search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
}

type fhirioFnCommon struct {
client fhirStoreClient
resourcesErrorCount beam.Counter
resourcesSuccessCount beam.Counter
latencyMs beam.Distribution
type fhirStoreClientImpl struct {
fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
}

func (fnc *fhirioFnCommon) setup(namespace string) {
if fnc.client == nil {
fnc.client = newFhirStoreClient()
}
fnc.resourcesErrorCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_error_count")
fnc.resourcesSuccessCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_success_count")
fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
return c.fhirService.Read(resourcePath).Do()
}

type fhirStoreClient interface {
readResource(resourcePath string) (*http.Response, error)
executeBundle(storePath string, bundle []byte) (*http.Response, error)
func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
}

type fhirStoreClientImpl struct {
fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
queryParams := make([]googleapi.CallOption, 0)
for key, value := range queries {
queryParams = append(queryParams, googleapi.QueryParameter(key, value))
}

if pageToken != "" {
queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
}

searchRequest := &healthcare.SearchResourcesRequest{}
if resourceType == "" {
return c.fhirService.Search(storePath, searchRequest).Do(queryParams...)
}
return c.fhirService.SearchType(storePath, resourceType, searchRequest).Do(queryParams...)
}

func newFhirStoreClient() *fhirStoreClientImpl {
Expand All @@ -100,10 +105,18 @@ func newFhirStoreClient() *fhirStoreClientImpl {
return &fhirStoreClientImpl{fhirService: healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(healthcareService)}
}

func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
return c.fhirService.Read(resourcePath).Do()
type fnCommonVariables struct {
client fhirStoreClient
resourcesErrorCount beam.Counter
resourcesSuccessCount beam.Counter
latencyMs beam.Distribution
}

func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
func (fnc *fnCommonVariables) setup(namespace string) {
if fnc.client == nil {
fnc.client = newFhirStoreClient()
}
fnc.resourcesErrorCount = beam.NewCounter(namespace, errorCounterName)
fnc.resourcesSuccessCount = beam.NewCounter(namespace, successCounterName)
fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
}
20 changes: 15 additions & 5 deletions sdks/go/pkg/beam/io/fhirio/execute_bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"encoding/json"
"net/http"
"regexp"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
Expand All @@ -41,7 +42,7 @@ func init() {
}

type executeBundleFn struct {
fhirioFnCommon
fnCommonVariables
successesCount beam.Counter
// Path to FHIR store where bundle requests will be executed on.
FhirStorePath string
Expand All @@ -52,12 +53,12 @@ func (fn executeBundleFn) String() string {
}

func (fn *executeBundleFn) Setup() {
fn.fhirioFnCommon.setup(fn.String())
fn.fnCommonVariables.setup(fn.String())
fn.successesCount = beam.NewCounter(fn.String(), baseMetricPrefix+"success_count")
}

func (fn *executeBundleFn) ProcessElement(ctx context.Context, inputBundleBody []byte, emitSuccess, emitFailure func(string)) {
response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
response, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
return fn.client.executeBundle(fn.FhirStorePath, inputBundleBody)
})
if err != nil {
Expand Down Expand Up @@ -114,7 +115,7 @@ func (fn *executeBundleFn) processResponseBody(ctx context.Context, body string,
continue
}

if isBadStatusCode(entryFields.Response.Status) {
if batchResponseStatusIsBad(entryFields.Response.Status) {
fn.resourcesErrorCount.Inc(ctx, 1)
emitFailure(errors.Errorf("execute bundles entry contains bad status: [%v]", entryFields.Response.Status).Error())
} else {
Expand All @@ -127,6 +128,15 @@ func (fn *executeBundleFn) processResponseBody(ctx context.Context, body string,
fn.successesCount.Inc(ctx, 1)
}

func batchResponseStatusIsBad(status string) bool {
// 2XXs are successes, otherwise failure.
isMatch, err := regexp.MatchString("^2\\d{2}", status)
if err != nil {
return true
}
return !isMatch
}

// ExecuteBundles performs all the requests in the specified bundles on a given
// FHIR store. This transform takes a path to a FHIR store and a PCollection of
// bundles as JSON-encoded strings. It executes the requests defined on the
Expand All @@ -142,5 +152,5 @@ func ExecuteBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection

// This is useful as an entry point for testing because we can provide a fake FHIR store client.
func executeBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
return beam.ParDo2(s, &executeBundleFn{fhirioFnCommon: fhirioFnCommon{client: client}, FhirStorePath: fhirStorePath}, bundles)
return beam.ParDo2(s, &executeBundleFn{fnCommonVariables: fnCommonVariables{client: client}, FhirStorePath: fhirStorePath}, bundles)
}
24 changes: 8 additions & 16 deletions sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
package fhirio

import (
"bytes"
"io"
"net/http"
"strconv"
"strings"
"testing"

Expand All @@ -39,26 +40,17 @@ func TestExecuteBundles(t *testing.T) {
{
name: "Execute Bundles request returns bad status",
client: badStatusFakeClient,
containedError: fakeBadStatus,
containedError: strconv.Itoa(http.StatusForbidden),
},
{
name: "Execute Bundles request response body fails to be read",
client: bodyReaderErrorFakeClient,
containedError: fakeBodyReaderErrorMessage,
},
{
name: "Execute Bundles request response body failed to be decoded",
client: &fakeFhirStoreClient{
fakeExecuteBundles: func(storePath string, bundle []byte) (*http.Response, error) {
return &http.Response{
Body: &fakeReaderCloser{
fakeRead: func(t []byte) (int, error) {
return bytes.NewReader([]byte("")).Read(t)
},
}, Status: "200 Ok"}, nil
},
},
containedError: "EOF",
name: "Execute Bundles request response body failed to be decoded",
client: emptyResponseBodyFakeClient,
containedError: io.EOF.Error(),
},
}

Expand All @@ -73,9 +65,9 @@ func TestExecuteBundles(t *testing.T) {
return strings.Contains(errorMsg, testCase.containedError)
})
pipelineResult := ptest.RunAndValidate(t, p)
err := validateResourceErrorCounter(pipelineResult, len(testBundles))
err := validateCounter(pipelineResult, errorCounterName, len(testBundles))
if err != nil {
t.Fatalf("validateResourceErrorCounter returned error [%v]", err.Error())
t.Fatalf("validateCounter returned error [%v]", err.Error())
}
})
}
Expand Down
8 changes: 4 additions & 4 deletions sdks/go/pkg/beam/io/fhirio/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ func init() {
}

type readResourceFn struct {
fhirioFnCommon
fnCommonVariables
}

func (fn readResourceFn) String() string {
return "readResourceFn"
}

func (fn *readResourceFn) Setup() {
fn.fhirioFnCommon.setup(fn.String())
fn.fnCommonVariables.setup(fn.String())
}

func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath string, emitResource, emitDeadLetter func(string)) {
response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
response, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
return fn.client.readResource(resourcePath)
})
if err != nil {
Expand Down Expand Up @@ -79,5 +79,5 @@ func Read(s beam.Scope, resourcePaths beam.PCollection) (beam.PCollection, beam.

// This is useful as an entry point for testing because we can provide a fake FHIR store client.
func read(s beam.Scope, resourcePaths beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
return beam.ParDo2(s, &readResourceFn{fhirioFnCommon: fhirioFnCommon{client: client}}, resourcePaths)
return beam.ParDo2(s, &readResourceFn{fnCommonVariables: fnCommonVariables{client: client}}, resourcePaths)
}
Loading

0 comments on commit 32efddc

Please sign in to comment.