Skip to content

Commit

Permalink
Merge #3235
Browse files Browse the repository at this point in the history
3235: feat(broker): get workflow outcome r=deepthidevaki a=deepthidevaki

## Description

* Clients can create a workflow instance via a new grpc request `CreateWorkflowInstanceWithResult` and get the response when the workflow is completed.

## Related issues

closes #2896

#

Co-authored-by: Deepthi Devaki Akkoorath <[email protected]>
  • Loading branch information
zeebe-bors[bot] and deepthidevaki authored Oct 22, 2019
2 parents 9262158 + 06dfd7a commit 91928cd
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 20 deletions.
39 changes: 39 additions & 0 deletions docs/src/reference/grpc.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CancelWorkflowInstance RPC](#cancelworkflowinstance-rpc)
* [CompleteJob RPC](#completejob-rpc)
* [CreateWorkflowInstance RPC](#createworkflowinstance-rpc)
* [CreateWorkflowInstanceWithResult RPC](#createworkflowinstance-rpc)
* [DeployWorkflow RPC](#deployworkflow-rpc)
* [FailJob RPC](#failjob-rpc)
* [PublishMessage RPC](#publishmessage-rpc)
Expand Down Expand Up @@ -249,6 +250,44 @@ message CreateWorkflowInstanceResponse {
}
```

### CreateWorkflowInstanceWithResult RPC

Similar to `CreateWorkflowInstance RPC` , creates and starts an instance of the specified workflow.
Unlike `CreateWorkflowInstance RPC`, the response is returned when the workflow is completed.

Note that only workflows with none start events can be started through this command.

#### Input: CreateWorkflowInstanceWithResultRequest

```protobuf
message CreateWorkflowInstanceRequest {
CreateWorkflowInstanceRequest request = 1;
// timeout in milliseconds. the request will be closed if the workflow is not completed before
// the requestTimeout.
// if requestTimeout = 0, uses the generic requestTimeout configured in the gateway.
int64 requestTimeout = 2;
}
```

#### Output: CreateWorkflowInstanceWithResultResponse

```protobuf
message CreateWorkflowInstanceResponse {
// the key of the workflow definition which was used to create the workflow instance
int64 workflowKey = 1;
// the BPMN process ID of the workflow definition which was used to create the workflow
// instance
string bpmnProcessId = 2;
// the version of the workflow definition which was used to create the workflow instance
int32 version = 3;
// the unique identifier of the created workflow instance; to be used wherever a request
// needs a workflow instance key (e.g. CancelWorkflowInstanceRequest)
int64 workflowInstanceKey = 4;
// consisting of all visible variables to the root scope
string variables = 5;
}
```

#### Errors

##### GRPC_STATUS_NOT_FOUND
Expand Down
30 changes: 30 additions & 0 deletions gateway-protocol/src/main/proto/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,30 @@ message CreateWorkflowInstanceResponse {
int64 workflowInstanceKey = 4;
}

message CreateWorkflowInstanceWithResultRequest {
CreateWorkflowInstanceRequest request = 1;
// timeout in milliseconds. the request will be closed if the workflow is not completed
// before the requestTimeout.
// if requestTimeout = 0, uses the generic requestTimeout configured in the gateway.
int64 requestTimeout = 2;
}

message CreateWorkflowInstanceWithResultResponse {
// the key of the workflow definition which was used to create the workflow instance
int64 workflowKey = 1;
// the BPMN process ID of the workflow definition which was used to create the workflow
// instance
string bpmnProcessId = 2;
// the version of the workflow definition which was used to create the workflow instance
int32 version = 3;
// the unique identifier of the created workflow instance; to be used wherever a request
// needs a workflow instance key (e.g. CancelWorkflowInstanceRequest)
int64 workflowInstanceKey = 4;
// consisting of all visible variables to the root scope
string variables = 5;

}

message DeployWorkflowRequest {
// List of workflow resources to deploy
repeated WorkflowRequestObject workflows = 1;
Expand Down Expand Up @@ -329,6 +353,12 @@ service Gateway {
rpc CreateWorkflowInstance (CreateWorkflowInstanceRequest) returns (CreateWorkflowInstanceResponse) {
}

/*
Behaves similarly to `rpc CreateWorkflowInstance`, except that a successful response is received when the workflow completes successfully.
*/
rpc CreateWorkflowInstanceWithResult (CreateWorkflowInstanceWithResultRequest) returns (CreateWorkflowInstanceWithResultResponse) {
}

/*
Deploys one or more workflows to Zeebe. Note that this is an atomic call,
i.e. either all workflows are deployed, or none of them are.
Expand Down
50 changes: 50 additions & 0 deletions gateway-protocol/src/main/proto/proto.lock
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,51 @@
}
]
},
{
"name": "CreateWorkflowInstanceWithResultRequest",
"fields": [
{
"id": 1,
"name": "request",
"type": "CreateWorkflowInstanceRequest"
},
{
"id": 2,
"name": "requestTimeout",
"type": "int64"
}
]
},
{
"name": "CreateWorkflowInstanceWithResultResponse",
"fields": [
{
"id": 1,
"name": "workflowKey",
"type": "int64"
},
{
"id": 2,
"name": "bpmnProcessId",
"type": "string"
},
{
"id": 3,
"name": "version",
"type": "int32"
},
{
"id": 4,
"name": "workflowInstanceKey",
"type": "int64"
},
{
"id": 5,
"name": "variables",
"type": "string"
}
]
},
{
"name": "DeployWorkflowRequest",
"fields": [
Expand Down Expand Up @@ -510,6 +555,11 @@
"in_type": "CreateWorkflowInstanceRequest",
"out_type": "CreateWorkflowInstanceResponse"
},
{
"name": "CreateWorkflowInstanceWithResult",
"in_type": "CreateWorkflowInstanceWithResultRequest",
"out_type": "CreateWorkflowInstanceWithResultResponse"
},
{
"name": "DeployWorkflow",
"in_type": "DeployWorkflowRequest",
Expand Down
83 changes: 72 additions & 11 deletions gateway/src/main/java/io/zeebe/gateway/EndpointManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceWithResultRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceWithResultResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.DeployWorkflowRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.DeployWorkflowResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
Expand All @@ -51,6 +53,7 @@
import io.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.zeebe.msgpack.MsgpackPropertyException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -141,6 +144,26 @@ public void createWorkflowInstance(
responseObserver);
}

@Override
public void createWorkflowInstanceWithResult(
CreateWorkflowInstanceWithResultRequest request,
StreamObserver<CreateWorkflowInstanceWithResultResponse> responseObserver) {
if (request.getRequestTimeout() > 0) {
sendRequest(
request,
RequestMapper::toCreateWorkflowInstanceWithResultRequest,
ResponseMapper::toCreateWorkflowInstanceWithResultResponse,
responseObserver,
Duration.ofMillis(request.getRequestTimeout()));
} else {
sendRequest(
request,
RequestMapper::toCreateWorkflowInstanceWithResultRequest,
ResponseMapper::toCreateWorkflowInstanceWithResultResponse,
responseObserver);
}
}

@Override
public void deployWorkflow(
final DeployWorkflowRequest request,
Expand Down Expand Up @@ -246,27 +269,65 @@ private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
final StreamObserver<GrpcResponseT> streamObserver) {

final BrokerRequest<BrokerResponseT> brokerRequest =
mapRequest(grpcRequest, requestMapper, streamObserver);
if (brokerRequest == null) {
return;
}

brokerClient.sendRequest(
brokerRequest,
(key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
error -> streamObserver.onError(convertThrowable(error)));
}

private <GrpcRequestT, BrokerResponseT, GrpcResponseT> void sendRequest(
final GrpcRequestT grpcRequest,
final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
final BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
final StreamObserver<GrpcResponseT> streamObserver,
Duration timeout) {

final BrokerRequest<BrokerResponseT> brokerRequest =
mapRequest(grpcRequest, requestMapper, streamObserver);
if (brokerRequest == null) {
return;
}

brokerClient.sendRequest(
brokerRequest,
(key, response) -> consumeResponse(responseMapper, streamObserver, key, response),
error -> streamObserver.onError(convertThrowable(error)),
timeout);
}

private <BrokerResponseT, GrpcResponseT> void consumeResponse(
BrokerResponseMapper<BrokerResponseT, GrpcResponseT> responseMapper,
StreamObserver<GrpcResponseT> streamObserver,
long key,
BrokerResponseT response) {
final GrpcResponseT grpcResponse = responseMapper.apply(key, response);
streamObserver.onNext(grpcResponse);
streamObserver.onCompleted();
}

private <GrpcRequestT, BrokerResponseT, GrpcResponseT> BrokerRequest<BrokerResponseT> mapRequest(
final GrpcRequestT grpcRequest,
final Function<GrpcRequestT, BrokerRequest<BrokerResponseT>> requestMapper,
final StreamObserver<GrpcResponseT> streamObserver) {
final BrokerRequest<BrokerResponseT> brokerRequest;
try {
brokerRequest = requestMapper.apply(grpcRequest);
} catch (MsgpackPropertyException e) {
streamObserver.onError(
convertThrowable(
new GrpcStatusExceptionImpl(e.getMessage(), Status.INVALID_ARGUMENT, e)));
return;
return null;
} catch (Exception e) {
streamObserver.onError(convertThrowable(e));
return;
return null;
}

brokerClient.sendRequest(
brokerRequest,
(key, response) -> {
final GrpcResponseT grpcResponse = responseMapper.apply(key, response);
streamObserver.onNext(grpcResponse);
streamObserver.onCompleted();
},
error -> streamObserver.onError(convertThrowable(error)));
return brokerRequest;
}

public static StatusRuntimeException convertThrowable(Throwable cause) {
Expand Down
18 changes: 18 additions & 0 deletions gateway/src/main/java/io/zeebe/gateway/RequestMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.zeebe.gateway.impl.broker.request.BrokerCancelWorkflowInstanceRequest;
import io.zeebe.gateway.impl.broker.request.BrokerCompleteJobRequest;
import io.zeebe.gateway.impl.broker.request.BrokerCreateWorkflowInstanceRequest;
import io.zeebe.gateway.impl.broker.request.BrokerCreateWorkflowInstanceWithResultRequest;
import io.zeebe.gateway.impl.broker.request.BrokerDeployWorkflowRequest;
import io.zeebe.gateway.impl.broker.request.BrokerFailJobRequest;
import io.zeebe.gateway.impl.broker.request.BrokerPublishMessageRequest;
Expand All @@ -21,6 +22,7 @@
import io.zeebe.gateway.protocol.GatewayOuterClass.CancelWorkflowInstanceRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceWithResultRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.DeployWorkflowRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
import io.zeebe.gateway.protocol.GatewayOuterClass.PublishMessageRequest;
Expand Down Expand Up @@ -89,6 +91,22 @@ public static BrokerCreateWorkflowInstanceRequest toCreateWorkflowInstanceReques
return brokerRequest;
}

public static BrokerCreateWorkflowInstanceWithResultRequest
toCreateWorkflowInstanceWithResultRequest(
CreateWorkflowInstanceWithResultRequest grpcRequest) {
final BrokerCreateWorkflowInstanceWithResultRequest brokerRequest =
new BrokerCreateWorkflowInstanceWithResultRequest();

final CreateWorkflowInstanceRequest request = grpcRequest.getRequest();
brokerRequest
.setBpmnProcessId(request.getBpmnProcessId())
.setKey(request.getWorkflowKey())
.setVersion(request.getVersion())
.setVariables(ensureJsonSet(request.getVariables()));

return brokerRequest;
}

public static BrokerCancelWorkflowInstanceRequest toCancelWorkflowInstanceRequest(
CancelWorkflowInstanceRequest grpcRequest) {
final BrokerCancelWorkflowInstanceRequest brokerRequest =
Expand Down
13 changes: 13 additions & 0 deletions gateway/src/main/java/io/zeebe/gateway/ResponseMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.zeebe.gateway.protocol.GatewayOuterClass.CancelWorkflowInstanceResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.CompleteJobResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.CreateWorkflowInstanceWithResultResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.DeployWorkflowResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.zeebe.gateway.protocol.GatewayOuterClass.PublishMessageResponse;
Expand All @@ -30,6 +31,7 @@
import io.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceCreationRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceRecord;
import io.zeebe.protocol.impl.record.value.workflowinstance.WorkflowInstanceResultRecord;
import java.util.Iterator;
import org.agrona.DirectBuffer;

Expand Down Expand Up @@ -81,6 +83,17 @@ public static CreateWorkflowInstanceResponse toCreateWorkflowInstanceResponse(
.build();
}

public static CreateWorkflowInstanceWithResultResponse toCreateWorkflowInstanceWithResultResponse(
long key, WorkflowInstanceResultRecord brokerResponse) {
return CreateWorkflowInstanceWithResultResponse.newBuilder()
.setWorkflowKey(brokerResponse.getWorkflowKey())
.setBpmnProcessId(bufferAsString(brokerResponse.getBpmnProcessIdBuffer()))
.setVersion(brokerResponse.getVersion())
.setWorkflowInstanceKey(brokerResponse.getWorkflowInstanceKey())
.setVariables(bufferAsJson(brokerResponse.getVariablesBuffer()))
.build();
}

public static CancelWorkflowInstanceResponse toCancelWorkflowInstanceResponse(
long key, WorkflowInstanceRecord brokerResponse) {
return CancelWorkflowInstanceResponse.getDefaultInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.zeebe.util.sched.future.ActorFuture;
import java.time.Duration;
import java.util.function.Consumer;

public interface BrokerClient extends AutoCloseable {
Expand All @@ -24,6 +25,14 @@ <T> void sendRequest(
BrokerResponseConsumer<T> responseConsumer,
Consumer<Throwable> throwableConsumer);

<T> ActorFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> request, Duration requestTimeout);

<T> void sendRequest(
BrokerRequest<T> request,
BrokerResponseConsumer<T> responseConsumer,
Consumer<Throwable> throwableConsumer,
Duration requestTimeout);

BrokerTopologyManager getTopologyManager();

void subscribeJobAvailableNotification(String topic, Consumer<String> handler);
Expand Down
Loading

0 comments on commit 91928cd

Please sign in to comment.