Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Improve PeriodicBatchEntityTrigger Workflows performance #18664

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ private WorkflowHandler(OpenMetadataApplicationConfig config) {
ProcessEngineConfiguration processEngineConfiguration =
new StandaloneProcessEngineConfiguration()
.setAsyncExecutorActivate(true)
.setAsyncExecutorCorePoolSize(50)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should these be configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they should, but we can iterate on that after having a decent default for production load.
I was thinking about having a Workflows Setting kind of thing within the OpenMetadata settings.
What do you think?

.setAsyncExecutorMaxPoolSize(100)
.setAsyncExecutorThreadPoolQueueSize(1000)
.setAsyncExecutorMaxAsyncJobsDuePerAcquisition(20)
.setJdbcUrl(config.getDataSourceFactory().getUrl())
.setJdbcUsername(config.getDataSourceFactory().getUser())
.setJdbcPassword(config.getDataSourceFactory().getPassword())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.flowable.bpmn.model.FlowableListener;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.StartEvent;
import org.openmetadata.service.governance.workflows.MainWorkflowHasFinishedListener;
import org.openmetadata.service.governance.workflows.MainWorkflowTerminationListener;
import org.openmetadata.service.governance.workflows.WorkflowInstanceExecutionIdSetterListener;
import org.openmetadata.service.governance.workflows.WorkflowInstanceStageListener;
Expand Down Expand Up @@ -52,15 +51,6 @@ default void attachWorkflowInstanceExecutionIdSetterListener(StartEvent startEve
startEvent.getExecutionListeners().add(listener);
}

default void attachMainWorkflowHasFinishedListener(EndEvent endEvent) {
FlowableListener listener =
new FlowableListenerBuilder()
.event("end")
.implementation(MainWorkflowHasFinishedListener.class.getName())
.build();
endEvent.getExecutionListeners().add(listener);
}

default void attachMainWorkflowTerminationListener(EndEvent endEvent) {
FlowableListener listener =
new FlowableListenerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ default void attachWorkflowInstanceListeners(Process process) {
private List<FlowableListener> getWorkflowInstanceListeners() {
List<FlowableListener> listeners = new ArrayList<>();

List<String> events = List.of("start");
List<String> events = List.of("start", "end");
for (String event : events) {
FlowableListener listener =
new FlowableListenerBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,21 @@ private void setStatus(
String originalJson = JsonUtils.pojoToJson(entity);

Optional<String> oCertification = Optional.ofNullable(certification);
Optional<AssetCertification> oEntityCertification =
Optional.ofNullable(entity.getCertification());

if (oCertification.isEmpty() && oEntityCertification.isEmpty()) {
return;
}

if (oCertification.isEmpty()) {
entity.setCertification(null);
} else {

if (oCertification.get().equals(oEntityCertification.get().getTagLabel().getTagFQN())) {
return;
}

AssetCertification assetCertification =
new AssetCertification()
.withTagLabel(EntityUtil.toTagLabel(TagLabelUtil.getTag(certification)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ public class EndEvent implements NodeInterface {
public EndEvent(EndEventDefinition nodeDefinition) {
this.endEvent = new EndEventBuilder().id(nodeDefinition.getName()).build();
attachWorkflowInstanceStageListeners(endEvent);
attachMainWorkflowHasFinishedListener(endEvent);
}

public void addToWorkflow(BpmnModel model, Process process) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.openmetadata.service.governance.workflows.elements.triggers;

import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import lombok.Getter;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.CallActivity;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.IOParameter;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
Expand All @@ -18,7 +21,8 @@
import org.openmetadata.schema.governance.workflows.elements.triggers.Event;
import org.openmetadata.schema.governance.workflows.elements.triggers.EventBasedEntityTriggerDefinition;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerEntityWorkflowImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FilterEntityImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
Expand All @@ -32,6 +36,8 @@ public class EventBasedEntityTrigger implements TriggerInterface {
private final List<StartEvent> startEvents = new ArrayList<>();
private final List<Signal> signals = new ArrayList<>();

public static String PASSES_FILTER_VARIABLE = "passesFilter";

public EventBasedEntityTrigger(
String mainWorkflowName,
String triggerWorkflowId,
Expand All @@ -43,20 +49,34 @@ public EventBasedEntityTrigger(

setStartEvents(triggerWorkflowId, triggerDefinition);

ServiceTask triggerWorkflow =
getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition);
triggerWorkflow.setAsynchronous(true);
process.addFlowElement(triggerWorkflow);
ServiceTask filterTask = getFilterTask(triggerWorkflowId, triggerDefinition);
process.addFlowElement(filterTask);

CallActivity workflowTrigger = getWorkflowTrigger(triggerWorkflowId, mainWorkflowName);
process.addFlowElement(workflowTrigger);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build();
process.addFlowElement(endEvent);

// Start Events -> FilterTask
for (StartEvent startEvent : startEvents) {
process.addFlowElement(startEvent);
process.addFlowElement(new SequenceFlow(startEvent.getId(), triggerWorkflow.getId()));
process.addFlowElement(new SequenceFlow(startEvent.getId(), filterTask.getId()));
}
process.addFlowElement(new SequenceFlow(triggerWorkflow.getId(), endEvent.getId()));

SequenceFlow filterPassed = new SequenceFlow(filterTask.getId(), workflowTrigger.getId());
filterPassed.setConditionExpression(String.format("${%s}", PASSES_FILTER_VARIABLE));

SequenceFlow filterNotPassed = new SequenceFlow(filterTask.getId(), endEvent.getId());
filterNotPassed.setConditionExpression(String.format("${!%s}", PASSES_FILTER_VARIABLE));

// FilterTask -> WorkflowTrigger (if passes filter)
process.addFlowElement(filterPassed);
// FilterTask -> End (if not passes filter)
process.addFlowElement(filterNotPassed);
// WorkflowTrigger -> End
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), endEvent.getId()));

this.process = process;
this.triggerWorkflowId = triggerWorkflowId;
Expand Down Expand Up @@ -94,16 +114,25 @@ private void setStartEvents(
}
}

private ServiceTask getWorkflowTriggerTask(
String workflowTriggerId,
String mainWorkflowName,
EventBasedEntityTriggerDefinition triggerDefinition) {
FieldExtension workflowNameExpr =
new FieldExtensionBuilder()
.fieldName("workflowNameExpr")
.fieldValue(mainWorkflowName)
private CallActivity getWorkflowTrigger(String triggerWorkflowId, String mainWorkflowName) {
CallActivity workflowTrigger =
new CallActivityBuilder()
.id(getFlowableElementId(triggerWorkflowId, "workflowTrigger"))
.calledElement(mainWorkflowName)
.inheritBusinessKey(true)
.build();

IOParameter inputParameter = new IOParameter();
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);

workflowTrigger.setInParameters(List.of(inputParameter));

return workflowTrigger;
}

private ServiceTask getFilterTask(
String workflowTriggerId, EventBasedEntityTriggerDefinition triggerDefinition) {
FieldExtension excludedFilterExpr =
new FieldExtensionBuilder()
.fieldName("excludedFilterExpr")
Expand All @@ -112,10 +141,9 @@ private ServiceTask getWorkflowTriggerTask(

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "workflowTrigger"))
.implementation(TriggerEntityWorkflowImpl.class.getName())
.id(getFlowableElementId(workflowTriggerId, "filterTask"))
.implementation(FilterEntityImpl.class.getName())
.build();
serviceTask.getFieldExtensions().add(workflowNameExpr);
serviceTask.getFieldExtensions().add(excludedFilterExpr);

return serviceTask;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package org.openmetadata.service.governance.workflows.elements.triggers;

import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE;
import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId;

import java.util.List;
import lombok.Getter;
import org.flowable.bpmn.model.BpmnModel;
import org.flowable.bpmn.model.CallActivity;
import org.flowable.bpmn.model.EndEvent;
import org.flowable.bpmn.model.FieldExtension;
import org.flowable.bpmn.model.IOParameter;
import org.flowable.bpmn.model.MultiInstanceLoopCharacteristics;
import org.flowable.bpmn.model.Process;
import org.flowable.bpmn.model.SequenceFlow;
import org.flowable.bpmn.model.ServiceTask;
Expand All @@ -15,9 +20,11 @@
import org.openmetadata.schema.governance.workflows.elements.nodes.trigger.PeriodicBatchEntityTriggerDefinition;
import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.governance.workflows.elements.TriggerInterface;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.TriggerBatchEntityWorkflowImpl;
import org.openmetadata.service.governance.workflows.elements.triggers.impl.FetchEntitiesImpl;
import org.openmetadata.service.governance.workflows.flowable.builders.CallActivityBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.MultiInstanceLoopCharacteristicsBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder;
import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder;
import org.quartz.CronTrigger;
Expand All @@ -26,6 +33,9 @@ public class PeriodicBatchEntityTrigger implements TriggerInterface {
private final Process process;

@Getter private final String triggerWorkflowId;
public static String HAS_FINISHED_VARIABLE = "hasFinished";
public static String CARDINALITY_VARIABLE = "numberOfEntities";
public static String COLLECTION_VARIABLE = "entityList";

public PeriodicBatchEntityTrigger(
String mainWorkflowName,
Expand All @@ -44,16 +54,31 @@ public PeriodicBatchEntityTrigger(
startEvent.addEventDefinition(timerDefinition);
process.addFlowElement(startEvent);

ServiceTask workflowTriggerTask =
getWorkflowTriggerTask(triggerWorkflowId, mainWorkflowName, triggerDefinition);
process.addFlowElement(workflowTriggerTask);
ServiceTask fetchEntitiesTask = getFetchEntitiesTask(triggerWorkflowId, triggerDefinition);
process.addFlowElement(fetchEntitiesTask);

CallActivity workflowTrigger =
getWorkflowTriggerCallActivity(triggerWorkflowId, mainWorkflowName);
process.addFlowElement(workflowTrigger);

EndEvent endEvent =
new EndEventBuilder().id(getFlowableElementId(triggerWorkflowId, "endEvent")).build();
process.addFlowElement(endEvent);

process.addFlowElement(new SequenceFlow(startEvent.getId(), workflowTriggerTask.getId()));
process.addFlowElement(new SequenceFlow(workflowTriggerTask.getId(), endEvent.getId()));
SequenceFlow finished = new SequenceFlow(fetchEntitiesTask.getId(), endEvent.getId());
finished.setConditionExpression(String.format("${%s}", HAS_FINISHED_VARIABLE));

SequenceFlow notFinished = new SequenceFlow(fetchEntitiesTask.getId(), workflowTrigger.getId());
notFinished.setConditionExpression(String.format("${!%s}", HAS_FINISHED_VARIABLE));

// Start -> Fetch Entities
process.addFlowElement(new SequenceFlow(startEvent.getId(), fetchEntitiesTask.getId()));
// Fetch Entities -> End
process.addFlowElement(finished);
// Fetch Entities -> WorkflowTrigger
process.addFlowElement(notFinished);
// WorkflowTrigger -> Fetch Entities (Loop Back to get next batch)
process.addFlowElement(new SequenceFlow(workflowTrigger.getId(), fetchEntitiesTask.getId()));

this.process = process;
this.triggerWorkflowId = triggerWorkflowId;
Expand All @@ -69,10 +94,34 @@ private TimerEventDefinition getTimerEventDefinition(AppSchedule schedule) {
return timerDefinition;
}

private ServiceTask getWorkflowTriggerTask(
String workflowTriggerId,
String mainWorkflowName,
PeriodicBatchEntityTriggerDefinition triggerDefinition) {
private CallActivity getWorkflowTriggerCallActivity(
String triggerWorkflowId, String mainWorkflowName) {
MultiInstanceLoopCharacteristics multiInstance =
new MultiInstanceLoopCharacteristicsBuilder()
.loopCardinality(String.format("${%s}", CARDINALITY_VARIABLE))
.inputDataItem(COLLECTION_VARIABLE)
.elementVariable(RELATED_ENTITY_VARIABLE)
.build();

CallActivity workflowTrigger =
new CallActivityBuilder()
.id(getFlowableElementId(triggerWorkflowId, "workflowTrigger"))
.calledElement(mainWorkflowName)
.inheritBusinessKey(true)
.build();

IOParameter inputParameter = new IOParameter();
inputParameter.setSource(RELATED_ENTITY_VARIABLE);
inputParameter.setTarget(RELATED_ENTITY_VARIABLE);

workflowTrigger.setInParameters(List.of(inputParameter));
workflowTrigger.setLoopCharacteristics(multiInstance);

return workflowTrigger;
}

private ServiceTask getFetchEntitiesTask(
String workflowTriggerId, PeriodicBatchEntityTriggerDefinition triggerDefinition) {
FieldExtension entityTypeExpr =
new FieldExtensionBuilder()
.fieldName("entityTypeExpr")
Expand All @@ -85,12 +134,6 @@ private ServiceTask getWorkflowTriggerTask(
.fieldValue(triggerDefinition.getConfig().getFilters())
.build();

FieldExtension workflowNameExpr =
new FieldExtensionBuilder()
.fieldName("workflowNameExpr")
.fieldValue(mainWorkflowName)
.build();

FieldExtension batchSizeExpr =
new FieldExtensionBuilder()
.fieldName("batchSizeExpr")
Expand All @@ -99,13 +142,12 @@ private ServiceTask getWorkflowTriggerTask(

ServiceTask serviceTask =
new ServiceTaskBuilder()
.id(getFlowableElementId(workflowTriggerId, "workflowTrigger"))
.implementation(TriggerBatchEntityWorkflowImpl.class.getName())
.id(getFlowableElementId(workflowTriggerId, "fetchEntityTask"))
.implementation(FetchEntitiesImpl.class.getName())
.build();

serviceTask.getFieldExtensions().add(entityTypeExpr);
serviceTask.getFieldExtensions().add(searchFilterExpr);
serviceTask.getFieldExtensions().add(workflowNameExpr);
serviceTask.getFieldExtensions().add(batchSizeExpr);

return serviceTask;
Expand Down
Loading
Loading