+ * Get the first ip of {@param remoteAddress}.
+ * The X-Forwarded-For header may contain multiple IP addresses, separated
+ * by commas, and typically, the first non-unknown IP is considered to be the client's IP address.
+ *
+ *
+ * @author keyang.lk
+ * @date 2024-07-02
+ * @param remoteAddress
+ * @return The first ip of remoteAddress
+ */
+ public static String getFirstIpFromRemoteAddress(String remoteAddress) {
+ if (remoteAddress == null || remoteAddress.isEmpty() || "unknown".equalsIgnoreCase(remoteAddress)) {
+ return "N/A";
+ }
+ // 处理X-Forwarded-For可能包含多个IP地址的情况(由逗号分隔),通常第一个非unknown的IP是客户端的IP
+ String[] ips = remoteAddress.split(",");
+ for (String ip : ips) {
+ if (ip != null && !ip.isEmpty() &&
+ !"unknown".equalsIgnoreCase(ip.trim())) {
+ return ip.trim();
+ }
+ }
+ return remoteAddress;
+ }
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
index f0acc1ee47..e1e2c3f03a 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/FlowInstanceService.java
@@ -263,8 +263,6 @@ public class FlowInstanceService {
@Autowired
private EnvironmentRepository environmentRepository;
@Autowired
- private DBResourcePermissionHelper dbResourcePermissionHelper;
- @Autowired
private EnvironmentService environmentService;
private final List> dataTransferTaskInitHooks = new ArrayList<>();
@@ -790,8 +788,8 @@ private void checkCreateFlowInstancePermission(CreateFlowInstanceReq req) {
}
});
}
- List unauthorizedDBResources =
- dbResourcePermissionHelper.filterUnauthorizedDBResources(resource2Types, false);
+ List unauthorizedDBResources = this.permissionHelper
+ .filterUnauthorizedDBResources(resource2Types, false);
if (CollectionUtils.isNotEmpty(unauthorizedDBResources)) {
throw new BadRequestException(ErrorCodes.DatabaseAccessDenied,
new Object[] {unauthorizedDBResources.stream()
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
index d10318e9ae..b8779086a5 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java
@@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections4.CollectionUtils;
import org.flowable.engine.delegate.DelegateExecution;
@@ -102,7 +103,7 @@ public abstract class BaseODCFlowTaskDelegate extends BaseRuntimeFlowableDele
private void init(DelegateExecution execution) {
this.taskId = FlowTaskUtil.getTaskId(execution);
- this.timeoutMillis = FlowTaskUtil.getExecutionExpirationIntervalMillis(execution);
+ this.timeoutMillis = getTimeoutMillis(execution);
this.taskService.updateExecutorInfo(taskId, new ExecutorInfo(hostProperties));
SecurityContextUtils.setCurrentUser(FlowTaskUtil.getTaskCreator(execution));
}
@@ -113,6 +114,7 @@ private void initMonitorExecutor() {
.build();
scheduleExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
int interval = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS;
+ AtomicBoolean isCancelled = new AtomicBoolean(false);
scheduleExecutor.scheduleAtFixedRate(() -> {
try {
updateHeartbeatTime();
@@ -124,6 +126,13 @@ private void initMonitorExecutor() {
}
try {
if (isCompleted() || isTimeout()) {
+ if (isTimeout() && !isCancelled.getAndSet(true)) {
+ try {
+ cancel(true);
+ } catch (Exception e) {
+ log.warn("Task is timeout, failed to cancel it, errorMessage={}", e.getMessage());
+ }
+ }
taskLatch.countDown();
}
} catch (Exception e) {
@@ -188,7 +197,7 @@ protected void run(DelegateExecution execution) throws Exception {
} catch (Exception e) {
log.warn("Task timeout callback method execution failed, taskId={}", taskId, e);
}
- throw new InterruptedException();
+ throw new ServiceTaskCancelledException();
}
if (!isSuccessful()) {
// 监控线程出错导致闭锁失效,此种情况任务必须终止
@@ -312,6 +321,10 @@ protected void onTimeout(Long taskId, TaskService taskService) {
}
}
+ protected long getTimeoutMillis(DelegateExecution execution) {
+ return FlowTaskUtil.getExecutionExpirationIntervalMillis(execution);
+ }
+
/**
* This method is scheduled periodically to update the progress of the task
*/
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/DatabaseChangeRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/DatabaseChangeRuntimeFlowableTask.java
index df804f412d..652a4ad26b 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/DatabaseChangeRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/DatabaseChangeRuntimeFlowableTask.java
@@ -183,9 +183,6 @@ protected void onProgressUpdate(Long taskId, TaskService taskService) {
if (Objects.nonNull(asyncTaskThread)) {
double progress = asyncTaskThread.getProgressPercentage();
taskService.updateProgress(taskId, progress);
- if (System.currentTimeMillis() - asyncTaskThread.getStartTimestamp() > getTimeoutMillis()) {
- asyncTaskThread.stopTaskAndKillQuery(sessionManageFacade);
- }
}
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MockDataRuntimeFlowableTask.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MockDataRuntimeFlowableTask.java
index 8ddec0a41f..5ec5ac4b2f 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MockDataRuntimeFlowableTask.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/MockDataRuntimeFlowableTask.java
@@ -98,8 +98,7 @@ public boolean isCancelled() {
private MockTaskConfig getMockTaskConfig(Long taskId, DelegateExecution execution) {
OdcMockTaskConfig config = FlowTaskUtil.getMockParameter(execution);
this.connectionConfig = FlowTaskUtil.getConnectionConfig(execution);
- return FlowTaskUtil.generateMockConfig(taskId, execution, getTimeoutMillis(),
- config, mockProperties);
+ return FlowTaskUtil.generateMockConfig(taskId, execution, getTimeoutMillis(), config, mockProperties);
}
@Override
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/DBResourcePermissionHelper.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/DBResourcePermissionHelper.java
index 21aab9aec4..674cbce4f4 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/DBResourcePermissionHelper.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/permission/DBResourcePermissionHelper.java
@@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -32,12 +33,15 @@
import org.springframework.stereotype.Component;
import com.oceanbase.odc.core.authority.util.SkipAuthorize;
+import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.core.shared.constant.ErrorCodes;
import com.oceanbase.odc.core.shared.constant.OrganizationType;
import com.oceanbase.odc.core.shared.constant.ResourceRoleName;
import com.oceanbase.odc.core.shared.constant.ResourceType;
import com.oceanbase.odc.core.shared.exception.AccessDeniedException;
import com.oceanbase.odc.core.shared.exception.BadRequestException;
+import com.oceanbase.odc.metadb.connection.ConnectionConfigRepository;
+import com.oceanbase.odc.metadb.connection.ConnectionEntity;
import com.oceanbase.odc.metadb.connection.DatabaseEntity;
import com.oceanbase.odc.metadb.connection.DatabaseRepository;
import com.oceanbase.odc.metadb.dbobject.DBObjectEntity;
@@ -83,6 +87,9 @@ public class DBResourcePermissionHelper {
@Autowired
private PermissionCheckWhitelist permissionCheckWhitelist;
+ @Autowired
+ private ConnectionConfigRepository connectionConfigRepository;
+
private static final Set ORACLE_DATA_DICTIONARY = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
private static final Set MYSQL_DATA_DICTIONARY = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
@@ -106,13 +113,19 @@ public void checkDBPermissions(Collection databaseIds, Collection entities = databaseRepository.findByIdIn(databaseIds);
+ Set connectionIds = entities.stream()
+ .map(DatabaseEntity::getConnectionId)
+ .filter(Objects::nonNull).collect(Collectors.toSet());
+ Map id2Entity = this.connectionConfigRepository.findByIdIn(connectionIds)
+ .stream().collect(Collectors.toMap(ConnectionEntity::getId, e -> e));
List toCheckDatabaseIds = new ArrayList<>();
Set projectIds = getPermittedProjectIds();
for (DatabaseEntity e : entities) {
if (e.getProjectId() == null) {
throw new AccessDeniedException("Database is not belong to any project");
}
- if (permissionCheckWhitelist.containsDatabase(e.getName(), e.getDialectType())
+ DialectType dialectType = id2Entity.get(e.getConnectionId()).getDialectType();
+ if (permissionCheckWhitelist.containsDatabase(e.getName(), dialectType)
|| projectIds.contains(e.getProjectId())) {
continue;
}
diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java
index c6e3ff673e..6ed7f89265 100644
--- a/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java
+++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/session/ConnectConsoleService.java
@@ -340,7 +340,8 @@ public AsyncExecuteResultResp getMoreResults(@NotNull String sessionId, String r
Objects.isNull(timeoutSeconds) ? DEFAULT_GET_RESULT_TIMEOUT_SECONDS : timeoutSeconds;
boolean shouldRemoveContext = context.isFinished();
try {
- List resultList = context.getMoreSqlExecutionResults(gettingResultTimeoutSeconds * 1000);
+ List resultList =
+ context.getMoreSqlExecutionResults(gettingResultTimeoutSeconds * 1000L);
List results = resultList.stream().map(jdbcGeneralResult -> {
SqlExecuteResult result = generateResult(connectionSession, jdbcGeneralResult, context.getContextMap());
try (TraceStage stage = result.getSqlTuple().getSqlWatch().start(SqlExecuteStages.SQL_AFTER_CHECK)) {
diff --git a/server/odc-service/src/test/java/com/oceanbase/odc/service/audit/util/AuditUtilsTest.java b/server/odc-service/src/test/java/com/oceanbase/odc/service/audit/util/AuditUtilsTest.java
new file mode 100644
index 0000000000..c84624de99
--- /dev/null
+++ b/server/odc-service/src/test/java/com/oceanbase/odc/service/audit/util/AuditUtilsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2023 OceanBase.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.oceanbase.odc.service.audit.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class AuditUtilsTest {
+ @Parameter(0)
+ public String input;
+ @Parameter(1)
+ public String except;
+
+ @Parameters(name = "{index}: getFirstIpFromRemoteAddress({0})={1}")
+ public static Collection