Skip to content

Commit

Permalink
Merge branch 'dev/4.3.x' into fix/optimize_error_message_when_cancell…
Browse files Browse the repository at this point in the history
…ing_mockdata_ticket
  • Loading branch information
zijiacj committed Jul 3, 2024
2 parents 612b51e + 78c6d63 commit 57c12f9
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
*/
package com.oceanbase.odc.metadb.notification;

/**
* @Author: Lebie
* @Date: 2023/3/20 21:36
* @Description: []
*/
/**
* @Author: Lebie
* @Date: 2023/3/20 21:36
* @Description: []
*/
import java.util.Date;

import javax.persistence.Column;
Expand All @@ -39,6 +29,11 @@

import lombok.Data;

/**
* @Author: Lebie
* @Date: 2023/3/20 21:36
* @Description: []
*/
@Data
@Entity
@Table(name = "notification_policy_channel_relation")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ private AuditEvent createAuditEvent(Method method, Object[] args) {
.type(auditEventMeta.getType())
.startTime(new Date())
.serverIpAddress(SystemUtils.getLocalIpAddress())
.clientIpAddress(WebRequestUtils.getClientAddress(servletRequest))
.clientIpAddress(
AuditUtils.getFirstIpFromRemoteAddress(
WebRequestUtils.getClientAddress(servletRequest)))
.organizationId(authenticationFacade.currentOrganizationId())
.userId(authenticationFacade.currentUserId())
.username(authenticationFacade.currentUsername())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,30 @@ public static AuditEventAction getActualActionForTask(AuditEventType type, Audit
return action;
}

/**
* <pre>
* Get the first ip of {@param remoteAddress}.
* The X-Forwarded-For header may contain multiple IP addresses, separated
* by commas, and typically, the first non-unknown IP is considered to be the client's IP address.
* </pre>
*
* @author keyang.lk
* @date 2024-07-02
* @param remoteAddress
* @return The first ip of remoteAddress
*/
public static String getFirstIpFromRemoteAddress(String remoteAddress) {
if (remoteAddress == null || remoteAddress.isEmpty() || "unknown".equalsIgnoreCase(remoteAddress)) {
return "N/A";
}
// 处理X-Forwarded-For可能包含多个IP地址的情况(由逗号分隔),通常第一个非unknown的IP是客户端的IP
String[] ips = remoteAddress.split(",");
for (String ip : ips) {
if (ip != null && !ip.isEmpty() &&
!"unknown".equalsIgnoreCase(ip.trim())) {
return ip.trim();
}
}
return remoteAddress;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ public class FlowInstanceService {
@Autowired
private EnvironmentRepository environmentRepository;
@Autowired
private DBResourcePermissionHelper dbResourcePermissionHelper;
@Autowired
private EnvironmentService environmentService;

private final List<Consumer<DataTransferTaskInitEvent>> dataTransferTaskInitHooks = new ArrayList<>();
Expand Down Expand Up @@ -790,8 +788,8 @@ private void checkCreateFlowInstancePermission(CreateFlowInstanceReq req) {
}
});
}
List<UnauthorizedDBResource> unauthorizedDBResources =
dbResourcePermissionHelper.filterUnauthorizedDBResources(resource2Types, false);
List<UnauthorizedDBResource> unauthorizedDBResources = this.permissionHelper
.filterUnauthorizedDBResources(resource2Types, false);
if (CollectionUtils.isNotEmpty(unauthorizedDBResources)) {
throw new BadRequestException(ErrorCodes.DatabaseAccessDenied,
new Object[] {unauthorizedDBResources.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.collections4.CollectionUtils;
import org.flowable.engine.delegate.DelegateExecution;
Expand Down Expand Up @@ -102,7 +103,7 @@ public abstract class BaseODCFlowTaskDelegate<T> extends BaseRuntimeFlowableDele

private void init(DelegateExecution execution) {
this.taskId = FlowTaskUtil.getTaskId(execution);
this.timeoutMillis = FlowTaskUtil.getExecutionExpirationIntervalMillis(execution);
this.timeoutMillis = getTimeoutMillis(execution);
this.taskService.updateExecutorInfo(taskId, new ExecutorInfo(hostProperties));
SecurityContextUtils.setCurrentUser(FlowTaskUtil.getTaskCreator(execution));
}
Expand All @@ -113,6 +114,7 @@ private void initMonitorExecutor() {
.build();
scheduleExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
int interval = RuntimeTaskConstants.DEFAULT_TASK_CHECK_INTERVAL_SECONDS;
AtomicBoolean isCancelled = new AtomicBoolean(false);
scheduleExecutor.scheduleAtFixedRate(() -> {
try {
updateHeartbeatTime();
Expand All @@ -124,6 +126,13 @@ private void initMonitorExecutor() {
}
try {
if (isCompleted() || isTimeout()) {
if (isTimeout() && !isCancelled.getAndSet(true)) {
try {
cancel(true);
} catch (Exception e) {
log.warn("Task is timeout, failed to cancel it, errorMessage={}", e.getMessage());
}
}
taskLatch.countDown();
}
} catch (Exception e) {
Expand Down Expand Up @@ -188,7 +197,7 @@ protected void run(DelegateExecution execution) throws Exception {
} catch (Exception e) {
log.warn("Task timeout callback method execution failed, taskId={}", taskId, e);
}
throw new InterruptedException();
throw new ServiceTaskCancelledException();
}
if (!isSuccessful()) {
// 监控线程出错导致闭锁失效,此种情况任务必须终止
Expand Down Expand Up @@ -312,6 +321,10 @@ protected void onTimeout(Long taskId, TaskService taskService) {
}
}

protected long getTimeoutMillis(DelegateExecution execution) {
return FlowTaskUtil.getExecutionExpirationIntervalMillis(execution);
}

/**
* This method is scheduled periodically to update the progress of the task
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ protected void onProgressUpdate(Long taskId, TaskService taskService) {
if (Objects.nonNull(asyncTaskThread)) {
double progress = asyncTaskThread.getProgressPercentage();
taskService.updateProgress(taskId, progress);
if (System.currentTimeMillis() - asyncTaskThread.getStartTimestamp() > getTimeoutMillis()) {
asyncTaskThread.stopTaskAndKillQuery(sessionManageFacade);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public boolean isCancelled() {
private MockTaskConfig getMockTaskConfig(Long taskId, DelegateExecution execution) {
OdcMockTaskConfig config = FlowTaskUtil.getMockParameter(execution);
this.connectionConfig = FlowTaskUtil.getConnectionConfig(execution);
return FlowTaskUtil.generateMockConfig(taskId, execution, getTimeoutMillis(),
config, mockProperties);
return FlowTaskUtil.generateMockConfig(taskId, execution, getTimeoutMillis(), config, mockProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
Expand All @@ -32,12 +33,15 @@
import org.springframework.stereotype.Component;

import com.oceanbase.odc.core.authority.util.SkipAuthorize;
import com.oceanbase.odc.core.shared.constant.DialectType;
import com.oceanbase.odc.core.shared.constant.ErrorCodes;
import com.oceanbase.odc.core.shared.constant.OrganizationType;
import com.oceanbase.odc.core.shared.constant.ResourceRoleName;
import com.oceanbase.odc.core.shared.constant.ResourceType;
import com.oceanbase.odc.core.shared.exception.AccessDeniedException;
import com.oceanbase.odc.core.shared.exception.BadRequestException;
import com.oceanbase.odc.metadb.connection.ConnectionConfigRepository;
import com.oceanbase.odc.metadb.connection.ConnectionEntity;
import com.oceanbase.odc.metadb.connection.DatabaseEntity;
import com.oceanbase.odc.metadb.connection.DatabaseRepository;
import com.oceanbase.odc.metadb.dbobject.DBObjectEntity;
Expand Down Expand Up @@ -83,6 +87,9 @@ public class DBResourcePermissionHelper {
@Autowired
private PermissionCheckWhitelist permissionCheckWhitelist;

@Autowired
private ConnectionConfigRepository connectionConfigRepository;

private static final Set<String> ORACLE_DATA_DICTIONARY = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);

private static final Set<String> MYSQL_DATA_DICTIONARY = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
Expand All @@ -106,13 +113,19 @@ public void checkDBPermissions(Collection<Long> databaseIds, Collection<Database
return;
}
List<DatabaseEntity> entities = databaseRepository.findByIdIn(databaseIds);
Set<Long> connectionIds = entities.stream()
.map(DatabaseEntity::getConnectionId)
.filter(Objects::nonNull).collect(Collectors.toSet());
Map<Long, ConnectionEntity> id2Entity = this.connectionConfigRepository.findByIdIn(connectionIds)
.stream().collect(Collectors.toMap(ConnectionEntity::getId, e -> e));
List<Long> toCheckDatabaseIds = new ArrayList<>();
Set<Long> projectIds = getPermittedProjectIds();
for (DatabaseEntity e : entities) {
if (e.getProjectId() == null) {
throw new AccessDeniedException("Database is not belong to any project");
}
if (permissionCheckWhitelist.containsDatabase(e.getName(), e.getDialectType())
DialectType dialectType = id2Entity.get(e.getConnectionId()).getDialectType();
if (permissionCheckWhitelist.containsDatabase(e.getName(), dialectType)
|| projectIds.contains(e.getProjectId())) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ public AsyncExecuteResultResp getMoreResults(@NotNull String sessionId, String r
Objects.isNull(timeoutSeconds) ? DEFAULT_GET_RESULT_TIMEOUT_SECONDS : timeoutSeconds;
boolean shouldRemoveContext = context.isFinished();
try {
List<JdbcGeneralResult> resultList = context.getMoreSqlExecutionResults(gettingResultTimeoutSeconds * 1000);
List<JdbcGeneralResult> resultList =
context.getMoreSqlExecutionResults(gettingResultTimeoutSeconds * 1000L);
List<SqlExecuteResult> results = resultList.stream().map(jdbcGeneralResult -> {
SqlExecuteResult result = generateResult(connectionSession, jdbcGeneralResult, context.getContextMap());
try (TraceStage stage = result.getSqlTuple().getSqlWatch().start(SqlExecuteStages.SQL_AFTER_CHECK)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.oceanbase.odc.service.audit.util;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collection;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class AuditUtilsTest {
@Parameter(0)
public String input;
@Parameter(1)
public String except;

@Parameters(name = "{index}: getFirstIpFromRemoteAddress({0})={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{"", "N/A"},
{null, "N/A"},
{"unknown", "N/A"},
{"UNKNOWN", "N/A"},
{"123", "123"},
{"192.168.1.1", "192.168.1.1"},
{",192.168.1.1", "192.168.1.1"},
{"192.168.1.1,122.122.1.1,127.0.0.1", "192.168.1.1"},
{"unknown,192.168.1.1,122.122.1.1,127.0.0.1", "192.168.1.1"}
});
}

@Test
public void getFirstIpFromRemoteAddress() {
assertEquals(except, AuditUtils.getFirstIpFromRemoteAddress(input));
}
}

0 comments on commit 57c12f9

Please sign in to comment.