异步
前期理论知识
- 什么是异步任务
- 异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必须等到上一步执行完后才能执行,异步调用则无需等待上一步程序执行完即可执行
- 多线程就是一种实现异步调用的方式
- MQ也是一种宏观上的异步
- 使用场景
- 适用于处理
log、发送邮件、短信……等 - 涉及到网络IO调用等操作
- 适用于处理
- 使用方式
- 启动类里面使用
@EnableAsync注解开启功能,自动扫描 - 定义异步任务类并使用
@Component标记组件被容器扫描,异步方法加上@Async
- 启动类里面使用
- 注意:
@Async失效情况- 注解
@Async的方法不是public方法 - 注解
@Async的返回值只能为void或者Future - 注解
@Async方法使用static修饰也会失效 spring无法扫描到异步类,没加注解@Async或@EnableAsync注解- 调用方与被调方不能在同一个类
Spring在扫描bean的时候会扫描方法上是否包含@Async注解,动态地生成一个子类(即proxy代理类),当这个有注解的方法被调用的时候,实际上是由代理类来调用的,代理类在调用时增加异步作用- 如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个
bean,所以就失效了 - 所以调用方与被调方不能在同一个类,主要是使用了动态代理,同一个类的时候直接调用,不是通过生成的动态代理类调用
- 一般将要异步执行的方法单独抽取成一个类
- 类中需要使用
@Autowired或@Resource等注解自动注入,不能自己手动new对象 - 在
Async方法上标注@Transactional是没用的,但在Async方法调用的方法上标注@Transactional是有效的
- 注解
如果我们没有使用http线程池,那么将会发生下面的这些事情
-
- 服务端向前端socket连接管道写返回数据时 链接(pipe)却断开了
- 从应用角度分析,这是因为客户端等待返回超时了,主动断开了与服务端链接
- 连接数设置太小,并发量增加后,造成大量请求排队等待
- 网络延迟,是否有丢包
- 内存是否足够多支持对应的并发量
- 服务端向前端socket连接管道写返回数据时 链接(pipe)却断开了
- 问题分析
- resttemplate底层是怎样的?
- 基于之前的认知-池化思想,联想到是否使用了http连接池?
- 重新认识RestTemplate
- RestTemplate是Spring提供的用于访问Rest服务的客户端
- 底层通过使用java.net包下的实现创建HTTP 请求
- 通过使用ClientHttpRequestFactory指定不同的HTTP请求方式,主要提供了两种实现方式
- SimpleClientHttpRequestFactory(默认)
- 底层使用J2SE提供的方式,既java.net包提供的方式,创建底层的Http请求连接
- 主要createRequest 方法( 断点调试),每次都会创建一个新的连接,每次都创建连接会造成极大的资源浪费,而且若连接不能及时释放,会因为无法建立新的连接导致后面的请求阻塞
- HttpComponentsClientHttpRequestFactory
- 底层使用HttpClient访问远程的Http服务
- SimpleClientHttpRequestFactory(默认)
- 问题解决
- 客户端每次请求都要和服务端建立新的连接,即三次握手将会非常耗时
- 通过http连接池可以减少连接建立与释放的时间,提升http请求的性能
- Spring的restTemplate是对httpclient进行了封装, 而httpclient是支持池化机制
- 拓展
- 对httpclient进行封装的有:Apache的Fluent、es的restHighLevelClient、spring的restTemplate等
一、封装RestTemplateConfig
RestTemplateConfig
点击查看完整内容
package net.dbc.config;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
* @author DBC
* @version 1.0.0
* @date 2022年09月05日 20:10:10
* @website dbc655.top
*/
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory requestFactory){
return new RestTemplate(requestFactory);
}
@Bean
public ClientHttpRequestFactory httpRequestFactory(){
return new HttpComponentsClientHttpRequestFactory(httpClient());
}
@Bean
public HttpClient httpClient(){
Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", SSLConnectionSocketFactory.getSocketFactory())
.build();
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(registry);
//设置整个连接池最大连接数
connectionManager.setMaxTotal(500);
//MaxPerRoute路由是对maxTotal的细分,每个主机的并发,这里route指的是域名
connectionManager.setDefaultMaxPerRoute(200);
RequestConfig requestConfig = RequestConfig.custom()
//返回数据的超时时间
.setSocketTimeout(20000)
//连接上服务器的超时时间
.setConnectTimeout(10000)
//从连接池中获取连接的超时时间
.setConnectionRequestTimeout(1000)
.build();
return HttpClientBuilder.create()
.setDefaultRequestConfig(requestConfig)
.setConnectionManager(connectionManager)
.build();
}
}
二、一个简单的自定义线程池——ThreadPoolTaskConfig
- 请你说下 ThreadPoolTaskExecutor线程池 有哪几个重要参数,什么时候会创建线程
- 查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。
- 查看阻塞队列是否已满,不满就将任务存储在阻塞队列中,否则执行第三步。
- 查看线程池是否已满,即是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。
- 高并发下核心线程怎么设置?
- 分IO密集还是CPU密集
- CPU密集设置为跟核心数一样大小
- IO密集型设置为2倍CPU核心数
- 分IO密集还是CPU密集
ThreadPoolTaskConfig
点击查看完整内容
package net.dbc.config;
/**
* @author DBC
* @version 1.0.0
* @date 2022年09月11日 11:28:38
* @website dbc655.top
*/
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
//如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
executor.setCorePoolSize(16);
//executor.setAllowCoreThreadTimeOut();
//阻塞队列 当核心线程数达到最大时,新任务会放在队列中排队等待执行
executor.setQueueCapacity(124);
//最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
//任务队列已满时, 且当线程数=maxPoolSize,,线程池会拒绝处理任务而抛出异常
executor.setMaxPoolSize(64);
//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//允许线程空闲时间30秒,当maxPoolSize的线程在空闲时间到达的时候销毁
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
executor.setKeepAliveSeconds(30);
//spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
executor.setThreadNamePrefix("DBC-自定义线程池-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
// AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
// DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
// DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.initialize();
return executor;
}
}
三、完成一个发送短信的小例子
这里我们搞一个发送短信的操作来实战一下我们的全新的自定义线程+加异步+http线程
写入一些必要参数到配置文件
SmsConfig
package net.dbc.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author DBC
* @version 1.0.0
* @date 2022年09月05日 20:16:14
* @website dbc655.top
*/
@ConfigurationProperties(prefix = "sms")
@Configuration
@Data
public class SmsConfig {
private String templateId;
private String appCode;
}
SmsComponent
封装的简单短信发送
package net.dbc.component;
/**
* @author DBC
* @version 1.0.0
* @date 2022年09月05日 20:09:32
* @website dbc655.top
*/
import lombok.extern.slf4j.Slf4j;
import net.dbc.config.SmsConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.*;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
@Component
@Slf4j
public class SmsComponent {
/**
* 发送地址
*/
private static final String URL_TEMPLATE = "https://gyytz.market.alicloudapi.com/sms/smsSend?mobile=%s&templateId=%s¶m=%s&smsSignId=%s";
@Autowired
private RestTemplate restTemplate;
@Autowired
private SmsConfig smsConfig;
/**
* 发送短信验证码
* @param to
* @param templateId
* @param value
*/
@Async("threadPoolTaskExecutor")
public void send(String to,String templateId,String value){
String url = String.format(URL_TEMPLATE,to,templateId,value,"2e65b1bb3d054466b82f0c9d125465e2");
HttpHeaders headers = new HttpHeaders();
//最后在header中的格式(中间是英文空格)为Authorization:APPCODE 83359fd73fe94948385f570e3c139105
headers.set("Authorization","APPCODE "+smsConfig.getAppCode());
HttpEntity entity = new HttpEntity<>(headers);
ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class);
log.info("url={},body={}",url,response.getBody());
if(response.getStatusCode().is2xxSuccessful()){
log.info("发送短信验证码成功");
}else {
log.error("发送短信验证码失败:{}",response.getBody());
}
}
} AccountController
@RestController
@RequestMapping("/account")
@Slf4j
public class AccountController {
@Autowired
private SmsComponent smsComponent;
@Autowired
private SmsConfig smsConfig;
@GetMapping
public void test(){
String value = "**code**:%s,**minute**:5";
String url = String.format(value,"1997");
smsComponent.send("手机号", smsConfig.getTemplateId(), url);
}
} 点击查看完整内容
相信看起来也很简单,其他业务操作依然可以同理可得,主要是要学会这种池化的设计思想![aru_50]
本文作者为DBC,转载请注明。
