Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feature:[loom] replace the usages of synchronized with ReentrantLock #6983

Open
wants to merge 20 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6992](https://github.com/apache/incubator-seata/pull/6992)] support grpc serializer
- [[#6973](https://github.com/apache/incubator-seata/pull/6973)] support saga annotation
- [[#6926](https://github.com/apache/incubator-seata/pull/6926)] support ssl communication for raft nodes
- [[#6983](https://github.com/apache/incubator-seata/pull/6983)] Support jdk21 virtual threads, and replace synchronized with ReentrantLock


### bugfix:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [[#6995](https://github.com/apache/incubator-seata/pull/6995)] 升级过时的 npmjs 依赖
- [[#6973](https://github.com/apache/incubator-seata/pull/6973)] 支持saga注解化
- [[#6926](https://github.com/apache/incubator-seata/pull/6926)] 支持Raft节点间的SSL通信
- [[#6983](https://github.com/apache/incubator-seata/pull/6983)] 支持jdk21虚拟线程,用ReentrantLock替换synchronized

### bugfix:
- [[#6899](https://github.com/apache/incubator-seata/pull/6899)] 修复file.conf打包后的读取
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.seata.common.Constants;
import org.apache.seata.common.executor.Initialize;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.common.util.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -235,8 +236,10 @@ private static <S> void doUnload(InnerEnhancedServiceLoader<S> serviceLoader, St
serviceLoader.nameToDefinitionsMap.remove(activateName.toLowerCase());
if (CollectionUtils.isNotEmpty(extensionDefinitions)) {
for (ExtensionDefinition<S> definition : extensionDefinitions) {
serviceLoader.definitionToInstanceMap.remove(definition);

InnerEnhancedServiceLoader.Holder<Object> holder = serviceLoader.definitionToInstanceMap.remove(definition);
if (holder != null) {
serviceLoader.holderLocks.remove(holder);
}
}
}
}
Expand Down Expand Up @@ -289,6 +292,8 @@ private static class InnerEnhancedServiceLoader<S> {
new ConcurrentHashMap<>();
private final ConcurrentMap<String, List<ExtensionDefinition<S>>> nameToDefinitionsMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Class<?>, ExtensionDefinition<S>> classToDefinitionMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Holder<Object>,ResourceLock> holderLocks = new ConcurrentHashMap<>();
private final ResourceLock resourceLock = new ResourceLock();

private InnerEnhancedServiceLoader(Class<S> type) {
this.type = type;
Expand Down Expand Up @@ -320,6 +325,9 @@ private static <S> InnerEnhancedServiceLoader<S> removeServiceLoader(Class<S> ty
}

private static void removeAllServiceLoader() {
SERVICE_LOADERS.values().forEach(loader -> {
loader.holderLocks.clear();
});
SERVICE_LOADERS.clear();
}

Expand Down Expand Up @@ -472,7 +480,8 @@ private S getExtensionInstance(ExtensionDefinition<S> definition, ClassLoader lo
key -> new Holder<>());
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
ResourceLock lock = CollectionUtils.computeIfAbsent(holderLocks, holder, key -> new ResourceLock());
try (ResourceLock ignored = lock.obtain()) {
instance = holder.get();
if (instance == null) {
instance = createNewExtension(definition, loader, argTypes, args);
Expand All @@ -499,7 +508,7 @@ private S createNewExtension(ExtensionDefinition<S> definition, ClassLoader load
private List<Class<S>> loadAllExtensionClass(ClassLoader loader, boolean includeCompatible) {
List<ExtensionDefinition<S>> definitions = definitionsHolder.get();
if (definitions == null) {
synchronized (definitionsHolder) {
try (ResourceLock ignored = resourceLock.obtain()) {
definitions = definitionsHolder.get();
if (definitions == null) {
definitions = findAllExtensionDefinition(loader, includeCompatible);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import java.util.concurrent.locks.ReentrantLock;

/**
* The ResourceLock extends ReentrantLock and implements AutoCloseable,
* allowing it to be used in try-with-resources blocks without needing
* to unlock in a finally block.
*
* <h3>Example</h3>
* <pre>
* {@code
* private final ResourceLock resourceLock = new ResourceLock();
* try (ResourceLock lock = resourceLock.obtain()) {
* // do something while holding the resource lock
* }
* }
* </pre>
*/
public class ResourceLock extends ReentrantLock implements AutoCloseable {

/**
* Obtain the lock.
*
* @return this ResourceLock
*/
public ResourceLock obtain() {
lock();
return this;
}


/**
* Unlock the resource lock.
*
* <p>This is typically used in try-with-resources blocks to automatically
* unlock the resource lock when the block is exited, regardless of whether
* an exception is thrown or not.
*/
@Override
public void close() {
this.unlock();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.apache.seata.common.util;

import org.apache.seata.common.lock.ResourceLock;

/**
* The type Uuid generator.
*/
public class UUIDGenerator {

private static volatile IdWorker idWorker;
private final static ResourceLock RESOURCE_LOCK = new ResourceLock();

/**
* generate UUID using snowflake algorithm
Expand All @@ -30,7 +33,7 @@ public class UUIDGenerator {
*/
public static long generateUUID() {
if (idWorker == null) {
synchronized (UUIDGenerator.class) {
try (ResourceLock ignored = RESOURCE_LOCK.obtain()) {
if (idWorker == null) {
init(null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.common.lock;

import org.apache.seata.common.util.CollectionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.ConcurrentHashMap;

import static org.junit.jupiter.api.Assertions.*;

@ExtendWith(MockitoExtension.class)
public class ResourceLockTest {

@Test
public void testObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// Test obtaining the lock
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}

// After try-with-resources, lock should be released
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after try-with-resources");
}

@Test
public void testMultipleObtainAndClose() {
ResourceLock resourceLock = new ResourceLock();

// First obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after first try-with-resources");

// Second obtain and release
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread");
}
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after second try-with-resources");
}

@Test
public void testResourceLockAutoRemovalFromMap() {
ConcurrentHashMap<String, ResourceLock> lockMap = new ConcurrentHashMap<>();
String key = "testKey";
// Use try-with-resources to obtain and release the lock
try (ResourceLock ignored = CollectionUtils.computeIfAbsent(lockMap, key, k -> new ResourceLock()).obtain()) {
// Do something while holding the lock
assertTrue(lockMap.containsKey(key));
assertTrue(lockMap.get(key).isHeldByCurrentThread());
} finally {
assertFalse(lockMap.get(key).isHeldByCurrentThread());
assertTrue(lockMap.containsKey(key));
// Remove the lock from the map
lockMap.remove(key);
assertFalse(lockMap.containsKey(key));
}
// Ensure the lock is removed from the map
assertFalse(lockMap.containsKey(key));
}

@Test
public void testConcurrentLocking() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(100); // Hold the lock for 100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

Thread t2 = new Thread(() -> {
assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should not be held by current thread before t1 releases it");
try (ResourceLock lock = resourceLock.obtain()) {
assertTrue(resourceLock.isHeldByCurrentThread(), "Lock should be held by current thread after t1 releases it");
}
});

t1.start();
t2.start();

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after both threads complete");
}

@Test
public void testLockInterruptibly() throws InterruptedException {
ResourceLock resourceLock = new ResourceLock();

Thread t1 = new Thread(() -> {
try (ResourceLock lock = resourceLock.obtain()) {
try {
Thread.sleep(1000); // Hold the lock for 1000ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

t1.start();
Thread.sleep(50); // Wait for t1 to acquire the lock

Thread t2 = new Thread(() -> {
try {
resourceLock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

t2.start();
Thread.sleep(50); // Wait for t2 to attempt to acquire the lock

t2.interrupt(); // Interrupt t2

t1.join();
t2.join();

assertFalse(resourceLock.isHeldByCurrentThread(), "Lock should be released after t1 completes");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.exception.FrameworkException;
import org.apache.seata.common.loader.EnhancedServiceLoader;
import org.apache.seata.common.lock.ResourceLock;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.common.thread.PositiveAtomicCounter;
import org.apache.seata.core.protocol.MessageFuture;
Expand Down Expand Up @@ -88,7 +91,8 @@ public abstract class AbstractNettyRemoting implements Disposable {
*/
protected volatile long nowMills = 0;
private static final int TIMEOUT_CHECK_INTERVAL = 3000;
protected final Object lock = new Object();
protected final ResourceLock resourceLock = new ResourceLock();
protected final Condition condition = resourceLock.newCondition();
/**
* The Is sending.
*/
Expand Down Expand Up @@ -119,7 +123,6 @@ public void run() {
}
}
}

nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -343,7 +346,7 @@ protected String getAddressFromChannel(Channel channel) {

private void channelWritableCheck(Channel channel, Object msg) {
int tryTimes = 0;
synchronized (lock) {
try (ResourceLock ignored = resourceLock.obtain()) {
while (!channel.isWritable()) {
try {
tryTimes++;
Expand All @@ -352,7 +355,7 @@ private void channelWritableCheck(Channel channel, Object msg) {
throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()),
FrameworkErrorCode.ChannelIsNotWritable);
}
lock.wait(NOT_WRITEABLE_CHECK_MILLS);
condition.await(NOT_WRITEABLE_CHECK_MILLS, TimeUnit.MILLISECONDS);
} catch (InterruptedException exx) {
LOGGER.error(exx.getMessage());
}
Expand Down
Loading
Loading