Skip to the content.

一、基础篇概述

认识微服务

随着互联网行业的发展,对服务的要求也越来越高,服务架构也从单体架构逐渐演变为现在流行的微服务架构。

单体架构

单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。

单体架构的优缺点如下

优点:

缺点:

分布式架构

分布式架构:根据业务功能对系统做拆分,每个业务功能模块作为独立项目开发,称为一个服务。

分布式架构的优缺点

优点:

缺点:

分布式架构虽然降低了服务耦合,但是服务拆分时也有很多问题:

需要制定一套行之有效的标准来约束分布式架构。

微服务

微服务架构:一种良好的分布式架构方案

微服务的架构特征

微服务的上述特性其实是在给分布式架构制定一个标准,进一步降低服务之间的耦合度,提供服务的独立性和灵活性。做到高内聚,低耦合。

因此,可以认为微服务是一种经过良好架构设计的分布式架构方案

微服务技术对比

- Dubbo SpringCloud SpringCloudAlibaba
注册中心 zookeeper、redis Eureka、Consul Nacos、Eureka
服务远程调用 dubbo 协议 Feign(HTTP 协议) Dubbo、Feign
配置中心 SpringCloudConfig SpringCloudConfig、Nacos
服务网关 SpringCloud Gateway、Zuul SpringCloud Gateway、Zuul
服务监控和保护 dubbo-admin,功能弱 Hysitx Sentinel

Spring Cloud

Spring Cloud 是目前国内使用最广泛的微服务框架。官网地址:https://spring.io/projects/spring-cloud。

Spring Cloud 集成了各种微服务功能组件,并基于 Spring Boot 实现了这些组件的自动装配,从而提供了良好的开箱即用体验。

其中常见的组件包括:

另外,Spring Cloud 底层是依赖于 Spring Boot 的,并且有版本的兼容关系,如下:

我学习的版本是 Hoxton.SR10,因此对应的 Spring Boot 版本是 2.3.x 版本。

总结

服务拆分和远程调用

任何分布式架构都离不开服务的拆分,微服务也是一样。

服务拆分原则

微服务拆分时的几个原则

服务拆分示例

以微服务 cloud-demo 为例,其结构如下:

cloud-demo:父工程,管理依赖

要求:

导入 SQL 语句

首先,将 cloud-order.sqlcloud-user.sql 导入到 mysql 中:

cloud-user 表中初始数据如下:

cloud-order 表中初始数据如下:

cloud-order 表中持有 cloud-user 表中的 id 字段。

导入demo工程

用 IDEA 导入课前资料提供的 Demo:

项目结构如下:

导入后,会在 IDEA 右下角出现弹窗:

点击弹窗,然后按下图选择:

会出现这样的菜单:

配置下项目使用的 JDK

实现远程调用案例

在 order-service 服务中,有一个根据 id 查询订单的接口:

@RestController
@RequestMapping("order")
public class OrderController {

   @Autowired
   private OrderService orderService;

    @GetMapping("{orderId}")
    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
        // 根据id查询订单并返回
        return orderService.queryOrderById(orderId);
    }
}

根据 id 查询订单,返回值是 Order 对象,其中的 user 为 null

{
    "id": 101,
    "price": 699900,
    "name": "Apple 苹果 iPhone 12",
    "num": 1,
    "userId": 1,
    "user": null
}

在 user-service 中有一个根据 id 查询用户的接口

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    /**
     * 路径: /user/110
     * @param id 用户id
     * @return 用户
     */
    @GetMapping("/{id}")
    public User queryById(@PathVariable("id") Long id, @RequestHeader(value = "Truth", required = false) String truth) {
        System.out.println(truth);
        return userService.queryById(id);
    }
}

查询的结果如下

{
    "id": 1,
    "username": "xxx",
    "address": "xxxxxx"
}

案例需求

修改 order-service 中的根据 id 查询订单业务,要求在查询订单的同时,根据订单中包含的 userId 查询出用户信息,一起返回。

因此,我们需要在 order-service 中向 user-service 发起一个 http 的请求,调用 http://localhost:8081/user/{userId} 这个接口。

大概的步骤是这样的:

注册RestTemplate

首先,我们在 order-service 服务中的 OrderApplication 启动类中,注册 RestTemplate 实例

package cn.itcast.order;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@MapperScan("cn.itcast.order.mapper")
@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

实现远程调用

修改 order-service 服务中的 cn.itcast.order.service 包下的 OrderService 类中的 queryOrderById 方法:

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RestTemplate restTemplate;

    public Order queryOrderById(Long orderId) {
        Order order = orderMapper.findById(orderId);
        String url = "http://localhost:8081/user/" + order.getUserId();
        // 发起调用
        User user = restTemplate.getForObject(url, User.class);
        order.setUser(user);
        return order;
    }
}

RestTemplate进阶用法

参考博客 Java RestTemplate post请求传递参数_postforentity和postforobject区别_chnjg的博客-CSDN博客

用法 说明
postForObject 不可以设置 Header,返回值只包含 Body 信息。
postForEntity 可以设置 Header,不仅包含 Body 信息,包含了响应的所有信息。
exchange 与 postForEntity 类似,而且可以调用 get、put 等请求。

提供者与消费者

在服务调用关系中,会有两个不同的角色

服务提供者:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务)

服务消费者:一次业务中,调用其它微服务的服务。(调用其它微服务提供的接口)

graph LR
服务消费者,&nbsporder-service===>服务提供者,&nbspuser-service

但是,服务提供者与服务消费者的角色并不是绝对的,而是相对于业务而言。

如果服务 A 调用了服务 B,而服务 B 又调用了服务 C,服务 B 的角色是什么?

因此,服务 B 既可以是服务提供者,也可以是服务消费者。

Eureka注册中心

假如我们的服务提供者 user-service 部署了多个实例

思考:

Eureka的结构和作用

这些问题都需要利用 Spring Cloud 中的注册中心来解决,其中最广为人知的注册中心就是 Eureka,其结构如下:

order-service 如何得知 user-service 实例地址?

获取地址信息的流程如下:

order-service 如何从多个 user-service 实例中选择具体的实例?

order-service 如何得知某个 user-service 实例是否依然健康,是不是已经宕机?

注意:一个微服务,既可以是服务提供者,又可以是服务消费者,因此 eureka 将服务注册、服务发现等功能统一封装到了eureka-client 端

搭建eureka-server

首先大家注册中心服务端:eureka-server,这必须是一个独立的微服务

创建eureka-server服务

cloud-demo 父工程下,创建一个子模块:

填写模块信息:

然后填写服务信息:

引入eureka依赖

引入 Spring Cloudeureka 提供的 starter 依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

编写启动类

eureka-server 服务编写一个启动类,一定要添加一个 @EnableEurekaServer 注解,开启 eureka 的注册中心功能:

package cn.itcast.eureka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }
}

编写配置文件

编写一个 application.yml 文件,内容如下:

server:
  port: 10086
spring:
  application:
    name: eureka-server
eureka: # 配置 eureka 的地址信息。
  client:
    service-url: 
      defaultZone: http://127.0.0.1:10086/eureka

为什么 eureka 要自己配置自己?因为 eureka 也是一个微服务,所以会把自己也注册进去。为了以后 eureka 集群通信准备的。

启动服务

启动微服务,然后在浏览器访问:http://127.0.0.1:10086,看到下面结果应该是成功了:

服务注册

下面,我们将 user-service 注册到 eureka-server 中去。

引入依赖

user-servicepom 文件中,引入下面的 eureka-client 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

配置文件

user-service 中,修改 application.yml 文件,添加服务名称、eureka 地址

spring:
  application:
    name: userservice
eureka: # 配置 eureka 地址,将 userservice 注册到 eureka 中
  client:
    service-url:
      defaultZone: http://127.0.0.1:10086/eureka

启动多个user-service实例

为了演示一个服务有多个实例的场景,我们添加一个 SpringBoot 的启动配置,再启动一个 user-service

首先,复制原来的 user-service 启动配置:

然后,在弹出的窗口中,填写信息:

现在,Spring Boot 窗口会出现两个 user-service 启动配置:

不过,第一个是 8081 端口,第二个是 8082 端口。

启动两个 user-service 实例:

查看 eureka-server 管理页面:

服务发现

下面,我们将 order-service 的逻辑修改:向 eureka-server 拉取 user-service 的信息,实现服务发现。

引入依赖

之前说过,服务发现、服务注册统一都封装在 eureka-client 依赖,因此这一步与服务注册时一致。

order-servicepom 文件中,引入下面的 eureka-client 依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

配置文件

服务发现也需要知道 eureka 地址,因此第二步与服务注册一致,都是配置 eureka 信息:

在 order-service 中,修改 application.yml 文件,添加服务名称、eureka地址:

spring:
  application:
    name: orderservice
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10086/eureka

服务拉取和负载均衡

最后,我们要去 eureka-server 中拉取 user-service 服务的实例列表,并且实现负载均衡。添加注解 @LoadBalanced 即可实现负载均衡。

在 order-service 的 OrderApplication 中,给 RestTemplate 这个 Bean 添加一个 @LoadBalanced 注解:

@MapperScan("cn.itcast.order.mapper")
@SpringBootApplication
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

修改 order-service 服务中的 cn.itcast.order.service 包下的 OrderService 类中的 queryOrderById 方法。修改访问的 url 路径,用服务名代替 ip、端口

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RestTemplate restTemplate;

    public Order queryOrderById(Long orderId) {
        Order order = orderMapper.findById(orderId);
        
        // 用服务名替代具体的 ip
        String url = "http://userservice/user/" + order.getUserId();
        // 发起调用
        User user = restTemplate.getForObject(url, User.class);
        order.setUser(user);
        return order;
    }
}

spring 会自动帮助我们从 eureka-server 端,根据 userservice 这个服务名称,获取实例列表,而后完成负载均衡。而这个服务拉取,不是每次请求服务都要进行的,服务消费者会拉取到服务列表后会进行缓存,后面的一段时间可以直接用缓存的服务列表信息。然后每隔 30 秒更新一次服务列表信息

Ribbon负载均衡

添加了 @LoadBalanced 注解,即可实现负载均衡功能,这是什么原理呢?

负载均衡原理

Spring Cloud 底层其实是利用了一个名为 Ribbon 的组件,来实现负载均衡功能的。

我们发出的请求明明是 http://userservice/user/1,怎么变成了 http://localhost:8081 的呢?实际上是做了一个映射,将 userservice 映射到了其他地址。

源码跟踪

为什么我们只输入了 service 名称就可以访问了呢?之前还要获取 IP 和端口。显然有人帮我们根据 service 名称,获取到了服务实例的 IP 和端口。它就是 LoadBalancerInterceptor,这个类会在对 RestTemplate 的请求进行拦截,然后从 Eureka 根据服务 id 获取服务列表,随后利用负载均衡算法得到真实的服务地址信息,替换服务 id

想看处理流程的可以进行源码跟踪。

LoadBalancerIntercepor

可以看到这里的 intercept 方法,拦截了用户的 HttpRequest 请求,然后做了几件事:

这里的 this.loadBalancerLoadBalancerClient 类型,我们继续跟入。

LoadBalancerClient

继续跟入 execute 方法:

代码是这样的:

放行后,再次访问并跟踪,发现获取的是 8081:

果然实现了负载均衡。

负载均衡策略IRule

在刚才的代码中,可以看到获取服务使通过一个 getServer 方法来做负载均衡:

我们继续跟入:

继续跟踪源码 chooseServer 方法,发现这么一段代码:

我们看看这个 rule 是谁:

这里的 rule 默认值是一个 RoundRobinRule,看类的介绍:

这不就是轮询的意思嘛。到这里,整个负载均衡的流程我们就清楚了。

总结

SpringCloudRibbon 的底层采用了一个拦截器,拦截了 RestTemplate 发出的请求,对地址做了修改。用一幅图来总结一下:

基本流程如下

负载均衡策略

负载均衡策略

负载均衡的规则都定义在 IRule 接口中,而 IRule 有很多不同的实现类:

不同规则的含义如下

内置负载均衡规则类 规则描述
RoundRobinRule 简单轮询服务列表来选择服务器。它是 Ribbon 默认的负载均衡规则。
AvailabilityFilteringRule 对以下两种服务器进行忽略:
(1)在默认情况下,这台服务器如果 3 次连接失败,这台服务器就会被设置为“短路”状态。短路状态将持续30秒,如果再次连接失败,短路的持续时间就会几何级地增加。
(2)并发数过高的服务器。如果一个服务器的并发连接数过高,配置了 AvailabilityFilteringRule 规则的客户端也会将其忽略。并发连接数的上限,可以由客户端的 <clientName>.<clientConfigNameSpace>.ActiveConnectionsLimit 属性进行配置。
WeightedResponseTimeRule 为每一个服务器赋予一个权重值。服务器响应时间越长,这个服务器的权重就越小。这个规则会随机选择服务器,这个权重值会影响服务器的选择。
ZoneAvoidanceRule 以区域可用的服务器为基础进行服务器的选择。使用 Zone 对服务器进行分类,这个 Zone 可以理解为一个机房、一个机架等。而后再对 Zone 内的多个服务做轮询。
BestAvailableRule 忽略那些短路的服务器,并选择并发数较低的服务器。
RandomRule 随机选择一个可用的服务器。
RetryRule 重试机制的选择逻辑

默认的实现就是 ZoneAvoidanceRule,是一种轮询方案

自定义负载均衡策略

通过定义 IRule 实现可以修改负载均衡规则,有两种方式:

代码方式:在 order-service 中的 OrderApplication 类中,定义一个新的 IRule,这样 order-service 在请求服务的时候就会根据我们配置的负载均衡规则进行查找最合适的服务。

@Bean
public IRule randomRule(){
    return new RandomRule();
}

配置文件方式:在 order-serviceapplication.yml 文件中,添加新的配置也可以修改规则

userservice: # 给某个微服务配置负载均衡规则,这里是userservice服务
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule # 负载均衡规则 

注意,一般用默认的负载均衡规则,不做修改。

饥饿加载

Ribbon 默认是采用懒加载,即第一次访问时才会去创建 LoadBalanceClient,因此第一次请求的时间会很长。

而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:

ribbon:
  eager-load:
    enabled: true
    clients: userservice

总结

Nacos注册中心

SpringCloudAlibaba 推出了一个名为 Nacos 的注册中心。

Nacos 是阿里巴巴的产品,现在是 SpringCloud 中的一个组件。相比 Eureka 功能更加丰富,在国内受欢迎程度较高。安装方式可以参考《Nacos安装指南.md》

服务注册到 nacos

NacosSpringCloudAlibaba 的组件,而 SpringCloudAlibaba 也遵循 SpringCloud 中定义的服务注册、服务发现规范。因此使用 Nacos 和使用 Eureka 对于微服务来说,并没有太大区别。

主要差异在于:

引入依赖

cloud-demo 父工程的 pom 文件中的 <dependencyManagement> 中引入 SpringCloudAlibaba 的依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.6.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

然后在 user-serviceorder-service 中的 pom 文件中引入 nacos-discovery 依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

注意:不要忘了注释掉 eureka 的依赖。

配置nacos地址

user-serviceorder-serviceapplication.yml 中添加 nacos 地址

spring:
  cloud:
    nacos:
      server-addr: localhost:8848

注意:不要忘了注释掉 eureka 的地址

重启

重启微服务后,登录 nacos 管理页面,可以看到微服务信息

服务分级存储模型

一个服务可以有多个实例,例如我们的 user-service,可以有

假如这些实例分布于全国各地的不同机房,例如

Nacos 就将同一机房内的实例划分为一个集群

也就是说,user-service 是服务,一个服务可以包含多个集群,如杭州、上海,每个集群下可以有多个实例,形成分级模型,如图:

微服务互相访问时,应该尽可能访问同集群实例,因为本地访问速度更快。当本集群内不可用时,才访问其它集群。例如:

杭州机房内的 order-service 应该优先访问同机房的 user-service。

给 user-service 配置集群

修改 user-serviceapplication.yml 文件,添加集群配置

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ # 集群名称

重启两个 user-service 实例后,我们可以在 nacos 控制台看到下面结果

我们再次复制一个 user-service 启动配置,添加属性

-Dserver.port=8083 -Dspring.cloud.nacos.discovery.cluster-name=SH

配置如图所示

启动 UserApplication3 后再次查看 nacos 控制台

同集群优先的负载均衡

默认的 ZoneAvoidanceRule 并不能实现根据同集群优先来实现负载均衡,它只是首次访问是会同集群的服务,后面就是轮询,一个一个依次使用了。

因此 Nacos 中提供了一个 NacosRule 的实现,可以优先从同集群中挑选实例。

1)给 order-service 配置集群信息

修改 order-serviceapplication.yml 文件,添加集群配置:

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ # 集群名称

2)修改负载均衡规则

修改 order-serviceapplication.yml 文件,修改负载均衡规则:

userservice:
  ribbon:
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # 负载均衡规则 

这样就是优先访问同集群的服务了,同集群内有多个服务的话,会随机选择一个服务进行访问。

Nacos 负载均衡策略

权重配置

实际部署中会出现这样的场景:

服务器设备性能有差异,部分实例所在机器性能较好,另一些较差,我们希望性能好的机器承担更多的用户请求。

但默认情况下 NacosRule 是同集群内随机挑选,不会考虑机器的性能问题。

因此,Nacos 提供了权重配置来控制访问频率,权重越大则访问频率越高。

在 nacos 控制台,找到 user-service 的实例列表,点击编辑,即可修改权重:

在弹出的编辑窗口,修改权重:

注意:如果权重修改为 0,则该实例永远不会被访问

可以通过权值配置,在需要进行服务升级时,先将一部分服务器的权值设置为 0,服务神升级后在将升级后的服务器权值设置成 0.1 用小批量用户测下升级后稳不稳定,稳定后再进行大批量的升级。

环境隔离

Nacos 提供了 namespace 来实现环境隔离功能。

创建 namespace

默认情况下,所有 service、data、group 都在同一个 namespace,名为 public

我们可以点击页面新增按钮,添加一个 namespace

然后,填写表单

就能在页面看到一个新的 namespace

给微服务配置 namespace

给微服务配置 namespace 只能通过修改配置来实现。例如,修改 order-service 的 application.yml 文件

spring:
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ
        namespace: 492a7d5d-237b-46a1-a99a-fa8e98e4b0f9 # 命名空间,填ID

重启 order-service 后,访问控制台,可以看到下面的结果

此时访问 order-service,因为 namespace 不同,会导致找不到 userservice,控制台会报错

Nacos 与 Eureka 的区别

Nacos 会把服务实例划分为两种类型

Eureka 做服务拉取是 30 秒更新/拉取一次服务列表,更新的可能不够及时;而 nacos 中,如果采用的是非临时实例,会有消息推送,假设 nacos 发现有服务挂了会把主动推送变更消息到消费者那边,让消费者及时更新。但是 nacos 非临时实例的主动检测模式对服务器的压力比较大,一般都是采用临时实例。

配置一个服务实例为永久实例:

spring:
  cloud:
    nacos:
      discovery:
        ephemeral: false # 设置为非临时实例

Nacos 和 Eureka 整体结构类似,服务注册、服务拉取、心跳等待,但是也存在一些差异:

nacos 注册中心细节分析

二、实用篇概述

Nacos配置管理

Nacos 除了可以做注册中心,同样可以做配置管理来使用。

统一配置管理

当微服务部署的实例越来越多,达到数十、数百时,逐个修改微服务配置很麻烦,而且很容易出错。我们需要一种统一配置管理方案,可以集中管理所有实例的配置。

配置热更新

Nacos 一方面可以将配置集中管理,另一方可以在配置变更时,及时通知微服务,实现配置的热更新。

在nacos中添加配置文件

如何在 nacos 中管理配置呢?

然后在弹出的表单中,填写配置信息:

注意:项目的核心配置,需要热更新的配置才有放到 nacos 管理的必要。基本不会变更的一些配置还是保存在微服务本地比较好。

从微服务拉取配置

微服务要拉取 nacos 中管理的配置,并且与本地的 application.yml 配置合并,才能完成项目启动。但如果尚未读取 application.yml,又如何得知 nacos 地址呢?

因此 spring 引入了一种新的配置文件:bootstrap.yaml 文件,会在 application.yml 之前被读取,流程如下:

1)引入 nacos-config 依赖

首先,在 user-service 服务中,引入 nacos-config 的客户端依赖:

<!--nacos配置管理依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

2)添加 bootstrap.yaml

然后,在 user-service 中添加一个 bootstrap.yaml 文件,内容如下:

spring:
  application:
    name: userservice # 服务名称
  profiles:
    active: dev #开发环境,这里是dev 
  cloud:
    nacos:
      server-addr: localhost:8848 # Nacos地址
      config:
        file-extension: yaml # 文件后缀名

这里会根据 spring.cloud.nacos.server-addr 获取 nacos 地址,再根据

${spring.application.name}-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension} 作为文件 id,来读取配置。

本例中,就是去读取 userservice-dev.yaml

3)读取 nacos 配置

在 user-service 中的 UserController 中添加业务逻辑,读取 pattern.dateformat 配置:

@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @Value("${pattern.dateformat}")
    private String dateformat;
    
    @GetMapping("now")
    public String now(){
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
    }
    // ...略
}

在页面访问,可以看到效果:

配置热更新

我们最终的目的,是修改 nacos 中的配置后,微服务中无需重启即可让配置生效,也就是配置热更新。

Nacos 中的配置文件变更后,微服务无需重启就可以感知,不过需要通过下面两种配置实现:

方式一

在 @Value 注入的变量所在类上添加注解 @RefreshScope

@RestController
@RequestMapping("/user")
@RefreshScope // 增加注解
public class UserController {

    @Autowired
    private UserService userService;

    @Value("${pattern.dateformat}")
    private String dateformat;
    
    @GetMapping("now")
    public String now(){
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
    }
    // ...略
}

方式二

使用 @ConfigurationProperties 注解代替 @Value 注解。

在 user-service 服务中,添加一个类,读取 patterrn.dateformat 属性:

package cn.itcast.user.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@Data
@ConfigurationProperties(prefix = "pattern")
public class PatternProperties {
    private String dateformat;
}

在 UserController 中使用这个类代替 @Value

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired // 替代 @Value
    private PatternProperties patternProperties;

    @GetMapping("now")
    public String now(){
        return LocalDateTime.now().format(DateTimeFormatter.ofPattern(patternProperties.getDateformat()));
    }

}

配置共享

其实微服务启动时,会去 nacos 读取多个配置文件,例如:

[spring.application.name].yaml 不包含环境,因此可以被多个环境共享。

下面我们通过案例来测试配置共享

添加一个环境共享配置

我们在 nacos 中添加一个 userservice.yaml 文件:

在user-service中读取共享配置

在 user-service 服务中,修改 PatternProperties 类,读取新添加的属性:

@Component
@Data
@ConfigurationProperties(prefix = "pattern")
public class PatternProperties{
    private String dataformat;
    private String envSharedValue;
}

在 user-service 服务中,修改 UserController,添加一个方法:

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired
    private PatternProperties patternProperties;
    
    @GetMapping("prop")
    public PatternProperties prop(){
        return patternProperties;
    }

}

运行两个UserApplication,使用不同的profile

修改 UserApplication2 这个启动项,改变其 profile 值:

这样,UserApplication(8081) 使用的 profile 是 dev,UserApplication2(8082) 使用的 profile 是 test。

启动 UserApplication 和 UserApplication2

访问 http://localhost:8081/user/prop,结果:

{
    "dataformat": "yyyy年MM月dd日 HH:mm:ss",
    "envSharedValue": "环境共享属性值"
}

访问 http://localhost:8082/user/prop,结果:

{
    "dataformat": null,
    "envSharedValue": "环境共享属性值"
}

可以看出来,不管是 dev,还是 test 环境,都读取到了 envSharedValue 这个属性的值。

配置共享的优先级

当 nacos、服务本地同时出现相同属性时,优先级有高低之分:

搭建Nacos集群

Nacos 生产环境下一定要部署为集群状态,部署方式参考课前资料中的文档《nacos集群搭建.md》

Feign远程调用

先前我们利用 RestTemplate 发起远程调用的代码

url 中的服务名是如何转换为具体的 IP 地址的

// 注册中心会将服务名替换为 user-service 服务的具体 ip 地址。
String url = "http://userservice/user/" + order.getUserId();
User user = restTemplate.getForObject(url, User.class);

但是 RestTemplate 存在下面的问题

Feign 是一个声明式的 http 客户端,官方地址:https://github.com/OpenFeign/feign;其作用就是帮助我们优雅的实现 http 请求的发送,解决上面提到的问题。

Feign替代RestTemplate

Fegin 的使用步骤如下

引入依赖

在 order-service 服务的 pom 文件中引入 feign 的依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

添加注解

在 order-service 的启动类添加注解开启 Feign 的功能

@MapperScan("cn.itcast.order.mapper")
@SpringBootApplication
@EnableFeignClients // 开启 Feign 功能
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
	// ...
}

编写Feign的客户端

在 order-service 中新建一个接口

package cn.itcast.order.client;

import cn.itcast.order.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient("userservice")
public interface UserClient {
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

这个客户端主要是基于 Spring MVC 的注解来声明远程调用的信息,比如:

这样,Feign 就可以帮助我们发送 http 请求(无需我们写具体的调用代码,会自动生成)无需自己使用 RestTemplate 来发送了。

测试

修改 order-service 中的 OrderService 类中的 queryOrderById 方法,使用 Feign 客户端代替 RestTemplate;看起来优雅多了。

@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private UserClient userClient;

    public Order queryOrderById(Long orderId) {
        Order order = orderMapper.findById(orderId);
        // 利用 Feign 发起 HTTP 请求查询用户
        User user = userClient.findById(order.getUserId());
        order.setUser(user);
        return order;
    }
}

总结

使用 Feign 的步骤

① 引入依赖

② 主程序启动类上添加 @EnableFeignClients 注解

③ 编写 FeignClient 接口(只需要告诉程序要去这个远程地址请求数据,数据的返回类型是什么,因此定义一个接口,接口中声明方法即可,后面应该会自动生成对象执行方法的调用)

④ 使用 FeignClient 中定义的方法代替 RestTemplate

自定义配置

Feign 可以支持很多的自定义配置,如下所示

类型 作用 说明
feign.Logger.Level 修改日志级别 包含四种不同的级别:NONE、BASIC、HEADERS、FULL
feign.codec.Decoder 响应结果的解析器 http 远程调用的结果做解析,例如解析 json 字符串为 java 对象
feign.codec.Encoder 请求参数编码 将请求参数编码,便于通过 http 请求发送
feign.Contract 支持的注解格式 默认是 Spring MVC 的注解
feign.Retryer 失败重试机制 请求失败的重试机制,默认是没有,但是 feign 底层是依赖于 Ribbon 的,Ribbon 自己会进行重试。(如有多个服务 A,B,C,请求 A 长时间没有响应那就继续请求 B)

一般情况下,默认值就能满足我们使用,如果要自定义时,只需要创建自定义的 @Bean 覆盖默认 Bean 即可。下面以日志为例来演示如何自定义配置。

配置文件方式

基于配置文件修改 feign 的日志级别可以针对单个服务:(我们在 Order 里调用了 User,那么 Feign 的配置就写在 Order 的配置文件中)

feign:  
  client:
    config: 
      userservice: # 针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

也可以针对所有服务

feign:  
  client:
    config: 
      default: # 这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
        loggerLevel: FULL #  日志级别 

而日志的级别分为四种

Java代码方式

也可以基于 Java 代码来修改日志级别,先声明一个类,然后声明一个 Logger.Level 的对象

public class DefaultFeignConfiguration  {
    @Bean
    public Logger.Level feignLogLevel(){
        return Logger.Level.BASIC; // 日志级别为BASIC
    }
}

如果要全局生效,将其放到启动类的 @EnableFeignClients 这个注解中

@EnableFeignClients(defaultConfiguration = DefaultFeignConfiguration.class) 

如果是局部生效,则把它放到对应的 @FeignClient 这个注解中(uservice 只对 uservice 这个服务有效)

@FeignClient(value = "userservice", configuration = DefaultFeignConfiguration.class) 

Feign使用优化

Feign 底层发起 http 请求,依赖于其它的框架。其底层客户端实现包括:

因此提高 Feign 的性能主要手段就是使用连接池代替默认的 URLConnection。

这里我们用 Apache 的 HttpClient 来演示。

1)引入依赖

在 order-service 的 pom 文件中引入 Apache 的 HttpClient 依赖

<!--httpClient的依赖 -->
<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-httpclient</artifactId>
</dependency>

2)配置连接池

在 order-service 的 application.yml 中添加配置

feign:
  client:
    config:
      default: # default全局的配置
        loggerLevel: BASIC # 日志级别,BASIC就是基本的请求和响应信息
  httpclient:
    enabled: true # 开启feign对HttpClient的支持
    max-connections: 200 # 最大的连接数
    max-connections-per-route: 50 # 每个路径的最大连接数

接下来,在 FeignClientFactoryBean 中的 loadBalance 方法中打断点

Debug 方式启动 order-service 服务,可以看到这里的 client,底层就是 Apache HttpClient

总结,Feign 的优化

1.日志级别尽量用 basic

2.使用 HttpClient 或 OKHttp 代替 URLConnection

①引入 feign-httpClient 依赖

②配置文件开启 httpClient 功能,设置连接池参数

HTTP 连接池复用扩展

Http持久连接与HttpClient连接池 - kingszelda - 博客园 (cnblogs.com)

HttpClien 中使用了连接池来管理持有连接,同一条 TCP 链路上,连接是可以复用的。HttpClient 通过连接池的方式进行连接持久化。

最佳实践

所谓最佳实践,就是使用过程中总结的经验,最好的一种使用方式。观察可以发现,Feign 的客户端与服务提供者的 Controller 代码非常相似。

feign 客户端

@FeignClient("userservice")
public interface UserClient {
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

UserController

@GetMapping("/user/{id}")
public User queryById(@PathVariable("id") Long id) {
    System.out.println(truth);
    return userService.queryById(id);
}

有没有一种办法简化这种重复的代码编写呢?

Feign 的最佳实现

继承方式

一样的代码可以通过继承来共享:

1)定义一个 API 接口,利用定义方法,并基于 Spring MVC 注解做声明。

2)Feign 客户端和 Controller 都继承接口

优点

缺点

抽取方式

将 Feign 的 Client 抽取为独立模块,并且把接口有关的 POJO、默认的 Feign 配置都放到这个模块中,提供给所有消费者使用。例如,将 UserClient、User、Feign 的默认配置都抽取到一个 feign-api 包中,所有微服务引用该依赖包,即可直接使用。

实现基于抽取的最佳实践

抽取

首先创建一个 module,命名为 feign-api

项目结构

在 feign-api 中然后引入 feign 的 starter 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

然后,order-service 中编写的 UserClient、User、DefaultFeignConfiguration 都复制到 feign-api 项目中

在order-service中使用feign-api

首先,删除 order-service 中的 UserClient、User、DefaultFeignConfiguration 等类或接口。在 order-service 的 pom 文件中引入 feign-api 的依赖

<dependency>
    <groupId>cn.itcast.demo</groupId>
    <artifactId>feign-api</artifactId>
    <version>1.0</version>
</dependency>

修改 order-service 中的所有与上述三个组件有关的导包部分,改成导入 feign-api 中的包

重启测试

重启后,发现服务报错了

这是因为 UserClient 现在在 cn.itcast.feign.clients 包下,而 order-service 的 @EnableFeignClients 注解是在 cn.itcast.order 包下,不在同一个包,无法扫描到 UserClient。

解决扫描包问题

方式一,指定 Feign 应该扫描的包

@EnableFeignClients(basePackages = "cn.itcast.feign.clients")

方式二,指定需要加载的 Client 接口

@EnableFeignClients(clients = {UserClient.class})

Gateway服务网关

Spring Cloud Gateway 是 Spring Cloud 的一个全新项目,该项目是基于 Spring 5.0,Spring Boot 2.0 和 Project Reactor 等响应式编程和事件流技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。

为什么需要网关

Gateway 网关是所有微服务的统一入口。先判断请求是不是可以放行,可以放行才会把请求转发到对应的微服务。网关可以对请求进行校验,符合要求才放行;可以对请求进行加工,如添加/修改/删除请求头;可以做服务的负载均衡;由于请求是由网关进行分发的,且如果需要请求数据,最终也是由网关发送数据给客户端的,因此最终请求的到底是那个服务器,到底是用的那个接口对客户端来说是未知的,可以隐藏具体业务的请求地址;

网关的核心功能特性

架构图如下

权限控制:网关作为微服务入口,需要校验用户是是否有请求资格,如果没有则进行拦截。

路由和负载均衡:一切请求都必须先经过 gateway,但网关不处理业务,而是根据某种规则,把请求转发到某个微服务,这个过程叫做路由。当然路由的目标服务有多个时,还需要做负载均衡。

限流:当请求流量过高时,在网关中按照下流的微服务能够接受的速度来放行请求,避免服务压力过大。

在 Spring Cloud 中网关的实现包括两种:

Zuul 是基于 Servlet 的实现,属于阻塞式编程。而 Spring Cloud Gateway 则是基于 Spring5 中提供的 WebFlux,属于响应式编程的实现,具备更好的性能。

可以搭建Gateway集群吗?

Gateway 是无法搭建集群的,不过可以配合 Nginx 实现集群的搭建及负载均衡。集群搭建很简单, Gateway 项目自己不用做改动。

Nginx集群(负载均衡) - 休耕 - 博客园 (cnblogs.com)

gateway快速入门

下面,我们就演示下网关的基本路由功能。基本步骤如下

  1. 创建 Spring Boot 工程 gateway,引入网关依赖
  2. 编写启动类
  3. 编写基础配置和路由规则
  4. 启动网关服务进行测试

创建gateway服务,引入依赖

创建服务

引入依赖:网关本身也是一个微服务,也要注册到 nacos 中去。

<!--网关-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos服务发现依赖-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

编写启动类

package cn.itcast.gateway;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class GatewayApplication {

	public static void main(String[] args) {
		SpringApplication.run(GatewayApplication.class, args);
	}
}

编写基础配置和路由规则

创建 application.yml 文件,内容如下:

server:
  port: 10010 # 网关端口
spring:
  application:
    name: gateway # 服务名称
  cloud:
    nacos:
      server-addr: localhost:8848 # nacos地址
    gateway:
      routes: # 网关路由配置
        - id: user-service # 路由id,自定义,只要唯一即可
          # uri: http://127.0.0.1:8081 # 路由的目标地址 http就是固定地址
          uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
          predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
            - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求

我们将符合 Path 规则的一切请求,都代理到 uri 参数指定的地址。

本例中,我们将 /user/** 开头的请求,代理到 lb://userservice,lb 是负载均衡,根据服务名拉取服务列表,实现负载均衡。

重启测试

重启网关,访问 http://localhost:10010/user/1时,符合 /user/** 规则,请求转发到 uri:http://userservice/user/1,得到了结果

{
    "id": 1,
    "username": "xxx",
    "address": "xxxx"
}

网关路由的流程图

整个访问的流程如下

网关搭建步骤

  1. 创建项目,引入 nacos 服务发现和 gateway 依赖

  2. 配置 application.yml,包括服务基本信息、nacos 地址、路由

路由配置包括

  1. 路由 id:路由的唯一标示

  2. 路由目标(uri):路由的目标地址,http 代表固定地址,lb 代表根据服务名负载均衡

  3. 路由断言(predicates):判断路由的规则,

  4. 路由过滤器(filters):对请求或响应做处理

接下来,就重点来学习路由断言和路由过滤器的详细知识

断言工厂

我们在配置文件中写的断言规则只是字符串,这些字符串会被 Predicate Factory 读取并处理,转变为路由判断的条件

例如 Path=/user/** 是按照路径匹配,这个规则是由

org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory 类来处理的,像这样的断言工厂在 Spring Cloud Gateway 还有十几个

名称 说明 示例
After 是某个时间点后的请求 - After=2037-01-20T17:42:47.789-07:00[America/Denver]
Before 是某个时间点之前的请求 - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai]
Between 是某两个时间点之前的请求 - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver]
Cookie 请求必须包含某些 cookie - Cookie=chocolate, ch.p
Header 请求必须包含某些 header - Header=X-Request-Id, \d+
Host 请求必须是访问某个 host(域名) - Host=**.somehost.org.**.anotherhost.org
Method 请求方式必须是指定方式 - Method=GET,POST
Path 请求路径必须符合指定规则 - Path=/red/{segment},/blue/**
Query 请求参数必须包含指定参数 - Query=name, Jack 或者- Query=name
RemoteAddr 请求者的 ip 必须是指定范围 - RemoteAddr=192.168.1.1/24
Weight 权重处理  

我们只需要掌握 Path 这种路由工程就可以了。

过滤器工厂

Gateway Filter 是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理,如添加请求头。

路由过滤器的种类

Spring 提供了 31 种不同的路由过滤器工厂。

名称 说明
AddRequestHeader 给当前请求添加一个请求头
RemoveRequestHeader 移除请求中的一个请求头
AddResponseHeader 给响应结果中添加一个响应头
RemoveResponseHeader 从响应结果中移除有一个响应头
RequestRateLimiter 限制请求的流量

请头过滤器

以 AddRequestHeader 为例。

需求:给所有进入 userservice 的请求添加一个请求头:Truth=itcast is freaking awesome!

只需要修改 gateway 服务的 application.yml 文件,添加路由过滤即可

spring:
  cloud:
    gateway:
      routes:
      - id: user-service 
        uri: lb://userservice 
        predicates: 
        - Path=/user/** 
        filters: # 过滤器
        - AddRequestHeader=Truth, Itcast is freaking awesome! # 添加请求头

当前过滤器写在 userservice 路由下,因此仅仅对访问 userservice 的请求有效。

默认过滤器

如果要对所有的路由都生效,则可以将过滤器工厂写到 default 下。格式如下

spring:
  cloud:
    gateway:
      routes:
      - id: user-service 
        uri: lb://userservice 
        predicates: 
        - Path=/user/**
      default-filters: # 默认过滤项
      - AddRequestHeader=Truth, Itcast is freaking awesome! 

总结

过滤器的作用是什么?

①对路由的请求或响应做加工处理,比如添加请求头

②配置在路由下的过滤器只对当前路由的请求生效

defaultFilters 的作用是什么?

①对所有路由都生效的过滤器

全局过滤器

上一节学习的过滤器,网关提供了 31 种,但每一种过滤器的作用都是固定的。如果我们希望拦截请求,做自己的业务逻辑则没办法实现。

全局过滤器作用

全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与 Gateway Filter 的作用一样。区别在于 Gateway Filter 是通过配置定义的,处理逻辑是固定的;而 Global Filter 的逻辑需要自己写代码实现,可以实现自定义逻辑。

定义方式是实现 GlobalFilter 接口。

public interface GlobalFilter {
    /**
     *  处理当前请求,有必要的话通过{@link GatewayFilterChain}将请求交给下一个过滤器处理
     *
     * @param exchange 请求上下文,里面可以获取Request、Response等信息
     * @param chain 用来把请求委托给下一个过滤器 
     * @return {@code Mono<Void>} 返回标示当前过滤器业务结束
     */
    Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

在 filter 中编写自定义逻辑,可以实现下列功能

自定义全局过滤器

需求:定义全局过滤器,拦截请求,判断请求的参数是否满足下面条件

如果同时满足则放行,否则拦截

实现:在 gateway 中定义一个过滤器

package cn.itcast.gateway.filters;

import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Order(-1)
@Component
public class AuthorizeFilter implements GlobalFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 1.获取请求参数
        MultiValueMap<String, String> params = exchange.getRequest().getQueryParams();
        // 2.获取authorization参数
        String auth = params.getFirst("authorization");
        // 3.校验
        if ("admin".equals(auth)) {
            // 放行
            return chain.filter(exchange);
        }
        // 4.拦截
        // 4.1.禁止访问,设置状态码
        exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);
        // 4.2.结束处理
        return exchange.getResponse().setComplete();
    }
}

过滤器执行顺序

请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter

请求路由后,会将当前路由过滤器和 DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中 [通过适配器模式让它们都变成 GlobalFilter 类型的过滤器],排序后依次执行每个过滤器:

排序的规则是什么呢?

详细内容,可以查看源码:

org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator#getFilters() 方法是先加载 defaultFilters,然后再加载某个 route 的 filters,然后合并。

org.springframework.cloud.gateway.handler.FilteringWebHandler#handle() 方法会加载全局过滤器,与前面的过滤器合并后根据 order 排序,组织过滤器链

跨域问题

什么是跨域问题

跨域:域名不一致就是跨域,主要包括:

网关是基于 WebFlux 实现的,先前解决的跨域问题是解决的 Servlet 的跨域问题,在这里不一定适用。

跨域问题:客户端通过 Ajax 向服务器端发送请求时,浏览器会禁止请求的发起者与服务端发生跨。不过,多个微服务之间,不会存在跨域问题。

解决方案:CORS,浏览器询问服务器,允不允许它跨域。可查看

模拟跨域问题

找到课前资料的页面文件

放入 tomcat 或者 nginx 这样的 web 服务器中,启动并访问。

可以在浏览器控制台看到下面的错误:

从 localhost:8090 访问 localhost:10010,端口不同,显然是跨域的请求。

解决跨域问题

在 gateway 服务的 application.yml 文件中,添加下面的配置

spring:
  cloud:
    gateway:
      # 。。。
      globalcors: # 全局的跨域处理
        add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
        corsConfigurations:
          '[/**]': # 拦截一切请求
            allowedOrigins: # 允许哪些网站的跨域请求 
              - "http://localhost:8090"
            allowedMethods: # 允许的跨域ajax的请求方式
              - "GET"
              - "POST"
              - "DELETE"
              - "PUT"
              - "OPTIONS"
            allowedHeaders: "*" # 允许在请求中携带的头信息
            allowCredentials: true # 是否允许携带cookie
            maxAge: 360000 # 这次跨域检测的有效期

三、高级篇概述

Sentinel限流

Sentinel,用来做什么的?

初识Sentinel

雪崩问题

graph LR
请求-->|A 的 tomcat 资源耗尽|A服务
A服务-->B服务
A服务-->C服务
A服务-->|D出现故障|D服务

A 服务的部分请求依赖于 D 服务,此时 D 服务出现故障无法正常执行。由于 A 中依赖于 D 的业务等不到 D 的结果,A 的业务被阻塞住,无法释放连接。如果多个同类型请求来了,会导致 A 中 tomcat 的资源耗尽(线程数),从而拖垮 A。

解决雪崩问题的常见方式有四种

流量控制是用来预防雪崩问题的,而其他三种是出现了问题用来避免故障传递到其他服务。

服务保护技术对比

- Sentinel Hystrix
隔离策略 信号量隔离 线程池隔离 / 信号量隔离
熔断降级策略 基于慢调用比例或异常比率 基于失败比率
实时指标实现 滑动窗口 滑动窗口(基于 RxJava)
规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件形式
基于注解的支持 支持 支持
限流 基于 QPS,支持基于调用关系的限流 有限的支持
流量整型 支持慢启动、匀速排队模式 不支持
系统自适应包含 支持 不支持
控制台 开箱即用,可配置规则、查看秒级监控、机器发现等 不完善
常见框架适配 Servlet、Spring Cloud、Dubbo、gRPC 等 Servlet、Spring Cloud Netfix

线程池隔离是为每个被隔离的业务创建独立的线程池,自然就有独立的线程,会比 tomcat 原始的处理方式多出很多线程。虽然隔离性比较好,但是线程数一多,线程上下文也会随之变多,系统的性能也就随之降低了。

信号量隔离则是限制每个业务可使用的最大线程数,没有可用的线程了就阻塞业务。这样就减少了线程的创建数,上下文切换自然也就少了,也不会影响系统的整体性能。

Sentinel 会统计慢调用的比例是认为,慢调用可能会拖垮整个系统。

Sentinel介绍和安装

认识Sentinel

Sentinel 是阿里巴巴开源的一款微服务流量控制组件。官网地址:https://sentinelguard.io/zh-cn/index.html

Sentinel 具有以下特征:

安装Sentinel

1)下载

sentinel 官方提供了 UI 控制台,方便我们对系统做限流设置。可以在 GitHub 下载 sentinel-dashboad-x.x.x.jar

2)运行

将 jar 包放到任意非中文目录,执行命令:

java -jar sentinel-dashboard-1.8.1.jar

如果要修改 Sentinel 的默认端口、账户、密码,可以通过下列配置

配置项 默认值 说明
server.port 8080 服务端口
sentinel.dashboard.auth.username sentinel 默认用户名
sentinel.dashboard.auth.password sentinel 默认密码

例如,修改端口

java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar

3)访问

访问 http://localhost:8080 页面,就可以看到 sentinel 的控制台了

需要输入账号和密码,默认都是:sentinel

登录后,发现一片空白,什么都没有

这是因为还没有与微服务整合。

微服务整合Sentinel

要使用 Sentinel 需要结合微服务,这里使用同级目录下的 sentinel-demo 工程。其项目结构如下

graph LR
sentinel-demo-->gateway
sentinel-demo-->|用户服务, 包括用户的 CRUD|user-service
sentinel-demo-->|订单服务, 调用 user-service|order-service
sentinel-demo-->|用户服务对外暴露的 feign 客户端,实体类|feign-api

在 order-service 中整合 sentinel,并连接 sentinel 的控制台

1)引入 sentinel 依赖。

<!--sentinel-->
<dependency>
    <groupId>com.alibaba.cloud</groupId> 
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

2)配置 sentinel 监控控制台的地址,让服务与 sentinel 做交互。

修改 application.yaml 文件,添加下面内容

server:
  port: 8088
spring:
  cloud: 
    sentinel:
      transport:
        dashboard: localhost:8080

3)访问 order-service 的任意端点,触发 sentinel 的监控。

打开浏览器,访问 http://localhost:8088/order/101,这样才能触发 sentinel 的监控。

然后再访问 sentinel 的控制台,查看效果

流量控制

雪崩问题虽然有四种方案,但是限流是避免服务因突发的流量而发生故障,是对微服务雪崩问题的预防。我们先学习这种模式。

簇点链路

当请求进入微服务时,首先会访问 DispatcherServlet,然后进入 Controller、Service、Mapper,这样的一个调用链就叫做簇点链路簇点链路中被监控的每一个接口就是一个资源。

簇点链路就是项目内的调用链路,链路中被监控的每个接口就是一个资源。默认情况下 sentinel 会监控 SpringMVC 的每一个端点(Endpoint,也就是 Controller 中的方法),因此 SpringMVC 的每一个端点(Endpoint)就是调用链路中的一个资源。如果希望 Service、Mapper 也被监控,则需要使用 Sentinel 提供的注解进行监控。

例如,刚才访问的 order-service 中的 OrderController 中的端点:/order/{orderId}

流控、熔断等都是针对簇点链路中的资源来设置的,因此可以点击对应资源后面的按钮来设置规则

快速入门

示例

点击资源 /order/{orderId} 后面的流控按钮,就可以弹出表单。

表单中可以填写限流规则,如下:

其含义是限制 /order/{orderId} 这个资源的单机 QPS 为 1,即每秒只允许 1 次请求,超出的请求会被拦截并报错。针对来源设置为 default 表示一切进来的请求都要被限,一般都是 default。

练习

需求:给 /order/{orderId} 这个资源设置流控规则,QPS 不能超过 5,然后测试。

1)首先在 sentinel 控制台添加限流规则

2)利用 jmeter 测试

如果没有用过 jmeter,可以参考资料《Jmeter 快速入门.md》

课前资料提供了编写好的 Jmeter 测试样例:

打开 jmeter,导入提供的测试样例

选择

20 个用户,2 秒内运行完,QPS 是 10,超过了 5.

选中 流控入门,QPS<5 右键运行:

注意,不要点击菜单中的执行按钮来运行。

可以看到,成功的请求每次只有 5 个

流控模式

在添加限流规则时,点击高级选项,可以选择三种流控模式

快速入门测试的就是直接模式。

关联模式

关联模式:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流。

使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是优先支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流。

配置规则

语法说明:当 /write 资源访问量触发阈值时,就会对 /read 资源限流,避免影响 /write 资源。

需求说明

1)定义 /order/query 端点,模拟订单查询

@GetMapping("/query")
public String queryOrder() {
    return "查询订单成功";
}

2)定义 /order/update 端点,模拟订单更新

@GetMapping("/update")
public String updateOrder() {
    return "更新订单成功";
}

重启服务,查看 sentinel 控制台的簇点链路

3)配置流控规则

对哪个端点限流,就点击哪个端点后面的按钮。我们是对订单查询 /order/query 限流,因此点击它后面的按钮

在表单中填写流控规则

4)在 Jmeter 测试

选择《流控模式-关联》:

可以看到 1000 个用户,100 秒,因此 QPS 为 10,超过了我们设定的阈值:5

查看 http 请求:

请求的目标是 /order/update,这样这个断点就会触发阈值。

但限流的目标是 /order/query,我们在浏览器访问,可以发现:

确实被限流了。

5)总结

满足下面条件可以使用关联模式

链路模式

链路模式:只针对从指定链路访问到本资源的请求做统计,判断是否超过阈值。

配置示例

例如有两条请求链路

如果只希望统计从 /test2 进入到 /common 的请求,则可以这样配置

实战案例

需求:有查询订单和创建订单业务,两者都需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流。

graph LR
查询订单-->|调用|查询商品
创建订单-->|调用|查询商品

步骤:

  1. 在 OrderService 中添加一个 queryGoods 方法,不用实现业务

  2. 在 OrderController 中,改造 /order/query 端点,调用 OrderService 中的 queryGoods 方法

  3. 在 OrderController 中添加一个 /order/save 的端点,调用 OrderService 的 queryGoods 方法

  4. 给 queryGoods 设置限流规则,从 /order/query 进入 queryGoods 的方法限制 QPS 必须小于 2

实现

1)添加查询商品方法

在 order-service 服务中,给 OrderService 类添加一个 queryGoods 方法

public void queryGoods(){
    System.err.println("查询商品");
}

2)查询订单时,查询商品

在 order-service 的 OrderController 中,修改 /order/query 端点的业务逻辑

@GetMapping("/query")
public String queryOrder() {
    // 查询商品
    orderService.queryGoods();
    // 查询订单
    System.out.println("查询订单");
    return "查询订单成功";
}

3)新增订单,查询商品

在 order-service 的 OrderController 中,修改 /order/save 端点,模拟新增订单

@GetMapping("/save")
public String saveOrder() {
    // 查询商品
    orderService.queryGoods();
    // 查询订单
    System.err.println("新增订单");
    return "新增订单成功";
}

4)给查询商品添加资源标记

默认情况下,OrderService 中的方法是不被 Sentinel 监控的,需要我们自己通过注解来标记要监控的方法。

给 OrderService 的 queryGoods 方法添加 @SentinelResource 注解

@SentinelResource("goods")
public void queryGoods(){
    System.err.println("查询商品");
}

链路模式中,是对不同来源的两个链路做监控。但是 sentinel 默认会给进入 SpringMVC 的所有请求设置同一个 root 资源,会导致链路模式失效。(不同来源是指不是父子路径?)

我们需要关闭这种对 SpringMVC 的资源聚合,修改 order-service 服务的 application.yml 文件。

(查阅相关文档,未发现相关资料,搜到的相关博客如下)

从1.6.3 版本开始,Sentinel Web filter 默认收敛所有 URL 的入口 context,因此链路限流不生效。

1.7.0 版本开始(对应 SCA 的 2.1.1.RELEASE),官方在 CommonFilter 引入了WEB_CONTEXT_UNIFY 参数,用于控制是否收敛 context。将其配置为 false 即可根据不同的 URL 进行链路限流。

本篇文章使用的 SCA 是 2.2.5 版本,也就是可以直接用这个 web-context-unify: false 来使链路限流生效

spring:
  cloud:
    sentinel:
      web-context-unify: false # 关闭context整合

重启服务,访问 /order/query 和 /order/save,可以查看到 sentinel 的簇点链路规则中,出现了新的资源:

5)添加流控规则

点击 goods 资源后面的流控按钮,在弹出的表单中填写下面信息:

只统计从 /order/query 进入 /goods 的资源,QPS 阈值为 2,超出则被限流。

6)Jmeter 测试

选择《流控模式-链路》

可以看到这里 200 个用户,50 秒内发完,QPS 为 4,超过了我们设定的阈值 2

一个 http 请求是访问 /order/save

运行的结果

完全不受影响。

另一个是访问 /order/query

运行结果

每次只有 2 个通过。

总结

流控模式有哪些?

流控效果

在流控的高级选项中,还有一个流控效果选项

流控效果是指请求达到流控阈值时应该采取的措施,包括三种

warm up

阈值一般是一个微服务能承担的最大 QPS,但是一个服务刚刚启动时,一切资源尚未初始化(冷启动),如果直接将 QPS 跑到最大值,可能导致服务瞬间宕机。

warm up 也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是 maxThreshold / coldFactor,持续指定时长后,逐渐提高到 maxThreshold 值。而 coldFactor 的默认值是 3.

例如,我设置 QPS 的 maxThreshold 为 10,预热时间为 5 秒,那么初始阈值就是 10 / 3 ,也就是 3,然后在 5 秒后逐渐增长到 10.

案例

需求:给 /order/{orderId} 这个资源设置限流,最大 QPS 为 10,利用 warm up 效果,预热时长为 5 秒

1)配置流控规则

2)Jmeter测试

选择《流控效果,warm up》

QPS 为 10.

刚刚启动时,大部分请求失败,成功的只有 3 个,说明 QPS 被限定在 3

随着时间推移,成功比例越来越高

到 Sentinel 控制台查看实时监控

一段时间后

排队等待

当请求超过 QPS 阈值时,快速失败和 warm up 会拒绝新的请求并抛出异常。

而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝。

例如:QPS=5,意味着每 200ms 处理一个队列中的请求;timeout = 2000,意味着预期等待时长超过 2000ms 的请求会被拒绝并抛出异常。

那什么叫做预期等待时长呢?

比如现在一下子来了 12 个请求,因为每 200ms 执行一个请求,那么

现在,第 1 秒同时接收到 10 个请求,但第 2 秒只有 1 个请求,此时 QPS 的曲线这样的:

如果使用队列模式做流控,所有进入的请求都要排队,以固定的 200ms 的间隔执行,QPS 会变的很平滑:

平滑的 QPS 曲线,对于服务器来说是更友好的。

案例

需求:给 /order/{orderId} 这个资源设置限流,最大 QPS 为 10,利用排队的流控效果,超时时长设置为 5s

1)添加流控规则

2)Jmeter 测试

选择《流控效果,队列》:

QPS 为 15,已经超过了我们设定的 10。

如果是之前的快速失败、warm up 模式,超出的请求应该会直接报错。

但是我们看看队列模式的运行结果,全部都通过了。

再去 sentinel 查看实时监控的 QPS 曲线

QPS 非常平滑,一致保持在 10,但是超出的请求没有被拒绝,而是放入队列。因此响应时间(等待时间)会越来越长。当队列满了以后,才会有部分请求失败。

总结

流控效果有哪些?

热点参数限流

之前的限流是统计访问某个资源的所有请求,判断是否超过 QPS 阈值。而热点参数限流是分别统计参数值相同的请求,判断是否超过 QPS 阈值。

全局参数限流

例如,一个根据 id 查询商品的接口

访问 /goods/{id} 的请求中,id 参数值会有变化,热点参数限流会根据参数值分别统计 QPS,统计结果:

当 id=1 的请求触发阈值被限流时,id 值不为 1 的请求不受影响。

配置示例

代表的含义是:对 hot 这个资源的 0 号参数(第一个参数)做统计,每 1 秒相同参数值的请求数不能超过 5

热点参数限流

刚才的配置中,对查询商品这个接口的所有商品一视同仁,QPS 都限定为 5.

而在实际开发中,可能部分商品是热点商品,例如秒杀商品,我们希望这部分商品的 QPS 限制与其它商品不一样,高一些。那就需要配置热点参数限流的高级选项了:

结合上一个配置,这里的含义是对 0 号的 long 类型参数限流,每 1 秒相同参数的 QPS 不能超过 5,有两个例外:

案例

案例需求:给 /order/{orderId} 这个资源添加热点参数限流,规则如下

注意事项:热点参数限流对默认的 SpringMVC 资源无效,需要利用 @SentinelResource 注解标记资源

1)标记资源

给 order-service 中的 OrderController 中的 /order/{orderId} 资源添加注解

@RestController
@RequestMapping("order")
public class OrderController {

   @Autowired
   private OrderService orderService;

    @SentinelResource("hot") // 标记资源
    @GetMapping("{orderId}")
    public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
        // 根据id查询订单并返回
        return orderService.queryOrderById(orderId);
    }
}

2)热点参数限流规则

访问该接口,可以看到我们标记的 hot 资源出现了

这里不要点击 hot 后面的按钮,页面有 BUG

点击左侧菜单中热点规则菜单

点击新增,填写表单

3)Jmeter 测试

选择《热点参数限流 QPS1》:

这里发起请求的 QPS 为 5.

包含 3 个 http 请求:

普通参数,QPS 阈值为 2

运行结果:

例外项,QPS 阈值为 4

运行结果:

例外项,QPS 阈值为 10

运行结果:

隔离和降级

限流是一种预防措施,虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其它原因而故障。而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了。

线程隔离之前讲到过,调用者在调用服务提供者时,给每个调用的请求分配独立线程池,出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽。

熔断降级:是在调用方这边加入断路器,统计对服务提供者的调用,如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者了。

可以看到,不管是线程隔离还是熔断降级,都是对客户端(调用方)的保护。需要在调用方发起远程调用时做线程隔离、或者服务熔断。而我们的微服务远程调用都是基于 Feign 来完成的,因此我们需要将 Feign 与 Sentinel 整合,在 Feign 里面实现线程隔离和服务熔断。

FeignClient整合Sentinel

SpringCloud 中,微服务调用都是通过 Feign 来实现的,因此做客户端保护必须整合 Feign 和 Sentinel。

修改配置,开启sentinel功能

修改 OrderService 的 application.yml 文件,开启 Feign 的 Sentinel 功能

feign:
  sentinel:
    enabled: true # 开启feign对sentinel的支持

编写失败降级逻辑

业务失败后,不能直接报错,而应该返回用户一个友好提示或者默认结果,这个就是失败降级逻辑。因此,我们需要给 FeignClient 编写失败后的降级逻辑

①方式一:FallbackClass,无法对远程调用的异常做处理

②方式二:FallbackFactory,可以对远程调用的异常做处理,此处选择这种

步骤一:在 feing-api 项目中定义类,实现 FallbackFactory

package cn.itcast.feign.clients.fallback;

import cn.itcast.feign.clients.UserClient;
import cn.itcast.feign.pojo.User;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
    @Override
    public UserClient create(Throwable throwable) {
        // UserClient 是个接口,此处定义匿名内部类,重写里面的方法,相当于编写降级逻辑了
        return new UserClient() {
            @Override
            public User findById(Long id) {
                log.error("查询用户异常", throwable);
                return new User();
            }
        };
    }
}

步骤二:在 feing-api 项目中的 DefaultFeignConfiguration 类中将 UserClientFallbackFactory 注册为一个 Bean

@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
    return new UserClientFallbackFactory();
}

步骤三:在 feing-api 项目中的 UserClient 接口中使用 UserClientFallbackFactory

import cn.itcast.feign.clients.fallback.UserClientFallbackFactory;
import cn.itcast.feign.pojo.User;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

// 这样,编写的逻辑就生效了。
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {

    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
}

重启后,访问一次订单查询业务,然后查看 sentinel 控制台,可以看到新的簇点链路

总结

Sentinel 支持的雪崩解决方案

Feign 整合 Sentinel 的步骤

线程隔离(舱壁模式)

线程隔离的实现方式

线程隔离有两种方式实现

线程池隔离:给每个服务调用业务分配一个线程池,利用线程池本身实现隔离效果

信号量隔离:不创建线程池,而是计数器模式,记录业务使用的线程数量,达到信号量上限时,禁止新的请求。

隔离方式 优点 缺点 场景
信号量隔离 轻量级,无额外开销 不支持主动超时(只能依赖于 Feign 的超时)
不支持异步调用
高频调用,高扇出
线程池隔离 支持主动超时
支持异步调用
线程的额外开销比较大 低扇出 (请求到 A 这里来了,然后 A 又依赖于 N 个其他的服务,这个就叫扇出)

sentinel的线程隔离

用法说明

在添加限流规则时,可以选择两种阈值类型,选择线程数即开启了舱壁模式。

案例需求

给 order-service 服务中的 UserClient 的查询用户接口设置流控规则,线程数不能超过 2。然后利用 jemeter 测试。

1)配置隔离规则

选择 feign 接口后面的流控按钮

填写表单

2)Jmeter 测试

选择《阈值类型-线程数<2》

一次发生 10 个请求,有较大概率并发线程数超过 2,而超出的请求会走之前定义的失败降级逻辑。

查看运行结果:

发现虽然结果都是通过了,不过部分请求得到的响应是降级返回的 null 信息。

总结

线程隔离的两种手段是?

信号量隔离的特点是?

线程池隔离的特点是?

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。

断路器控制熔断和放行是通过状态机来完成的:

状态机包括三个状态:

断路器熔断策略有三种:慢调用、异常比例、异常数

慢调用

慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。

例如:

解读:RT 超过 500ms 的调用是慢调用,统计最近 10000ms 内的请求,如果请求量超过 10 次,并且慢调用比例不低于 0.5,则触发熔断,熔断时长为 5 秒。然后进入 half-open 状态,放行一次请求做测试。

案例

需求:给 UserClient 的查询用户接口设置降级规则,慢调用的 RT 阈值为 50ms,统计时间为 1 秒,最小请求数量为 5,失败阈值比例为 0.4,熔断时长为 5

1)设置慢调用

修改 user-service 中的 /user/{id} 这个接口的业务。通过休眠模拟一个延迟时间

@GetMapping("/{id}")
public User queryById(@PathVariable("id") Long id) throws InterruptedException{
    if(id == 1){
        // id 为 1 时模拟慢调用
        Thread.sleep(60);
    }
    return userService.queryById(id);
}

此时,orderId=101 的订单,关联的是 id 为 1 的用户,调用时长为 60ms

orderId=102 的订单,关联的是 id 为 2 的用户,调用时长为非常短;

2)设置熔断规则

下面,给 feign 接口设置降级规则

超过 50ms 的请求都会被认为是慢请求

3)测试

在浏览器访问:http://localhost:8088/order/101,快速刷新 5 次,可以发现

触发了熔断,请求时长缩短至 5ms,快速失败了,并且走降级逻辑,返回的 null

在浏览器访问:http://localhost:8088/order/102,竟然也被熔断了

异常比例、异常数

异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。

例如,一个异常比例设置:

解读:统计最近 1000ms 内的请求,如果请求量超过 10 次,并且异常比例不低于 0.4,则触发熔断。

一个异常数设置:

解读:统计最近 1000ms 内的请求,如果请求量超过 10 次,并且异常比例不低于 2 次,则触发熔断。

案例

需求:给 UserClient 的查询用户接口设置降级规则,统计时间为 1 秒,最小请求数量为 5,失败阈值比例为 0.4,熔断时长为 5s

1)设置异常请求

首先,修改 user-service 中的 /user/{id} 这个接口的业务。手动抛出异常,以触发异常比例的熔断

@GetMapping("/{id}")
public User queryById(@PathVariable("id") Long id) throws InterruptedException{
    if(id == 1){
        // id 为 1 时模拟慢调用
        Thread.sleep(60);
    }else if(id == 2){
        // 这种应该不能用 try-cache 捕捉,不然框架无法感知到发生了异常是吗,查查资料?
        throw new RuntimeException("故意抛出异常,触发异常比例熔断");
    }
    return userService.queryById(id);
}

也就是说,id 为 2时,就会触发异常

2)设置熔断规则

下面,给 feign 接口设置降级规则

在 5 次请求中,只要异常比例超过 0.4,也就是有 2 次以上的异常,就会触发熔断。

3)测试

在浏览器快速访问:http://localhost:8088/order/102,快速刷新 5 次,触发熔断:

此时,我们去访问本来应该正常的 103

授权规则

授权规则可以对请求方来源做判断和控制。网关也是负责校验请求的,为什么 Sentinel 也要搞一个授权规则校验呢?Gateway 拦截所有请求,然后转发给对应的微服务,但是!如果有人泄露了服务的具体地址,Gateway 的这层校验就失效了,而 Sentinel 的授权规则可以解决这个问题。

授权规则

基本规则

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式。

点击左侧菜单的授权,可以看到授权规则

比如:

我们允许请求从 gateway 到 order-service,不允许浏览器访问 order-service,那么白名单中就要填写网关的来源名称(origin)

如何获取origin

Sentinel 是通过 RequestOriginParser 这个接口的 parseOrigin 来获取请求的来源的。

public interface RequestOriginParser {
    /**
     * 从请求request对象中获取origin,获取方式自定义
     */
    String parseOrigin(HttpServletRequest request);
}

这个方法的作用就是从 request 对象中,获取请求者的 origin 值并返回。默认情况下,sentinel 不管请求者从哪里来,返回值永远是 default,也就是说一切请求的来源都被认为是一样的值 default。因此,我们需要自定义这个接口的实现,让不同的请求,返回不同的 origin。只要能区分请求来自浏览器还是网关就能判断是不是跨过网关的请求了。

例如 order-service 服务中,我们定义一个 RequestOriginParser 的实现类

package cn.itcast.order.sentinel;

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.RequestOriginParser;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.servlet.http.HttpServletRequest;

@Component
public class HeaderOriginParser implements RequestOriginParser {
    @Override
    public String parseOrigin(HttpServletRequest request) {
        // 1.获取请求头
        String origin = request.getHeader("origin");
        // 2.非空判断
        if (StringUtils.isEmpty(origin)) {
            origin = "blank";
        }
        return origin;
    }
}

我们会尝试从 request-header 中获取 origin 值。

给网关添加请求头

既然获取请求 origin 的方式是从 reques-header 中获取 origin 值,我们必须让所有从 gateway 路由到微服务的请求都带上 origin 头。这个需要利用之前学习的一个 GatewayFilter 来实现,AddRequestHeaderGatewayFilter。

修改 gateway 服务中的 application.yml,添加一个 defaultFilter

spring:
  cloud:
    gateway:
      default-filters:
        - AddRequestHeader=origin,gateway # 添加名为 origin,值为 gateway 的请求头
      routes:
       # ...略

这样,从 gateway 路由的所有请求都会带上 origin 头,值为 gateway。而从其它地方到达微服务的请求则没有这个头。

配置授权规则

接下来,我们添加一个授权规则,放行 origin 值为 gateway 的请求。

我们编写的代码,对来自浏览器的请求,由于 Sentinel 无法获取到 origin 的值,会返回 blank,而来自网关的请求可以获取到 origin 的值,其值为 gateway,因此配置如下:

现在,我们直接跳过网关,访问 order-service 服务

通过网关访问

自定义异常结果

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。异常结果都是 flow limmiting(限流)。这样不够友好,无法得知是限流还是降级还是授权拦截。

异常类型

而如果要自定义异常时的返回结果,需要实现 BlockExceptionHandler 接口

public interface BlockExceptionHandler {
    /**
     * 处理请求被限流、降级、授权拦截时抛出的异常:BlockException
     */
    void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception;
}

这个方法有三个参数

这里的 BlockException 包含多个不同的子类

异常 说明
FlowException 限流异常
ParamFlowException 热点参数限流的异常
DegradeException 降级异常
AuthorityException 授权规则异常
SystemBlockException 系统规则异常

自定义异常处理

下面,我们就在 order-service 定义一个自定义异常处理类

package cn.itcast.order.sentinel;

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
        String msg = "未知异常";
        int status = 429;

        if (e instanceof FlowException) {
            msg = "请求被限流了";
        } else if (e instanceof ParamFlowException) {
            msg = "请求被热点参数限流";
        } else if (e instanceof DegradeException) {
            msg = "请求被降级了";
        } else if (e instanceof AuthorityException) {
            msg = "没有权限访问";
            status = 401;
        }

        response.setContentType("application/json;charset=utf-8");
        response.setStatus(status);
        response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
    }
}

重启测试,在不同场景下,会返回不同的异常消息

{
    "msg": 请求被限流了, "status": 429
}

授权拦截时

{
    "msg": 没有权限访问, "status": 401
}

总结

获取请求来源的接口是什么? ==> RequestOriginParser

处理 BlockException 的接口是什么?==> BlockExceptionHandler

规则持久化

现在,sentinel 的所有规则都是内存存储,重启后所有规则都会丢失。在生产环境下,我们必须确保这些规则的持久化,避免丢失。

规则管理模式

规则是否能持久化,取决于规则管理模式,sentinel 支持三种规则管理模式

pull模式

pull 模式:控制台将配置的规则推送到 Sentinel 客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则。

因为是定时查询的,因此时效性较差。

push模式

push 模式:控制台将配置规则推送到远程配置中心,例如 Nacos。Sentinel 客户端监听 Nacos,获取配置变更的推送消息,完成本地配置更新。

实现push模式

修改order-service服务

修改 OrderService,让其监听 Nacos 中的 sentinel 规则配置。

1)引入依赖,在 order-service 中引入 sentinel 监听 nacos 的依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

2)配置 nacos 地址,在 order-service 中的 application.yml 文件配置 nacos 地址及监听的配置信息

spring:
  cloud:
    sentinel:
      datasource:
        flow:
          nacos:
            server-addr: localhost:8848 # nacos地址
            dataId: orderservice-flow-rules
            groupId: SENTINEL_GROUP
            rule-type: flow # 还可以是:degrade、authority、param-flow

修改sentinel-dashboard源码

SentinelDashboard 默认不支持 nacos 的持久化,需要修改源码。

1)解压,解压 sentinel 源码,打开项目

2)修改 nacos 依赖,在 sentinel-dashboard 源码的 pom 文件中,nacos 的依赖默认的 scope 是 test,只能在测试时使用,这里要去除,修改为

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

3)添加 nacos 支持,在 sentinel-dashboard 的 test 包下,已经编写了对 nacos 的支持,我们需要将其拷贝到 main 下

4)修改 nacos 地址,然后,还需要修改测试代码中的 NacosConfig 类

修改其中的 nacos 地址,让其读取 application.properties 中的配置

在 sentinel-dashboard 的 application.properties 中添加 nacos 地址配置

nacos.addr=localhost:8848

5)配置 nacos 数据源,另外,还需要修改 com.alibaba.csp.sentinel.dashboard.controller.v2 包下的 FlowControllerV2 类

让我们添加的 Nacos 数据源生效:

6)修改前端页面

接下来,还要修改前端页面,添加一个支持 nacos 的菜单。修改 src/main/webapp/resources/app/scripts/directives/sidebar/ 目录下的 sidebar.html 文件

将其中的这部分注释打开

修改其中的文本

7)重新编译、打包项目

运行 IDEA 中的 maven 插件,编译和打包修改好的 Sentinel-Dashboard

8)启动,与原先一样

java -jar sentinel-dashboard.jar

如果要修改 nacos 地址,需要添加参数

java -jar -Dnacos.addr=localhost:8848 sentinel-dashboard.jar

分布式事务

能不使用分布式事务中间件就不要使用。这些中间件的开销太大了,而且不同业务适合的模式又不一样,这又会导致一个系统中,不同的子系统采用的不同的分布式事务中间件模式。而且引入分布式事务中间件会造成系统大规模耦合的情况。是否使用分布式事务中间件是一个取舍问题,我也倾向于尽量不去使用它~。【IT老齐266】不要在项目中使用分布式事务中间件_哔哩哔哩_bilibili

微服务网关本来可以将所有的微服务接口进行屏蔽,但是使用 Seata 这种分布式事务中间件时,TM 又要接入到 Seata 中(不明白),这样基础设施就对外暴露了,有安全风险和权限设计问题。

优势

缺点

解决方案

分布式事务问题

本地事务

本地事务,也就是传统的单机事务。在传统数据库事务中,必须要满足 ACID 四个原则

原子性 (A):事务中的所有操作,要么全部成功,要么全部失败

一致性 (C):要保证数据库内部完整性约束、声明性约束

隔离性 (I):对同一资源操作的事务不能同时发生

持久性 (D):对数据库做的一切修改将永久保存,不管是否出现故障

分布式事务

分布式事务,就是指不是在单个服务或单个数据库架构下,产生的事务,例如

在数据库水平拆分、服务垂直拆分之后,一个业务操作通常要跨多个数据库、服务才能完成。例如电商行业中比较常见的下单付款案例,包括下面几个行为

微服务下单业务,在下单时会调用订单服务,创建订单并写入数据库。然后订单服务调用账户服务和库存服务

在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一致,这样的事务就是分布式事务。

订单的创建、库存的扣减、账户扣款在每一个服务和数据库内是一个本地事务,可以保证 ACID 原则。

但是当我们把三件事情看做一个”业务”,要满足保证“业务”的原子性,要么所有操作全部成功,要么全部失败,不允许出现部分成功部分失败的现象,这就是分布式系统下的事务了。

此时 ACID 难以满足,这是分布式事务要解决的问题

演示分布式事务问题

1)创建数据库,名为 seata_demo,然后导入 SQL seata-demo.sql

2)导入微服务项目 seata-demo

项目结构介绍

seata-demo:父工程,负责管理项目依赖

3)启动 nacos、所有微服务

4)测试下单功能,发出 Post 请求

curl --location --request POST 'http://localhost:8082/order?userId=user202103032042012&commodityCode=100202003032041&count=20&money=200'

测试发现,当库存不足时,如果余额已经扣减,并不会回滚,出现了分布式事务问题。

分布式事务思想

全局事务:定义全局事务的范围、开始全局事务、提交或回滚全局事务。

分支事务管理器:管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。

一个全局事务内包括若干个相关联操作的子事务。子事务将自己的信息注册到事务协调者中,事务的操作结果:可正常提交、回滚都会报告给事务协调者。事务协调者如果发现所有事务都操作成功了,则提交全局事务;如果发现有事务操作失败,则回滚所有子事务。事务提交的结果应该也需要给事务协调者一个回执,如果有一方事务提交失败,其他子事务也应该做出相应操作。

理论基础

解决分布式事务问题,需要一些分布式系统的基础知识作为理论指导。

CAP定理

1998 年,加州大学的计算机科学家 Eric Brewer 提出,分布式系统有三个指标。

它们的第一个字母分别是 C、A、P。

Eric Brewer 说,这三个指标不可能同时做到。这个结论就叫做 CAP 定理。

一致性

Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致。

比如现在包含两个节点,其中的初始数据是一致的

当我们修改其中一个节点的数据时,两者的数据产生了差异

要想保住一致性,就必须实现 node01 到 node02 的数据同步

可用性

Availability (可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。

如图,有三个节点的集群,访问任何一个都可以及时得到响应

当有部分节点因为网络故障或其它原因无法访问时,代表节点不可用

分区容错

Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区。

Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务。

这时候 node01 和 node03 的数据是不一致的,没有满足一致性。如果一定要满足一致性,那么可以让 node03 等待 node02 网络的恢复和数据同步。在恢复之前所有来访问我的请求都阻塞住,等 node03 数据同步完成之后再放行请求。这样 node03 明明是监控的,但是在数据恢复前却是不可用的,就不满足可用性了。

矛盾★

在分布式系统中,系统间的网络不能 100% 保证健康,一定会有故障的时候,而服务有必须对外保证服务。因此 Partition Tolerance 不可避免。

当节点接收到新的数据变更时,就会出现问题了

如果此时要保证一致性,就必须等待网络恢复,完成数据同步后,整个集群才对外提供服务,服务处于阻塞状态,不可用。

如果此时要保证可用性,就不能等待网络恢复,那 node01、node02 与 node03 之间就会出现数据不一致。

也就是说,在 P 一定会出现的情况下,A 和 C 之间只能实现一个。

总结

CAP 定理内容

思考:elasticsearch 集群是 CP 还是 AP。

elasticsearch 集群是服务不可用时就剔除这个服务,牺牲了可用性来保证一致性。即,es 集群出现分区时,故障节点会被剔出集群,数据分片会重新分配到其它节点,保证数据一致性。因此时低可用性,高一致性,属于 CP。

BASE理论

BASE 理论是对 CAP 的一种解决思路,包含三个思想

解决思路

分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴 CAP 定理和 BASE 理论,有两种解决思路:

但不管是哪一种模式,都需要在子系统事务之间互相通讯,协调事务状态,也就是需要一个事务协调者 (TC)

这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务

总结

BASE 理论的三个思想

解决分布式事务的思想和模型

初识Seata

Seata 是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。

官网地址:http://seata.io/,其中的文档、博客中提供了大量的使用说明、源码分析。

Seata的架构

Seata 事务管理中有三个重要的角色

整体的架构如图

TM 定义全局事务的范围,开启全局事务后,在这个全局事务内的分支事务会注册到 TC(事务协调者,负责维护全局事务和分支事务的状态)中。如果注册到 TC 的事务都执行成功了则提交事务,如果有任何一个分支事务执行失败,则进行事务回滚。(如何设置 xx 事务为某全局事务的分支事务?分支事务的注册和报告的对象是否是分布式机器,如果机器宕机又如何处理?)

Seata 基于上述架构提供了四种不同的分布式事务解决方案

无论哪种方案,都离不开 TC,也就是事务的协调者。

部署TC服务

下载

下载 seata-server 包,http://seata.io/zh-cn/blog/download.html

解压

在非中文目录解压缩这个 zip 包,其目录结构如下

修改配置

修改 conf 目录下的 registry.conf 文件

内容如下

registry {
  # tc服务的注册中心类型
  # 这里选择nacos,也可以是 eureka、zookeeper 等
  type = "nacos"

  nacos {
    # seata tc 服务注册到 nacos的服务名称,可以自定义
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "SH"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取
  # 如果tc是集群,可以共享配置
  # 将 seata 的配置信息放到 nacos 注册中心
  type = "nacos"
  # 配置nacos地址等信息
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    # 配置管理的组
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    # nacos 注册中心的配置文件名
    dataId = "seataServer.properties"
  }
}

在nacos添加配置

特别注意,为了让 tc 服务的集群可以共享配置,我们选择了 nacos 作为统一配置中心。因此服务端配置文件 seataServer.properties 文件需要在 nacos 中配好。

格式如下:

配置内容如下:

# 数据存储方式,db代表数据库,分支事务注册的信息保存在哪里
# 此处选择的是保存到 db,数据库中,也可以选择 redis
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

# 事务、日志等配置
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000

# 客户端与服务端传输方式
transport.serialization=seata
transport.compressor=none
# 关闭metrics功能,提高性能
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

其中的数据库地址、用户名、密码都需要修改成你自己的数据库信息。

创建数据库表

特别注意:tc 服务在管理分布式事务时,需要记录事务相关数据到数据库中,你需要提前创建好这些表。

新建一个名为 seata 的数据库,运行 sql 文件 seata-tc-server.sql

这些表主要记录全局事务、分支事务、全局锁信息

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- 分支事务表
-- ----------------------------
DROP TABLE IF EXISTS `branch_table`;
CREATE TABLE `branch_table`  (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `status` tinyint(4) NULL DEFAULT NULL,
  `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime(6) NULL DEFAULT NULL,
  `gmt_modified` datetime(6) NULL DEFAULT NULL,
  PRIMARY KEY (`branch_id`) USING BTREE,
  INDEX `idx_xid`(`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

-- ----------------------------
-- 全局事务表
-- ----------------------------
DROP TABLE IF EXISTS `global_table`;
CREATE TABLE `global_table`  (
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `transaction_id` bigint(20) NULL DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `timeout` int(11) NULL DEFAULT NULL,
  `begin_time` bigint(20) NULL DEFAULT NULL,
  `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `gmt_create` datetime NULL DEFAULT NULL,
  `gmt_modified` datetime NULL DEFAULT NULL,
  PRIMARY KEY (`xid`) USING BTREE,
  INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE,
  INDEX `idx_transaction_id`(`transaction_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

SET FOREIGN_KEY_CHECKS = 1;

启动TC服务

进入 bin 目录,运行其中的 seata-server.bat 即可

启动成功后,seata-server 应该已经注册到nacos注册中心了。

打开浏览器,访问 nacos 地址:http://localhost:8848,然后进入服务列表页面,可以看到 seata-tc-server 的信息

微服务集成Seata

以 order-service 为例来演示。

引入依赖

首先,在 order-service 中引入依赖

<!--seata-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <!--版本较低,1.3.0,因此排除--> 
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <!--seata starter 采用1.4.2版本-->
    <version>${seata.version}</version>
</dependency>

配置TC地址

在 order-service 中的 application.yml 中,配置 TC 服务信息,通过注册中心 nacos,结合服务名称获取 TC 地址

seata:
  registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    type: nacos # 注册中心类型 nacos
    nacos:
      server-addr: 127.0.0.1:8848 # nacos地址
      namespace: "" # namespace,默认为空
      group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
      application: seata-tc-server # seata服务名称
      username: nacos
      password: nacos
  tx-service-group: seata-demo # 事务组名称
  service:
    vgroup-mapping: # 事务组与cluster的映射关系
      seata-demo: SH

微服务如何根据这些配置寻找 TC 的地址呢?

我们知道注册到 Nacos 中的微服务,确定一个具体实例需要四个信息:

以上四个信息,在刚才的 yaml 文件中都能找到

namespace 为空,就是默认的 public。

TC 地址的查找规则如下:先找属于那个事务组,再根据事务组查找对应的集群映射关系。

结合起来,TC 服务的信息就是:public@DEFAULT_GROUP@seata-tc-server@SH,这样就能确定 TC 服务集群了。然后就可以去 Nacos 拉取对应的实例信息了。

注意:seata 并未做好与 spring cloud nacos 的集成关系,因此 seata 中还是需要配置一下 nacos 注册中心的地址

其它服务

其它两个微服务也都参考 order-service 的步骤来做,完全一样。

总结

nacos 服务名称组成包括

seata 客户端获取 tc 的 cluster 名称方式

动手实践

学习 Seata 中的四种不同的事务模式。

XA模式

XA 规范是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范描述了全局的 TM 与局部的 RM 之间的接口,几乎所有主流的数据库都对 XA 规范提供了支持。(XA 标准中 RM 是由数据库来实现的)

两阶段提交

XA 是规范,目前主流数据库都实现了这种规范,实现的原理都是基于两阶段提交。

正常情况

异常情况

一阶段

二阶段

Seata的XA模型

Seata 对原始的 XA 模式做了简单的封装和改造,以适应自己的事务模型,基本架构如图(不看 TM 部分和前面说的 XA 模式基本一样,只不过是多了 TM 做整个事务的注册和管理):

RM 一阶段的工作

​ ① 注册分支事务到 TC

​ ② 执行分支业务 sql 但不提交

​ ③ 报告执行状态到 TC

TC 二阶段的工作

RM 二阶段的工作

优缺点

XA 模式的优点是什么?

XA 模式的缺点是什么?

实现XA模式

Seata 的 starter 已经完成了 XA 模式的自动装配,实现非常简单,步骤如下:

1)修改 application.yml 文件(每个参与事务的微服务),开启 XA 模式:

seata:
  data-source-proxy-mode: XA

2)给发起全局事务的入口方法添加 @GlobalTransactional 注解,这样全局事务就交由 TM 管理了。

本例中是 OrderServiceImpl 中的 create 方法

3)重启服务并测试

重启 order-service,再次测试,发现无论怎样,三个微服务都能成功回滚。

AT模式

AT 模式同样是分阶段提交的事务模型,不过缺弥补了 XA 模型中资源锁定周期过长的缺陷。执行完业务 SQL 后会直接提交事务,不用等待其他分支 SQL 的执行结果。

Seata的AT模型

阶段一 RM 的工作

阶段二提交时 RM 的工作

阶段二回滚时 RM 的工作

流程梳理

我们用一个真实的业务来梳理下 AT 模式的原理。

比如,现在又一个数据库表,记录用户余额

id money
1 100

其中一个分支业务要执行的 SQL 为

update tb_account set money = money - 10 where id = 1

AT 模式下,当前分支事务执行流程如下

一阶段:

1)TM 发起并注册全局事务到 TC

2)TM 调用分支事务

3)分支事务准备执行业务 SQL

4)RM 拦截业务 SQL,根据 where 条件查询原始数据,形成快照。

{
    "id": 1, "money": 100
}

5)RM 执行业务 SQL,提交本地事务,释放数据库锁。此时 money = 90

6)RM 报告本地事务状态给 TC

二阶段:

1)TM 通知 TC 事务结束

2)TC 检查分支事务状态

​ a)如果都成功,则立即删除快照

​ b)如果有分支事务失败,需要回滚。读取快照数据({"id": 1, "money": 100}),将快照恢复到数据库。此时数据库再次恢复为 100

AT与XA的区别

简述 AT 模式与 XA 模式最大的区别是什么?

脏写问题

因为“提前”释放了锁,因此在多线程并发访问 AT 模式的分布式事务时,有可能出现脏写问题,如图

解决思路就是引入了全局锁的概念。在释放 DB 锁之前,先拿到全局锁,只有持有全局锁的事务才可以拿到记录的执行权。避免同一时刻有另外一个事务来操作当前数据。

全局锁是由 TC 记录的,内部记录下谁在访问,而 XA 的锁是执行完业务不提交,是数据库的锁。数据库的锁,不释放,任何人都无法操作这条数据(无法修改,删除,不加锁的 select 是可以执行的),TC 的锁只是记录操纵这行表的全局事务,由 seata 管理的,如果这个事务不是由 seata 管理的,去操作是不会有影响的。

如果一个由 seata 管理的全局事务操作了 money 字段,一个不由 seata 管理的全局事务也操作了 money 字段,这样就锁不住数据了,会出现问题。

优缺点

AT 模式的优点

AT 模式的缺点

实现AT模式

AT 模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。

只不过,AT 模式需要一个表来记录全局锁、另一张表来记录数据快照 undo_log。

1)导入数据库表,记录全局锁

导入提供的 sql 文件:seata-at.sql,其中 lock_table 导入到 TC 服务关联的数据库,undo_log 表导入到微服务关联的数据库

2)修改 application.yml 文件,将事务模式修改为 AT 模式即可

seata:
  data-source-proxy-mode: AT # 默认就是AT

3)重启服务并测试

TCC模式

TCC 模式与 AT 模式非常相似,每阶段都是独立事务,不同的是 TCC 通过人工编码来实现数据恢复。需要实现三个方法

扣减库存和扣减余额适合用 TCC,新增数据则不适合。

流程分析

举例,一个扣减用户余额的业务。假设账户 A 原来余额是 100,需要余额扣减 30 元。

阶段一(Try):检查余额是否充足,如果充足则冻结金额增加 30 元,可用余额扣除 30

初始余额

余额充足,可以冻结

此时,总金额 = 冻结金额 + 可用金额,数量依然是 100 不变。事务直接提交无需等待其它事务。

阶段二(Confirm):假如要提交(Confirm),则冻结金额扣减30

确认可以提交,不过之前可用金额已经扣减过了,这里只要清除冻结金额就好了

此时,总金额 = 冻结金额 + 可用金额 = 0 + 70 = 70元

阶段二(Canncel):如果要回滚(Cancel),则冻结金额扣减 30,可用余额增加 30

需要回滚,那么就要释放冻结金额,恢复可用金额:

Seata的TCC模型

Seata 中的 TCC 模型依然延续之前的事务架构

优缺点

TCC 模式的每个阶段是做什么的?

TCC 的优点是什么?

TCC 的缺点是什么?

案例

事务悬挂和空回滚

1)空回滚

当某分支事务的 try 阶段阻塞时,可能导致全局事务超时而触发二阶段的 cancel 操作。在未执行 try 操作时先执行了 cancel 操作,这时 cancel 不能做回滚,就是空回滚

执行 cancel 操作时,应当判断 try 是否已经执行,如果尚未执行,则应该空回滚。

2)业务悬挂

对于已经空回滚的业务,之前被阻塞的 try 操作恢复,继续执行 try,就永远不可能 confirm 或 cancel ,事务一直处于中间状态,这就是业务悬挂

执行 try 操作时,应当判断 cancel 是否已经执行过了,如果已经执行,应当阻止空回滚后的 try 操作,避免悬挂。

实现TCC模式

解决空回滚和业务悬挂问题,必须要记录当前事务状态,是在 try、还是 cancel?

1)思路分析

这里我们定义一张表:

CREATE TABLE `account_freeze_tbl` (
  `xid` varchar(128) NOT NULL,
  `user_id` varchar(255) DEFAULT NULL COMMENT '用户id',
  `freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
  `state` int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

那此时,我们的业务该怎么做呢?

业务 做法
Try 业务 记录冻结金额和事务状态到 account_freeze 表
扣减 account 表可用金额
Confirm 业务 根据 xid 删除 account_freeze 表的冻结记录
Cancel 业务 修改 account_freeze 表,冻结金额为 0,state 为 2
修改 account 表,恢复可用金额
如何判断是否空回滚? cancel 业务中,根据 xid 查询 account_freeze,如果为 null 则说明 try 还没做,需要空回滚
如何避免业务悬挂? try 业务中,根据 xid 查询 account_freeze ,如果已经存在则证明 Cancel 已经执行,拒绝执行 try 业务

接下来,改造 account-service,利用 TCC 实现余额扣减功能。

2)声明TCC接口

TCC 的 Try、Confirm、Cancel 方法都需要在接口中基于注解来声明,

我们在 account-service 项目中的 cn.itcast.account.service 包中新建一个接口,声明 TCC 三个接口:

package cn.itcast.account.service;

import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;

@LocalTCC
public interface AccountTCCService {
	
    // 声明 try 方法,deduct
    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
    void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
                @BusinessActionContextParameter(paramName = "money")int money);

    boolean confirm(BusinessActionContext ctx);

    boolean cancel(BusinessActionContext ctx);
}
3)编写实现类

在 account-service 服务中的 cn.itcast.account.service.impl 包下新建一个类,实现 TCC 业务

package cn.itcast.account.service.impl;

import cn.itcast.account.entity.AccountFreeze;
import cn.itcast.account.mapper.AccountFreezeMapper;
import cn.itcast.account.mapper.AccountMapper;
import cn.itcast.account.service.AccountTCCService;
import io.seata.core.context.RootContext;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Slf4j
public class AccountTCCServiceImpl implements AccountTCCService{

    @Autowired
    private AccountMapper accountMapper;
    @Autowired
    private AccountFreezeMapper freezeMapper;

    @Override
    @Transactional
    // 做资源检测和预留的。余额用的 unsigned int, 不可能为负,变成负数会报错,自动回滚
    public void deduct(String userId, int money) {
        // 0.获取事务id
        String xid = RootContext.getXID();       
        
        // 判断 freeze 中是否有冻结记录,如果有,一定是 CANCEL 执行过,要拒绝业务
        AccountFreeze oldFreeze = freezeMapper.selectById(xid);
        if(oldFreeze != null){
            // CANCCEL 执行过,要拒绝业务
            return;
        }
        
        // 1.扣减可用余额
        accountMapper.deduct(userId, money);
        // 2.记录冻结金额,事务状态
        AccountFreeze freeze = new AccountFreeze();
        freeze.setUserId(userId);
        freeze.setFreezeMoney(money);
        freeze.setState(AccountFreeze.State.TRY);
        freeze.setXid(xid);
        freezeMapper.insert(freeze);
    }

    @Override
    public boolean confirm(BusinessActionContext ctx) {
        // 1.获取事务id
        String xid = ctx.getXid();
        // 2.根据id删除冻结记录
        int count = freezeMapper.deleteById(xid);
        return count == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext ctx) {
        // 0.查询冻结记录
        String xid = ctx.getXid();
        AccountFreeze freeze = freezeMapper.selectById(xid);
		
        // 空回滚判断,判断 freeze 是否为 null,为 null 证明 try 没执行,
        if(freeze == null){
            freeze = new AccountFreeze();
            freeze.setUserId(userId);
            freeze.setFreezeMoney(0);
            freeze.setState(AccountFreeze.State.CANCEL);
            freeze.setXid(xid);
            freezeMapper.insert(freeze);
            return true;
        }
        
        // 2.幂等判断 只要 CANCEL 执行了状态一定是 CANCEL,可以通过状态值来判断
        if(freeze.getState() == AccountFreeze.State.CANCEL){
            // 处理过了 CANCEL,无需处理
            return true;
        }
        
        // 1.恢复可用余额
        accountMapper.refund(freeze.getUserId(), freeze.getFreezeMoney());
        // 2.将冻结金额清零,状态改为CANCEL
        freeze.setFreezeMoney(0);
        freeze.setState(AccountFreeze.State.CANCEL);
        // 
        int count = freezeMapper.updateById(freeze);
        return count == 1;
    }
}

SAGA模式

Saga 模式是 Seata 即将开源的长事务解决方案,将由蚂蚁金服主要贡献。

其理论基础是 Hector & Kenneth 在 1987 年发表的论文 Sagas

Seata 官网对于 Saga 的指南:https://seata.io/zh-cn/docs/user/saga.html

原理

在 Saga 模式下,分布式事务内有多个参与者,每一个参与者都是一个冲正补偿服务,需要用户根据业务场景实现其正向操作和逆向回滚操作。

分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任何一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。

Saga 也分为两个阶段

优缺点

优点

缺点

四种模式对比

我们从以下几个方面来对比四种实现

- XA AT TCC SAGA
一致性 强一致 弱一致 弱一致 最终一致
隔离性 完全隔离 基于全局锁隔离 基于资源预留隔离 无隔离
代码入侵 有,要编写三个接口 有,要编写状态机和补偿业务
性能 非常好 非常好
场景 对一致性、隔离性有高要求的业务 基于关系型数据库的大多数分布式事务场景都可以 对性能要求较高的事务
有非关系型数据库要参与的事务
业务流程长、业务流程多
参与者包含其他公司或遗留系统服务,无法提供 TCC 模式要求的三个接口

高可用

Seata 的 TC 服务作为分布式事务核心,一定要保证集群的高可用性。

高可用架构模型

搭建 TC 服务集群非常简单,启动多个 TC 服务,注册到 nacos 即可。

但集群并不能确保 100% 安全,万一集群所在机房故障怎么办?所以如果要求较高,一般都会做异地多机房容灾。

比如一个 TC 集群在上海,另一个 TC 集群在杭州

微服务基于事务组(tx-service-group) 与 TC 集群的映射关系,来查找当前应该使用哪个 TC 集群。当 SH 集群故障时,只需要将 vgroup-mapping 中的映射关系改成 HZ。则所有微服务就会切换到 HZ 的 TC 集群了。

实现高可用

模拟异地容灾的TC集群

计划启动两台 seata 的 tc 服务节点

节点名称 ip地址 端口号 集群名称
seata 127.0.0.1 8091 SH
seata2 127.0.0.1 8092 HZ

之前我们已经启动了一台 seata 服务,端口是 8091,集群名为 SH。

现在,将 seata 目录复制一份,起名为 seata2

修改 seata2/conf/registry.conf 内容如下

registry {
  # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等
  type = "nacos"

  nacos {
    # seata tc 服务注册到 nacos的服务名称,可以自定义
    application = "seata-tc-server"
    serverAddr = "127.0.0.1:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "HZ"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置
  type = "nacos"
  # 配置nacos地址等信息
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}

进入 seata2/bin 目录,然后运行命令

seata-server.bat -p 8092

打开 nacos 控制台,查看服务列表

点进详情查看

将事务组映射配置到nacos

接下来,我们需要将 tx-service-group 与 cluster 的映射关系都配置到 nacos 配置中心。

新建一个配置

配置的内容如下

# 事务组映射关系
service.vgroupMapping.seata-demo=SH

service.enableDegrade=false
service.disableGlobalTransaction=false
# 与TC服务的通信配置
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# RM配置
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
# TM配置
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000

# undo日志配置
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
client.log.exceptionRate=100

微服务读取nacos配置

接下来,需要修改每一个微服务的 application.yml 文件,让微服务读取 nacos 中的 client.properties 文件

seata:
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      username: nacos
      password: nacos
      group: SEATA_GROUP
      data-id: client.properties

重启微服务,现在微服务到底是连接 tc 的 SH 集群,还是 tc 的 HZ 集群,都统一由 nacos 的 client.properties 来决定了。

分布式缓存

单点 Redis 的问题:

水平扩展和垂直扩展

Redis的持久化

RDB持久化

RDB 全称 Redis Database Backup file(Redis 数据备份文件),也被叫做 Redis 数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当 Redis 实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为 RDB 文件,默认是保存在当前运行目录。

执行时机

RDB 持久化在四种情况下会执行:

1)save 命令

执行下面的命令,可以立即执行一次 RDB

[root@localhost ~]# redis-cli
127.0.0.1:6379> save # 由 Redis 主进程来执行 RDB, 会阻塞所有命令
ok
127.0.0.1:6379>

save 命令会导致主进程执行 RDB,这个过程中其它所有命令都会被阻塞。适合用在 Redis 即将停止时,比如在数据迁移时可能用到。

2)bgsave 命令

下面的命令可以异步执行 RDB

[root@localhost ~]# redis-cli
127.0.0.1:6379> bgsave # 开启子进程来执行 RDB, 避免主进程受到影响
Background saving started

这个命令执行后会开启独立进程完成 RDB,主进程可以持续处理用户请求,不受影响。

3)停机时

Redis 停机时会执行一次 save 命令,实现 RDB 持久化。

4)触发 RDB 条件

Redis 内部有触发 RDB 的机制,可以在 redis.conf 文件中找到,格式如下:

# 900秒内,如果至少有1个key被修改,则执行bgsave , 如果是save "" 则表示禁用RDB
save 900 1  
save 300 10  
save 60 10000 

RDB 的其它配置也可以在 redis.conf 文件中设置:

# 是否压缩 ,建议不开启,压缩也会消耗 cpu,磁盘的话不值钱
rdbcompression yes

# RDB文件名称
dbfilename dump.rdb  

# 文件保存的路径目录
dir ./ 

RDB 的频率不要太高,频率太高会一直处于写入数据的状态,影响性能,一般用默认的就好。

RDB原理

bgsave 开始时会 fork 主进程得到子进程,子进程共享主进程的内存数据。完成 fork 后读取内存数据并写入 RDB 文件。注意:fork 这个操作过程要拷贝页表是阻塞的。

fork 采用的是 copy-on-write 技术:

Linux 中,所有的进程都没办法直接操作物理内存而是由操作系统给每个进程分配一个虚拟内存,主进程操作虚拟内存,操作系统维护一个虚拟内存与物理内存直接的映射关系(页表)。fork 主进程实际上是 fork 页表(页表中保存了物理内存与虚拟内存的映射关系)的过程,让子进程和主进程拥有一样的映射关系。这样就实现了子进程和主进程一样的内存共享。这样就无需拷贝内存中的数据,直接实现数据共享。

但这样会有一个问题,就是一个读一个写,会有并发问题。如果子进程在拷贝数据的时候,主进程还在写怎么办?fork 底层会采用 copy-on-write 的技术。源数据只读,如果需要修改就复制一份数据,在复制的数据中进行修改,如果每次修改要复制的数据很大,那么这样的开销是很大的。JVM 垃圾回收算法的标记如果标记在对象上,修改对象的 GC 标记的频繁复制对象对内存的压力特别大,具体可看《垃圾回收的算法与实现》与写时复制技术不兼容。JVM 笔记#标记清除(后面好像是等持久化结束后,在写入源数据。MySQL 也有一个类似的操作,查下 MySQL 的笔记)

小结

RDB 方式 bgsave 的基本流程?

RDB 会在什么时候执行?save 60 1000 代表什么含义?

RDB 的缺点?

AOF持久化

AOF 全称为 Append Only File(追加文件)。Redis 处理的每一个写命令都会记录在 AOF 文件,可以看做是命令日志文件。

AOF配置

AOF 默认是关闭的,需要修改 redis.conf 配置文件来开启 AOF:

# 是否开启AOF功能,默认是no
appendonly yes
# AOF文件的名称
appendfilename "appendonly.aof"

AOF 的命令记录的频率也可以通过 redis.conf 文件来配:

# 表示每执行一次写命令,立即记录到AOF文件,Redis 主进程完成磁盘写入操作。
appendfsync always 
# 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案,子进程完成磁盘写入操作
appendfsync everysec 
# 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘
appendfsync no

用配置文件的方式启动 Redis

sudo ./redis-server ../redis.conf

向 Redis 中写入数据可以发现,每次执行的命令都被写入到了 aof 文件中

cat appendonly.aof
*2
$6
SELECT
$1
0
*3
$3
set
$4
name
$3
ljw
*3
$3
set
$4
name
$3
kkx

三种策略对比

配置项 刷盘时机 优点 缺点
Always 同步刷盘 可靠性高,几乎不丢数据 性能影响大
everysec 每秒刷盘 性能适中 最多丢失 1 秒数据
no 操作系统控制 性能最好 可靠性较差,可能丢失大量数据

AOF文件重写

因为是记录命令,AOF 文件会比 RDB 文件大的多。而且 AOF 会记录对同一个 key 的多次写操作,但只有最后一次写操作才有意义。通过执行 bgrewriteaof 命令,可以让 AOF 文件执行重写功能,用最少的命令达到相同效果。

如图,AOF 原本有三个命令,但是 set num 123 和 set num 666 都是对 num 的操作,第二次会覆盖第一次的值,因此第一个命令记录下来没有意义。

所以重写命令后,AOF 文件内容就是:mset name jack num 666。实际测试,AOF 文件重写后内容被压缩了,命令也被简化了~

Redis 也会在触发阈值时自动去重写 AOF 文件。阈值也可以在 redis.conf 中配置:

# AOF文件比上次文件增长超过 100%(翻了一倍)则触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积超过 64mb 就触发重写 
auto-aof-rewrite-min-size 64mb 

混合RDB和AOF

Redis 4.0 中提出了一个混合使用 AOF 日志和内存快照的方法。内存快照以一定的频率执行,在两次快照之间,使用 AOF 日志记录这期间的所有命令操作。这样,不用频繁执行快照,避免了频繁 fork 对主线程的影响。且,AOF 日志也只用记录两次快照间的操作,无需记录所有操作了,不会出现文件过大的情况,也可以避免重写开销。如下图所示,T1 和 T2 时刻的修改,用 AOF 日志记录,等到第二次做全量快照时,就可以清空 AOF 日志,因为此时的修改都已经记录到快照中了,恢复时就不再用日志了。

RDB与AOF对比

RDB 和 AOF 各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。

- RDB AOF
持久化方式 定时对整个内存做快照 记录每一次执行的命令
数据完整性 不完整,两次备份之间会丢失 相对完整,取决于刷盘策略
文件大小 会有压缩,文件体积小 记录命令,文件体积大
宕机恢复速度 很快
数据恢复优先级 低,因为数据完整性不如 AOF 高,数据完整性高
系统资源占用 高,大量的 CPU 和内存消耗 低,主要是磁盘 IO 资源,但是 AOF 重写时会占用大量 CPU 和内存资源
使用场景 可以容忍数分钟的数据丢失,追求更快的启动速度 对数据安全性要求较高场景

Redis主从

搭建主从架构

单节点 Redis 的并发能力是有上限的,要进一步提高 Redis 的并发能力,就需要搭建主从集群,实现读写分离。

多个从结点承担读的请求,Redis 读取数据的能力可以得到极大的提升。

将 redis.conf 文件复制多份,分别启动多个 Redis。此时这样启动的还只是一个一个孤立的 Redis。

开启主从关系

要配置主从关系可以使用 replicaof 或 slaveof(5.0 以前的命令)。

有临时和永久两种模式

此处采用临时模式配置主从关系。

1️⃣用不同配置文件开启多个 redis 服务;

# 在 6370 端口开启一个 redis 实例
sudo ./bin/redis-server redis.conf
# 连接 6739 端口的 redis 实例
./bin/redis-cli -p 6379

# 在 7000 端口开启一个 redis 实例
sudo ./bin/redis-server 7000.conf
# 连接 7000 端口的 redis 实例
./bin/redis-cli -p 7000

2️⃣配置主从节点(只要指定从节点即可)

# 在 7000 redis-cli 中执行命令,将 7000 端口的 redis 指定为
# 6379 端口的从节点。
127.0.0.1:7000> SLAVEOF  172.26.26.72 6379
OK

3️⃣查看从节点的 key,然后在主节点中写入一些数据,看是否会同步到从节点中

# 主节点和从节点此时数据都为空
127.0.0.1:6379> keys *
(empty array)

127.0.0.1:7000> keys *
(empty array)

# 主节点中写数据
127.0.0.1:6379> set master somevalue
OK

# 从节点同步到了主节点的数据
127.0.0.1:7000> keys *
1) "master"

4️⃣查看集群状态信息 info replication

127.0.0.1:7000> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:3
master_sync_in_progress:0
slave_repl_offset:423
slave_priority:100
slave_read_only:1
connected_slaves:0
master_failover_state:no-failover
master_replid:b6811140f497c5bbe6f0ddd27c46b39a091fb7ac
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:423
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:423

主从同步原理

全量同步

主从第一次建立连接时,会执行全量同步,将 master 节点的所有数据都拷贝给 slave 节点,流程:

这里有一个问题,master 如何得知 slave 是第一次来连接呢??

有几个概念,可以作为判断依据:

因此 slave 做数据同步,必须向 master 声明自己的 replication id 和 offset,master 才可以判断到底需要同步哪些数据。

因为 slave 原本也是一个 master,有自己的 replid 和 offset,当第一次变成 slave,与 master 建立连接时,发送的 replid 和 offset 是自己的 replid 和 offset。

master 判断发现 slave 发送来的 replid 与自己的不一致,说明这是一个全新的 slave,就知道要做全量同步了。

master 会将自己的 replid 和 offset 都发送给这个 slave,slave 保存这些信息。以后 slave 的 replid 就与 master 一致了。

因此,master 判断一个节点是否是第一次同步的依据,就是看 replid 是否一致

完整流程描述

增量同步

全量同步需要先做 RDB,然后将 RDB 文件通过网络传输个 slave,成本太高了。因此除了第一次做全量同步,其它大多数时候 slave 与 master 都是做增量同步

增量同步就是只更新 slave 与 master 存在差异的部分数据。如图:

那么 master 怎么知道 slave 与自己的数据差异在哪里呢?简单来说是根据 master 和 slave 的 offset 的差值来判断的,如果 master 和 slave 的 offset 不一样,则说明主从需要进行同步。如果 master 的 offset 覆盖了未同步的数据,就得进行全增量同步了。具体原理请看 “repl_backlog 原理”

repl_backlog原理

master 怎么知道 slave 与自己的数据差异在哪里呢?这就要靠全量同步时的 repl_baklog 文件了。

这个文件是一个固定大小的数组,只不过数组是环形,也就是说角标到达数组末尾后,会再次从 0 开始读写,这样数组头部的数据就会被覆盖。

repl_baklog 中会记录 Redis 处理过的命令日志及 offset,包括 master 当前的 offset,和 slave 已经拷贝到的 offset(红色部分是尚未同步的内容):

slave 与 master 的 offset 之间的差异,就是 slave 需要增量拷贝的数据了。随着不断有数据写入,master 的 offset 逐渐变大, slave 也不断的拷贝,追赶 master 的 offset

直到数组被填满

此时,如果有新的数据写入,就会覆盖数组中的旧数据。不过,旧的数据只要是绿色的,说明是已经被同步到 slave 的数据,即便被覆盖了也没什么影响。因为未同步的仅仅是红色部分。

但是,如果 slave 出现网络阻塞,导致 master 的 offset 远远超过了 slave 的 offset:

如果 master 继续写入新数据,其 offset 就会覆盖旧的数据,直到将 slave 现在的 offset 也覆盖:

棕色框中的红色部分,就是尚未同步,但是却已经被覆盖的数据。此时如果 slave 恢复,需要同步,却发现自己的 offset 都没有了,无法完成增量同步了。只能做全量同步。

repl_baklog 大小有上限,写满后会覆盖最早的数据。如果 slave 断开时间过久,导致尚未备份的数据被覆盖,则无法基于 log 做增量同步,只能再次全量同步。

主从同步优化

主从同步可以保证主从数据的一致性,非常重要。可以从以下几个方面来优化 Redis 主从集群(如尽可能的避免全量同步,少做磁盘 IO)

上面两个都是在提高全量同步的性能,下面两点是从减少全量同步出发的

小结

简述全量同步和增量同步区别?

什么时候执行全量同步?

什么时候执行增量同步?

实际使用是全量同步+增量同步一起使用。

Redis哨兵

slave 节点宕机恢复后可以找 master 节点同步数据,那 master 节点宕机该如何处理?

Redis 提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。而哨兵是用于监控整个集群做故障恢复的。

哨兵原理

集群的结构和作用

哨兵的作用如下

服务状态监控

Sentinel 基于心跳机制监测服务状态,每隔 1 秒向集群的每个实例发送 ping 命令:

故障恢复原理

一旦发现 master 故障,sentinel 需要在 slave 中选择一个作为新的 master,选择依据是这样的:

当选出一个新的 master 后,该如何实现切换呢?流程如下:

小结

Sentinel 的三个作用是什么?

Sentinel 如何判断一个 redis 实例是否健康?

故障转移步骤有哪些?

搭建哨兵集群

集群结构

先按之前的方式搭建好主从集群,然后继续搭建一个三节点形成的 Sentinel 集群,来监管之前的 Redis 主从集群。

三个 sentinel 实例信息如下

节点 IP PORT
s1 192.168.150.101 27001
s2 192.168.150.101 27002
s3 192.168.150.101 27003

准备实例和配置

要在同一台虚拟机开启 3 个实例,必须准备三份不同的配置文件和目录,配置文件所在目录也就是工作目录。

我们创建三个文件夹,名字分别叫 s1、s2、s3:

# 进入/tmp目录
cd /tmp
# 创建目录
mkdir s1 s2 s3

然后我们在 s1 目录创建一个 sentinel.conf 文件,添加下面的内容:

port 27001
sentinel announce-ip 172.26.26.72
sentinel monitor mymaster 172.26.26.72 27001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 60000
dir "/tmp/s1"

解读:

然后将 s1/sentinel.conf 文件拷贝到 s2、s3 两个目录中(在 /tmp 目录执行下列命令):

# 方式一:逐个拷贝
cp s1/sentinel.conf s2
cp s1/sentinel.conf s3
# 方式二:管道组合命令,一键拷贝
echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf

修改 s2、s3 两个文件夹内的配置文件,将端口分别修改为 27002、27003:

sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf
sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf

启动

为了方便查看日志,我们打开 3 个 ssh 窗口,分别启动 3 个 redis 实例,启动命令:

# 第1个
redis-sentinel s1/sentinel.conf
# 第2个
redis-sentinel s2/sentinel.conf
# 第3个
redis-sentinel s3/sentinel.conf

测试

如果哨兵选举的时候一直失败,可能是没开发所有主从服务器和哨兵节点的端口 systemctl stop firewalld.service,如果是 ubuntu 系统则执行 sudo ufw disable 关闭防火墙。

尝试让 master 节点 7001 宕机,查看 sentinel 日志:

查看 7003 的日志:

查看 7002 的日志:

RedisTemplate

在 Sentinel 集群监管下的 Redis 主从集群,其节点会因为自动故障转移而发生变化,Redis 的客户端必须感知这种变化,及时更新连接信息。Spring 的 RedisTemplate 底层利用 lettuce 实现了节点的感知和自动切换。

导入Demo工程

redis-demo 这个文件夹

引入依赖

在项目的 pom 文件中引入依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

配置Redis地址

在配置文件 application.yml 中指定 redis 的 sentinel 相关信息:

spring:
  redis:
    sentinel:
      master: mymaster
      nodes:
        - 192.168.150.101:27001
        - 192.168.150.101:27002
        - 192.168.150.101:27003

配置读写分离

在项目的启动类中,添加一个新的 bean:

@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
    return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

这个 bean 中配置的就是读写策略,包括四种:

Redis分片集群

搭建分片集群

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

使用分片集群可以解决上述问题,如图:

分片集群特征:

集群结构

片集群需要的节点数量较多,这里我们搭建一个最小的分片集群,包含 3 个 master 节点,每个 master 包含一个 slave 节点,结构如下

这里我们会在同一台虚拟机中开启 6 个 redis 实例,模拟分片集群,信息如下

IP PORT 角色
192.168.150.101 7001 master
192.168.150.101 7002 master
192.168.150.101 7003 master
192.168.150.101 8001 slave
192.168.150.101 8002 slave
192.168.150.101 8003 slave

准备实例和配置

删除之前的 7001、7002、7003 这几个目录,重新创建出 7001、7002、7003、8001、8002、8003 目录:

# 进入/tmp目录
cd /tmp
# 删除旧的,避免配置干扰
rm -rf 7001 7002 7003
# 创建目录
mkdir 7001 7002 7003 8001 8002 8003

在 /tmp 下准备一个新的 redis.conf 文件,内容如下:

port 6379
# 开启集群功能
cluster-enabled yes
# 集群的配置文件名称,不需要我们创建,由redis自己维护
cluster-config-file /tmp/6379/nodes.conf
# 节点心跳失败的超时时间
cluster-node-timeout 5000
# 持久化文件存放目录
dir /tmp/6379
# 绑定地址
bind 0.0.0.0
# 让redis后台运行
daemonize yes
# 注册的实例ip
replica-announce-ip 192.168.150.101
# 保护模式
protected-mode no
# 数据库数量
databases 1
# 日志
logfile /tmp/6379/run.log

将这个文件拷贝到每个目录下:

# 进入/tmp目录
cd /tmp
# 执行拷贝
echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf

修改每个目录下的 redis.conf,将其中的 6379 修改为与所在目录一致:

# 进入/tmp目录
cd /tmp
# 修改配置文件
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf

启动

因为已经配置了后台启动模式,所以可以直接启动服务

# 进入/tmp目录
cd /tmp
# 一键启动所有服务
printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf

通过 ps 查看状态

ps -ef | grep redis

发现服务都已经正常启动

如果要关闭所有进程,可以执行命令

ps -ef | grep redis | awk '{print $2}' | xargs kill

或者(推荐这种方式)

printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-cli -p {} shutdown

创建集群

虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。

我们需要执行命令来创建集群,在 Redis5.0 之前创建集群比较麻烦,5.0 之后集群管理命令都集成到了 redis-cli 中。

1)Redis5.0 之前

Redis5.0 之前集群命令都是用 redis 安装包下的 src/redis-trib.rb 来实现的。因为 redis-trib.rb 是有 ruby 语言编写的所以需要安装 ruby 环境。

# 安装依赖
yum -y install zlib ruby rubygems
gem install redis

然后通过命令来管理集群

# 进入 redis 的 src 目录
cd /tmp/redis-6.2.4/src
# 创建集群
./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

2)Redis5.0 以后

我们使用的是 Redis6.2.4 版本,集群管理以及集成到了 redis-cli 中,格式如下:

redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003

命令说明:

运行后的样子

这里输入 yes,则集群开始创建

通过命令可以查看集群状态

redis-cli -p 7001 cluster nodes

测试

尝试连接 7001 节点,存储一个数据

# 连接
redis-cli -p 7001
# 存储数据
set num 123
# 读取数据
get num
# 再次存储
set a 1

结果悲剧了

集群操作时,需要给 redis-cli 加上 -c 参数才可以

redis-cli -c -p 7001

这次可以了

散列插槽

插槽原理

Redis 会把每一个 master 节点映射到 0~16383 共 16384 个插槽(hash slot)上,查看集群信息时就能看到

数据 key 不是与节点绑定,而是与插槽绑定。redis 会根据 key 的有效部分计算插槽值。简单说就是根据 key 的哈希映射判断,这个 key 存储在哪里。

key 的有效部分分两种情况

例如:key 是 num,那么就根据 num 计算,如果是 {itcast} num,则根据 itcast 计算。计算方式是利用 CRC16 算法得到一个 hash 值,然后对 16384 取余,得到的结果就是 slot 值。

如图,在 7001 这个节点执行 set a 1 时,对 a 做 hash 运算,对 16384 取余,得到的结果是 15495,因此要存储到 103 节点。

到了 7003 后,执行 get num 时,对 num 做 hash 运算,对 16384 取余,得到的结果是 2765 插槽的范围不在 7003 内,而是在 7001 的插槽范围内,因此需要切换到 7001 节点。

为什么 Redis 的 key 要和插槽绑定而不是直接和 Redis 实例绑定呢?因为 Redis 的实例可能会宕机,key 直接和实例绑定的话,宕机了 key 就没有对应的实例了。如果和插槽绑定的话,插槽对应的实例是可以进行更替(更方便)的,数据跟着插槽走,永远都可以找到插槽的位置。

小结

Redis 如何判断某个 key 应该在哪个实例?

如何将同一类数据固定的保存在同一个 Redis 实例?

集群伸缩

redis-cli –cluster 提供了很多操作集群的命令,可以通过下面方式查看:

添加节点的命令

需求分析

需求:向集群中添加一个新的 master 节点,并向其中存储 num = 10

这里需要两个新的功能:

创建 Redis 实例

创建一个文件夹:

mkdir 7004

拷贝配置文件:

cp redis.conf /7004

修改配置文件:

sed /s/6379/7004/g 7004/redis.conf

启动

redis-server 7004/redis.conf

添加新节点到 redis

执行命令:

redis-cli --cluster add-node  192.168.150.101:7004 192.168.150.101:7001

通过命令查看集群状态:

redis-cli -p 7001 cluster nodes

如图,7004 加入了集群,并且默认是一个 master 节点:

但是,可以看到 7004 节点的插槽数量为 0,因此没有任何数据可以存储到 7004 上

转移插槽

我们要将 num 存储到 7004 节点,因此需要先看看 num 的插槽是多少:

如上图所示,num 的插槽为 2765.

我们可以将 0~3000 的插槽从 7001 转移到 7004,命令格式如下:

具体命令如下:

建立连接:

得到下面的反馈:

询问要移动多少个插槽,我们计划是 3000 个:

新的问题来了:

那个 node 来接收这些插槽??

显然是 7004,那么 7004 节点的 id 是多少呢?

复制这个 id,然后拷贝到刚才的控制台后:

这里询问,你的插槽是从哪里移动过来的?

这里我们要从 7001 获取,因此填写 7001 的 id:

填完后,点击 done,这样插槽转移就准备好了:

确认要转移吗?输入 yes:

然后,通过命令查看结果:

可以看到:

目的达成。

故障转移

集群初识状态是这样的:

其中 7001、7002、7003 都是 master,我们计划让 7002 宕机。

自动故障转移

当集群中有一个master宕机会发生什么呢?比如直接停止一个 redis 实例,例如 7002:

redis-cli -p 7002 shutdown

1)首先是该实例与其它实例失去连接

2)然后是疑似宕机:

3)最后是确定下线,自动提升一个 slave 为新的 master:

4)当 7002 再次启动,就会变为一个 slave 节点了:

手动故障转移

利用 cluster failover 命令可以手动让集群中的某个 master 宕机,切换到执行 cluster failover 命令的这个 slave 节点,实现无感知的数据迁移。其流程如下:

这种 failover 命令可以指定三种模式:

案例需求:在 7002 这个 slave 节点执行手动故障转移,重新夺回 master 地位。

步骤如下:

1)利用 redis-cli 连接 7002 这个节点

2)执行 cluster failover 命令

如图:

效果:

RedisTemplate访问分片集群

RedisTemplate 底层同样基于 lettuce 实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:

1)引入 redis 的 starter 依赖

2)配置分片集群地址

3)配置读写分离

与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:

spring:
  redis:
    cluster:
      nodes:
        - 192.168.150.101:7001
        - 192.168.150.101:7002
        - 192.168.150.101:7003
        - 192.168.150.101:8001
        - 192.168.150.101:8002
        - 192.168.150.101:8003

多级缓存

什么是多级缓存

传统的缓存策略一般是请求到达 Tomcat 后,先查询 Redis,如果未命中则查询数据库,如图

存在下面的问题

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻 Tomcat 压力,提升服务性能

在多级缓存架构中,Nginx 内部需要编写本地缓存查询、Redis 查询、Tomcat 查询的业务逻辑,因此这样的 nginx 服务不再是一个反向代理服务器,而是一个编写业务的 Web 服务器了。

因此这样的业务 Nginx 服务也需要搭建集群来提高并发,再有专门的 nginx 服务来做反向代理,如图

另外,我们的 Tomcat 服务将来也会部署为集群模式

可见,多级缓存的关键有两个

其中 Nginx 编程则会用到 OpenResty 框架结合 Lua 这样的语言。这也是多级缓存的难点和重点。

JVM进程缓存

为了演示多级缓存的案例,我们先准备一个商品查询的业务。

导入项目

参考资料:《案例导入说明.md》

初识Caffeine

缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类

此处利用 Caffeine 框架来实现 JVM 进程缓存。

Caffeine 是一个基于 Java8 开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前 Spring 内部的缓存使用的就是 Caffeine。GitHub 地址:https://github.com/ben-manes/caffeine

Caffeine 的性能非常好,下图是官方给出的性能对比

可以看到 Caffeine 的性能遥遥领先!

缓存使用的基本 API

@Test
void testBasicOps() {
    // 构建cache对象
    Cache<String, String> cache = Caffeine.newBuilder().build();

    // 存数据
    cache.put("gf", "迪丽热巴");

    // 取数据
    String gf = cache.getIfPresent("gf");
    System.out.println("gf = " + gf);

    // 取数据,包含两个参数:
    // 参数一:缓存的key
    // 参数二:Lambda表达式,表达式参数就是缓存的key,方法体是查询数据库的逻辑
    // 优先根据key查询JVM缓存,如果未命中,则执行参数二的Lambda表达式
    String defaultGF = cache.get("defaultGF", key -> {
        // 根据key去数据库查询数据
        return "柳岩";
    });
    System.out.println("defaultGF = " + defaultGF);
}

Caffeine 既然是缓存的一种,肯定需要有缓存的清除策略,不然的话内存总会有耗尽的时候。

Caffeine 提供了三种缓存驱逐策略:

注意:在默认情况下,当一个缓存元素过期的时候,Caffeine 不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。

实现JVM进程缓存

需求

利用 Caffeine 实现下列需求

实现

首先,需要定义两个 Caffeine 的缓存对象,分别保存商品、库存的缓存数据。

在 item-service 的 com.heima.item.config 包下定义 CaffeineConfig

package com.heima.item.config;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CaffeineConfig {

    @Bean
    public Cache<Long, Item> itemCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }

    @Bean
    public Cache<Long, ItemStock> stockCache(){
        return Caffeine.newBuilder()
                .initialCapacity(100)
                .maximumSize(10_000)
                .build();
    }
}

然后,修改 item-service 中的 com.heima.item.web 包下的 ItemController 类,添加缓存逻辑

@RestController
@RequestMapping("item")
public class ItemController {

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    @Autowired
    private Cache<Long, Item> itemCache;
    @Autowired
    private Cache<Long, ItemStock> stockCache;
    
    // ...其它略
    
    @GetMapping("/{id}")
    public Item findById(@PathVariable("id") Long id) {
        return itemCache.get(id, key -> itemService.query()
                .ne("status", 3).eq("id", key)
                .one()
        );
    }

    @GetMapping("/stock/{id}")
    public ItemStock findStockById(@PathVariable("id") Long id) {
        return stockCache.get(id, key -> stockService.getById(key));
    }
}

Lua语法入门

Nginx 编程需要用到 Lua 语言,因此我们必须先入门 Lua 的基本语法。

初识Lua

Lua 是一种轻量小巧的脚本语言,用标准 C 语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:https://www.lua.org/

Lua 经常嵌入到 C 语言开发的程序中,例如游戏开发、游戏插件等。

Nginx 本身也是 C 语言开发,因此也允许基于 Lua 做拓展。

HelloWorld

CentOS7 默认已经安装了 Lua 语言环境,所以可以直接运行 Lua 代码,Ubuntu 则需要自行安装 sudo apt install lua5.1

1)在 Linux 虚拟机的任意目录下,新建一个 hello.lua 文件

touch hello.lua

2)添加下面的内容

print("hello lua!")  

3)运行

lua hello.lua
hello lua!

变量和循环

学习任何语言必然离不开变量,而变量的声明必须先知道数据的类型。

Lua的数据类型

Lua 中支持的常见数据类型包括

数据类型 描述
nil 无效值,只有值 nil 属于该类,在条件表达式中相当于 false
boolean 布尔值,包含 true 和 false
number 表示双精度类型的实浮点数
string 字符串由双引号或单引号表示
function 由 C 或 lua 编写的函数
table Lua 中的表 (table) 其实是一一个”关联数组” (associative arrays),数组的索引可以是数字、字符串或表类型。
在 Lua 里,table 的创建是通过”构造表达式”来完成,最简单构造表达式是 {},用来创建一一个空表。

另外,Lua 提供了 type() 函数来判断一个变量的数据类型

> print(type("hello"))
string
> print(type(10.4*3))
number

声明变量

Lua 声明变量的时候无需指定数据类型,而是用 local 来声明变量为局部变量

-- 声明字符串,可以用单引号或双引号,
local str = 'hello'
-- 字符串拼接可以使用 ..
local str2 = 'hello' .. 'world'
-- 声明数字
local num = 21
-- 声明布尔类型
local flag = true

Lua 中的 table 类型既可以作为数组,又可以作为 Java 中的 map 来使用。数组就是特殊的 table,key 是数组角标而已

-- 声明数组 ,key为角标的 table
local arr = {'java', 'python', 'lua'}
-- 声明table,类似java的map
local map =  {name='Jack', age=21}

Lua 中的数组角标是从 1 开始,访问的时候与 Java 中类似

-- 访问数组,lua数组的角标从1开始
print(arr[1])

Lua 中的 table 可以用 key 来访问

-- 访问table
print(map['name'])
print(map.name)

循环

对于 table,我们可以利用 for 循环来遍历。不过数组和普通 table 遍历略有差异。

遍历数组

-- 声明数组 key为索引的 table
local arr = {'java', 'python', 'lua'}
-- 遍历数组
for index,value in ipairs(arr) do
    print(index, value) 
end

遍历普通 table

-- 声明map,也就是table
local map = {name='Jack', age=21}
-- 遍历table
for key,value in pairs(map) do
   print(key, value) 
end

在 Lua 中 ipairs 函数用于遍历集合,与函数 pairs() 是同胞兄弟。

条件控制、函数

Lua 中的条件控制和函数声明与 Java 类似。

函数

定义函数的语法

function 函数名( argument1, argument2..., argumentn)
    -- 函数体
    return 返回值
end

例如,定义一个函数,用来打印数组

function printArr(arr)
    for index, value in ipairs(arr) do
        print(value)
    end
end

条件控制

类似 Java 的条件控制,例如 if、else 语法

if(布尔表达式)
then
   --[ 布尔表达式为 true 时执行该语句块 --]
else
   --[ 布尔表达式为 false 时执行该语句块 --]
end

与 Java 不同,布尔表达式中的逻辑运算是基于英文单词

操作符 描述 实例
and 逻辑与操作符。若 A 为 false,则返回 A,否则返回 B。 (A and B)为 false
or 逻辑与操作符。若 A 为 true,则返回 A,否则返回 B。 (A or B)为 true
not 逻辑非操作符。与逻辑运算结果相反,如果条件为 true,逻辑非为 false。 not(A and B)为 true

案例

需求:自定义一个函数,可以打印 table,当参数为 nil 时,打印错误信息

function printArr(arr)
    if not arr then
        print('数组不能为空!')
    end
    for index, value in ipairs(arr) do
        print(value)
    end
end

实现多级缓存

多级缓存的实现离不开 Nginx 编程,而 Nginx 编程又离不开 OpenResty。

安装OpenResty

OpenResty® 是一个基于 Nginx 的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。具备下列特点:

官方网站:https://openresty.org/cn/

安装 Lua 可以参考《安装OpenResty.md》

OpenResty快速入门

我们希望达到的多级缓存架构如图

其中

反向代理流程

现在,商品详情页使用的是假的商品数据。不过在浏览器中,可以看到页面有发起 ajax 请求查询真实商品数据。

这个请求如下:

请求地址是 localhost,端口是 80,就被 windows 上安装的 Nginx 服务给接收到了。然后代理给了 OpenResty 集群:

我们需要在 OpenResty 中编写业务,查询商品数据并返回到浏览器。但是这次,我们先在 OpenResty 接收请求,返回假的商品数据。

OpenResty监听请求

OpenResty 的很多功能都依赖于其目录下的 Lua 库,需要在 nginx.conf 中指定依赖库的目录,并导入依赖:

1)添加对 OpenResty 的 Lua 模块的加载

修改 /usr/local/openresty/nginx/conf/nginx.conf 文件,在其中的 http 下面,添加下面代码

#lua 模块
lua_package_path "/usr/local/openresty/lualib/?.lua;;";
#c模块     
lua_package_cpath "/usr/local/openresty/lualib/?.so;;";  

2)监听 /api/item 路径

修改 /usr/local/openresty/nginx/conf/nginx.conf 文件,在 nginx.conf 的 server 下面,添加对 /api/item 这个路径的监听

location  /api/item {
    # 默认的响应类型
    default_type application/json;
    # 响应结果由lua/item.lua文件来决定
    content_by_lua_file lua/item.lua;
}

这个监听,就类似于 SpringMVC 中的 @GetMapping("/api/item") 做路径映射。

content_by_lua_file lua/item.lua 则相当于调用 item.lua 这个文件,执行其中的业务,把结果返回给用户。相当于 java 中调用 service。

编写item.lua

1)在 /usr/loca/openresty/nginx 目录创建文件夹:lua

2)在 /usr/loca/openresty/nginx/lua 文件夹下,新建文件:item.lua

3)编写 item.lua,返回假数据

item.lua 中,利用 ngx.say() 函数返回数据到 Response 中

ngx.say('{"id":10001,"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

4)重新加载配置

nginx -s reload

刷新商品页面:http://localhost/item.html?id=1001,即可看到效果

请求参数处理

前面在 OpenResty 接收前端请求,返回的是假数据。要返回真实数据,必须根据前端传递来的商品 id,查询商品信息才可以。那么如何获取前端传递的商品参数呢?

获取参数的API

OpenResty 中提供了一些 API 用来获取不同类型的前端请求参数:

获取参数并返回

在前端发起的 ajax 请求如图

可以看到商品 id 是以路径占位符方式传递的,因此可以利用正则表达式匹配的方式来获取ID

1)获取商品 id

修改 /usr/loca/openresty/nginx/nginx.conf 文件中监听 /api/item 的代码,利用正则表达式获取 ID

location ~ /api/item/(\d+) {
    # 默认的响应类型
    default_type application/json;
    # 响应结果由lua/item.lua文件来决定
    content_by_lua_file lua/item.lua;
}

2)拼接 ID 并返回

修改 /usr/loca/openresty/nginx/lua/item.lua 文件,获取 id 并拼接到结果中返回

-- 获取商品id
local id = ngx.var[1]
-- 拼接并返回
ngx.say('{"id":' .. id .. ',"name":"SALSA AIR","title":"RIMOWA 21寸托运箱拉杆箱 SALSA AIR系列果绿色 820.70.36.4","price":17900,"image":"https://m.360buyimg.com/mobilecms/s720x720_jfs/t6934/364/1195375010/84676/e9f2c55f/597ece38N0ddcbc77.jpg!q70.jpg.webp","category":"拉杆箱","brand":"RIMOWA","spec":"","status":1,"createTime":"2019-04-30T16:00:00.000+00:00","updateTime":"2019-04-30T16:00:00.000+00:00","stock":2999,"sold":31290}')

3)重新加载并测试

运行命令以重新加载 OpenResty 配置

nginx -s reload

刷新页面可以看到结果中已经带上了 ID

查询Tomcat

拿到商品 ID 后,本应去缓存中查询商品信息,不过目前我们还未建立 nginx、redis 缓存。因此,这里我们先根据商品 id 去 tomcat 查询商品信息。

需要注意的是,我们的 OpenResty 是在虚拟机,Tomcat 是在 Windows 电脑上。两者 IP 一定不要搞错了。

发送http请求的API

nginx 提供了内部 API 用以发送 http 请求

local resp = ngx.location.capture("/path",{
    method = ngx.HTTP_GET,   -- 请求方式
    args = {a=1,b=2},  -- get方式传参数
})

返回的响应内容包括

注意:这里的 path 是路径,并不包含 IP 和端口。这个请求会被 nginx 内部的 server 监听并处理。

但是我们希望这个请求发送到 Tomcat 服务器,所以还需要编写一个 server 来对这个路径做反向代理

 location /path {
     # 这里是 windows 电脑的 ip 和 Java 服务端口,需要确保 windows 防火墙处于关闭状态
     proxy_pass http://192.168.150.1:8081; 
 }

原理如图

封装http工具

封装一个发送 Http 请求的工具,基于 ngx.location.capture 来实现查询 tomcat。

1)添加反向代理到 windows 的 Java 服务

因为 item-service 中的接口都是 /item 开头,所以我们监听 /item 路径,代理到 windows 上的 tomcat 服务。

修改 /usr/local/openresty/nginx/conf/nginx.conf 文件,添加一个 location

location /item {
    proxy_pass http://192.168.150.1:8081;
}

以后,只要我们调用 ngx.location.capture("/item"),就一定能发送请求到 windows 的 tomcat 服务。

2)封装工具类

之前我们说过,OpenResty 启动时会加载以下两个目录中的工具文件

所以,自定义的 http 工具也需要放到这个目录下。

/usr/local/openresty/lualib 目录下,新建一个 common.lua 文件

vi /usr/local/openresty/lualib/common.lua

内容如下

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http请求查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {  
    read_http = read_http
}  
return _M

这个工具将 read_http 函数封装到 _M 这个 table 类型的变量中,并且返回,这类似于导出。

使用的时候,可以利用 require('common') 来导入该函数库,这里的 common 是函数库的文件名。

3)实现商品查询

最后,我们修改 /usr/local/openresty/lua/item.lua 文件,利用刚刚封装的函数库实现对 tomcat 的查询

-- 引入自定义common工具模块,返回值是common中返回的 _M
local common = require("common")
-- 从 common中获取read_http这个函数
local read_http = common.read_http
-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

这里查询到的结果是 json 字符串,并且包含商品、库存两个 json 字符串,页面最终需要的是把两个 json 拼接为一个 json

这就需要我们先把 JSON 变为 lua 的 table,完成数据整合后,再转为 JSON。

CJSON工具类

OpenResty 提供了一个 cjson 的模块用来处理 JSON 的序列化和反序列化。

官方地址:https://github.com/openresty/lua-cjson/

1)引入 cjson 模块

local cjson = require "cjson"

2)序列化

local obj = {
    name = 'jack',
    age = 21
}
-- 把 table 序列化为 json
local json = cjson.encode(obj)

3)反序列化

local json = '{"name": "jack", "age": 21}'
-- 反序列化 json为 table
local obj = cjson.decode(json);
print(obj.name)

实现Tomcat查询

修改之前的 item.lua 中的业务,添加 json 处理功能

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
-- 导入cjson库
local cjson = require('cjson')

-- 获取路径参数
local id = ngx.var[1]
-- 根据id查询商品
local itemJSON = read_http("/item/".. id, nil)
-- 根据id查询商品库存
local itemStockJSON = read_http("/item/stock/".. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)

-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

基于ID负载均衡

刚才的代码中,我们的 tomcat 是单机部署。而实际开发中,tomcat 一定是集群模式

因此,OpenResty 需要对 tomcat 集群做负载均衡。

而默认的负载均衡规则是轮询模式,当我们查询 /item/10001 时

因为轮询的原因,第一次查询 8081 形成的 JVM 缓存并未生效,直到下一次再次访问到 8081 时才可以生效,缓存命中率太低了。

怎么办?

如果能让同一个商品,每次查询时都访问同一个 tomcat 服务,那么 JVM 缓存就一定能生效了。也就是说,我们需要根据商品 id 做负载均衡,而不是轮询。

1)原理

nginx 提供了基于请求路径做负载均衡的算法。nginx 可以根据请求路径做 hash 运算,把得到的数值对 tomcat 服务的数量取余,余数是几,就访问第几个服务,实现负载均衡。

例如:

只要 id 不变,每次 hash 运算结果也不会变,那就可以保证同一个商品,一直访问同一个 tomcat 服务,确保 JVM 缓存生效。

2)实现

修改 /usr/local/openresty/nginx/conf/nginx.conf 文件,实现基于 ID 做负载均衡。

首先,定义 tomcat 集群,并设置基于路径做负载均衡

upstream tomcat-cluster {
    hash $request_uri;
    server 192.168.150.1:8081;
    server 192.168.150.1:8082;
}

然后,修改对 tomcat 服务的反向代理,目标指向 tomcat 集群

location /item {
    proxy_pass http://tomcat-cluster;
}

重新加载 OpenResty

nginx -s reload

3)测试

启动两台 tomcat 服务

同时启动

清空日志后,再次访问页面,可以看到不同 id 的商品,访问到了不同的 tomcat 服务

Redis缓存预热

Redis 缓存会面临冷启动问题:

冷启动:服务刚刚启动时,Redis 中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力。

缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到 Redis 中。

我们数据量较少,并且没有数据统计相关功能,目前可以在启动时将所有数据都放入缓存中。

1)利用 Docker 安装 Redis

docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes

2)在 item-service 服务中引入 Redis 依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3)配置 Redis 地址

spring:
  redis:
    host: 192.168.150.101

4)编写初始化类

缓存预热需要在项目启动时完成,并且必须是拿到 RedisTemplate 之后。

这里利用 InitializingBean 接口来实现,因为 InitializingBean 可以在对象被 Spring 创建并且成员变量全部注入后执行。

package com.heima.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }
}

查询Redis缓存

现在,Redis 缓存已经准备就绪,我们可以再 OpenResty 中实现查询 Redis 的逻辑了。如下图红框所示:

当请求进入 OpenResty 之后:

封装Redis工具

OpenResty 提供了操作 Redis 的模块,我们只要引入该模块就能直接使用。但是为了方便,我们将 Redis 操作封装到之前的 common.lua 工具库中。

修改 /usr/local/openresty/lualib/common.lua 文件

1)引入 Redis 模块,并初始化 Redis 对象

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

2)封装函数,用来释放 Redis 连接,其实是放入连接池

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

3)封装函数,根据 key 查询 Redis 数据

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

4)导出

-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

完整的 common.lua

-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)

-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        ngx.log(ngx.ERR, "放入redis连接池失败: ", err)
    end
end

-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "连接redis失败 : ", err)
        return nil
    end
    -- 查询redis
    local resp, err = red:get(key)
    -- 查询失败处理
    if not resp then
        ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)
    end
    --得到的数据为空处理
    if resp == ngx.null then
        resp = nil
        ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)
    end
    close_redis(red)
    return resp
end

-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)
    local resp = ngx.location.capture(path,{
        method = ngx.HTTP_GET,
        args = params,
    })
    if not resp then
        -- 记录错误信息,返回404
        ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)
        ngx.exit(404)
    end
    return resp.body
end
-- 将方法导出
local _M = {  
    read_http = read_http,
    read_redis = read_redis
}  
return _M

实现Redis查询

接下来,我们就可以去修改 item.lua 文件,实现对 Redis 的查询了。

查询逻辑是

1)修改 /usr/local/openresty/lua/item.lua 文件,添加一个查询函数:

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

2)而后修改商品查询、库存查询的业务:

3)完整的 item.lua 代码

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')

-- 封装查询函数
function read_data(key, path, params)
    -- 查询本地缓存
    local val = read_redis("127.0.0.1", 6379, key)
    -- 判断查询结果
    if not val then
        ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
        -- redis查询失败,去查询http
        val = read_http(path, params)
    end
    -- 返回数据
    return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

Nginx本地缓存

现在,整个多级缓存中只差最后一环,也就是 nginx 的本地缓存了。

本地缓存API

OpenResty 为 Nginx 提供了 shard dict 的功能,可以在 nginx 的多个 worker 之间共享数据,实现缓存功能。

1)开启共享字典,在 nginx.conf 的 http 下添加配置

 # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
 lua_shared_dict item_cache 150m; 

2)操作共享字典

-- 获取本地缓存对象
local item_cache = ngx.shared.item_cache
-- 存储, 指定key、value、过期时间,单位s,默认为0代表永不过期
item_cache:set('key', 'value', 1000)
-- 读取
local val = item_cache:get('key')

实现本地缓存查询

1)修改 /usr/local/openresty/lua/item.lua 文件,修改 read_data 查询函数,添加本地缓存逻辑

-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
    -- 查询本地缓存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
        -- 查询redis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判断查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
            -- redis查询失败,去查询http
            val = read_http(path, params)
        end
    end
    -- 查询成功,把数据写入本地缓存
    item_cache:set(key, val, expire)
    -- 返回数据
    return val
end

2)修改 item.lua 中查询商品和库存的业务,实现最新的 read_data 函数

其实就是多了缓存时间参数,过期后 nginx 缓存会自动删除,下次访问即可更新缓存。

这里给商品基本信息设置超时时间为 30 分钟,库存为 1 分钟。

因为库存更新频率较高,如果缓存时间过长,可能与数据库差异较大。

3)完整的 item.lua 文件

-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache

-- 封装查询函数
function read_data(key, expire, path, params)
    -- 查询本地缓存
    local val = item_cache:get(key)
    if not val then
        ngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)
        -- 查询redis
        val = read_redis("127.0.0.1", 6379, key)
        -- 判断查询结果
        if not val then
            ngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)
            -- redis查询失败,去查询http
            val = read_http(path, params)
        end
    end
    -- 查询成功,把数据写入本地缓存
    item_cache:set(key, val, expire)
    -- 返回数据
    return val
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold

-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

大多数情况下,浏览器查询到的都是缓存数据,如果缓存数据与数据库数据存在较大差异,可能会产生比较严重的后果。

所以我们必须保证数据库数据、缓存数据的一致性,这就是缓存与数据库的同步。

数据同步策略

缓存数据同步的常见方式有三种:

设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新

同步双写:在修改数据库的同时,直接修改缓存

异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据

而异步实现又可以基于 MQ 或者 Canal 来实现:

1)基于MQ的异步通知

依然有少量的代码侵入。

2)基于 Canal 的通知

代码零侵入

安装Canal

认识Canal

Canal [kə’næl],译意为水道/管道/沟渠,canal 是阿里巴巴旗下的一款开源项目,基于 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub 的地址:https://github.com/alibaba/canal

Canal 是基于 mysql 的主从同步来实现的,MySQL 主从同步的原理如下:

而 Canal 就是把自己伪装成 MySQL 的一个 slave 节点,从而监听 master 的 binary log 变化。再把得到的变化信息通知给 Canal 的客户端,进而完成对其它数据库的同步。

安装Canal

安装和配置 Canal 参考资料《安装 Cannal.md》

监听Canal

Canal 提供了各种语言的客户端,当 Canal 监听到 binlog 变化时,会通知 Canal 的客户端。

我们可以利用 Canal 提供的 Java 客户端,监听 Canal 通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用 GitHub 上的第三方开源的 canal-starter 客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与 SpringBoot 完美整合,自动装配,比官方客户端要简单好用很多。

引入依赖

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

编写配置

canal:
  destination: heima # canal的集群名字,要与安装canal时设置的名称一致
  server: 192.168.150.101:11111 # canal服务地址

修改Item实体类

通过 @Id、@Column、等注解完成 Item 与数据库表字段的映射

package com.heima.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;
}

编写监听器

通过实现 EntryHandler<T> 接口编写监听器,监听 Canal 消息。注意两点:

package com.heima.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.heima.item.config.RedisHandler;
import com.heima.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) {
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    }
}

在这里对 Redis 的操作都封装到了 RedisHandler 这个对象中,是我们之前做缓存预热时编写的一个类,内容如下

package com.heima.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.heima.item.pojo.Item;
import com.heima.item.pojo.ItemStock;
import com.heima.item.service.IItemService;
import com.heima.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }

    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}