多线程执行任务功能

wxpay
LI-CCONG\李聪聪 8 months ago
parent 9a2e8291fe
commit 97f7dbec88

@ -0,0 +1,52 @@
package cc.yunxi.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
//@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig {
private final int corePoolSize = 10;
private final int maxPoolSize = 20;
private final int queueCapacity = 100;
private final String namePrefix = "custom-async-";
private final int keepAliveSeconds = 30;
@Bean("asyncExecutor") // 线程池Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(corePoolSize);
// 最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 任务队列大小
executor.setQueueCapacity(queueCapacity);
// 线程池名称前缀
executor.setThreadNamePrefix(namePrefix);
// 允许线程的空闲时间
executor.setKeepAliveSeconds(keepAliveSeconds);
/**
*
* CallerRunsPolicy()线 main 线
* AbortPolicy()
* DiscardPolicy()
* DiscardOldestPolicy()
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
log.info("创建一个线程池 corePoolSize is [" + corePoolSize + "] maxPoolSize is [" + maxPoolSize + "] queueCapacity is [" + queueCapacity +
"] keepAliveSeconds is [" + keepAliveSeconds + "] namePrefix is [" + namePrefix + "].");
// 线程初始化
executor.initialize();
return executor;
}
}

@ -4,13 +4,18 @@ import cc.yunxi.common.domain.CommonResult;
import cc.yunxi.common.exception.BadRequestException;
import cc.yunxi.domain.query.TestQuery;
import cc.yunxi.enums.UserTypeEnum;
import cc.yunxi.service.ITestService;
import cn.hutool.log.Log;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.ExecutionException;
@Api(tags = "测试接口")
@RestController
@RequestMapping("/test")
@ -18,6 +23,8 @@ import org.springframework.web.bind.annotation.*;
@Slf4j
public class TestController {
private final ITestService testService;
@ApiOperation("测试接口成功")
@GetMapping("/test01")
public CommonResult<String> success() {
@ -48,4 +55,20 @@ public class TestController {
return CommonResult.success(testQuery);
}
@ApiOperation("测试异步调用")
@PostMapping("/test05")
public CommonResult<String> asyncCall() {
// testService.syncTest();
// testService.asyncTest();
// testService.asyncTestWithCustomExecutor();
ListenableFuture<String> future = testService.asyncTestResultWithCustomExecutor();
try {
String result = future.get();// 同步阻塞,获取线程执行结果
log.info("线程池处理成功,返回结果: {}", result);
} catch (Exception e) {
e.printStackTrace();
}
return CommonResult.success("数据处理完成");
}
}

@ -2,9 +2,25 @@ package cc.yunxi.service;
import cc.yunxi.domain.po.Test;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.util.concurrent.ListenableFuture;
/**
*
*/
public interface ITestService extends IService<Test> {
// 同步方法
void syncTest();
// 异步方法
void asyncTest();
// 异步方法,使用自定义线程池
void asyncTestWithCustomExecutor();
// 异步方法,同步结果返回
ListenableFuture<String> asyncTestResultWithCustomExecutor();
}

@ -4,11 +4,60 @@ import cc.yunxi.domain.po.Test;
import cc.yunxi.mapper.TestMapper;
import cc.yunxi.service.ITestService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
/**
*
*/
@Service
@Slf4j
public class TestServiceImpl extends ServiceImpl<TestMapper, Test> implements ITestService {
@Override
public void syncTest() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("等待main执行");
}
@Override
@Async
public void asyncTest() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("不等待main执行, 由异步线程先执行");
}
@Override
@Async("asyncExecutor") // 需要指定自定义线程池的名称
public void asyncTestWithCustomExecutor() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("当前异步线程: " + Thread.currentThread().getName());
}
@Override
@Async("asyncExecutor")
public ListenableFuture<String> asyncTestResultWithCustomExecutor() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String result = Thread.currentThread().getName(); // 模拟数据查询结果
return new AsyncResult<>(result);
}
}

@ -52,6 +52,8 @@ public class NxhsApplicationTest {
// String property = System.getProperty("user.dir");
// System.out.println(property);
System.out.println(UserTypeEnum.CLIENT.toString());
Class<? extends UserTypeEnum> aClass = UserTypeEnum.CLIENT.getClass();
System.out.println(aClass.getEnumConstants());
}
}

Loading…
Cancel
Save