进学阁

业精于勤荒于嬉,行成于思毁于随

0%

OpenFeign调用时如何传递Token并且多线程环境也能适用

为什么要自定义负载均衡

在实际的开发中经常会遇到如下场景:A、B 两个成员一起开发,A 在开发 C 服务,B 在开发 D 服务,测试环境中也部署有 C 服务和 D 服务,A 和 B 的诉求是互不干扰。但实际上,服务注册到 Nacos 之后,默认的负载策略是根据权重来决定访问哪个实例;当权重一致时,各实例大约各承担 50% 的请求,很容易对其他成员造成干扰。所以我们需要一个自定义负载均衡器来做分流。

负载均衡的实现

在 Spring Cloud 的解决方案中,负载均衡器使用的是 Spring Cloud LoadBalancer 框架,我们需要实现 ReactorServiceInstanceLoadBalancer 接口来完成自定义负载均衡。所以我们的思路是:先在网关中给请求打标,再由自定义负载均衡器根据标记确定具体调用的服务实例。

网关处理打标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Gateway global filter that tags a request for gray routing.
 *
 * <p>The caller may supply the tag either as a {@code gray} request header or as a
 * {@code gray} query parameter; the header wins when both are present. When a tag is
 * found it is written onto the header named by
 * {@link PomeloLoadBalancerProperties#getHeader()} before the request continues down
 * the chain. Requests without a tag are forwarded untouched.
 */
@Component
@Slf4j
public class LoadBalancerFilter implements GlobalFilter, Ordered {

    @Resource
    private PomeloLoadBalancerProperties pomeloLoadBalancerProperties;

    /**
     * Forwards the exchange, adding the gray tag header when the caller supplied one.
     *
     * @param exchange the current server exchange
     * @param chain    the remaining gateway filter chain
     * @return the completion signal of the downstream chain
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest incoming = exchange.getRequest();
        String grayTag = resolveGrayTag(incoming);

        if (StrUtil.isNotEmpty(grayTag)) {
            ServerHttpRequest.Builder builder = incoming.mutate();
            // NOTE(review): the trailing `true` flag is defined by HeaderUtils, which is not
            // visible here — confirm its meaning against that helper.
            HeaderUtils.addHeader(builder, pomeloLoadBalancerProperties.getHeader(), grayTag, true);
            ServerWebExchange tagged = exchange.mutate().request(builder.build()).build();
            return chain.filter(tagged);
        }

        return chain.filter(exchange);
    }

    /**
     * Reads the gray tag from the request: the {@code gray} header first, and the
     * {@code gray} query parameter only when the header is absent or empty.
     *
     * @param request the incoming request
     * @return the tag value, possibly {@code null} or empty when none was supplied
     */
    private static String resolveGrayTag(ServerHttpRequest request) {
        String fromHeader = request.getHeaders().getFirst("gray");
        return StrUtil.isNotEmpty(fromHeader)
                ? fromHeader
                : request.getQueryParams().getFirst("gray");
    }

    /** Runs at order {@code 0} among the gateway's global filters. */
    @Override
    public int getOrder() {
        return 0;
    }
}

自定义负载均衡处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
 * Load balancer that routes tagged ("gray") requests to instances carrying the same tag
 * in their metadata, and everything else to untagged instances.
 *
 * <p>The tag is read from the request header named by
 * {@link PomeloLoadBalancerProperties#getHeader()}; the same name is used as the key in
 * each instance's metadata. Within the selected group, instances are picked round-robin.
 *
 * <p>The properties object is supplied through the constructor only, so the field is
 * final and no field injection is used.
 */
public class PomeloLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private static final Logger log = LoggerFactory.getLogger(PomeloLoadBalancer.class);

    private final PomeloLoadBalancerProperties pomeloLoadBalancerProperties;

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    private final String serviceId;

    /** Monotonically increasing round-robin cursor shared by all requests. */
    private final AtomicInteger position;

    /**
     * Creates a load balancer whose round-robin cursor starts at a random offset in
     * {@code [0, 1000)}.
     *
     * @param serviceInstanceListSupplierProvider provider of the instance list supplier
     * @param serviceId                           id of the service being balanced
     * @param pomeloLoadBalancerProperties        properties holding the tag header name
     */
    public PomeloLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
            String serviceId, PomeloLoadBalancerProperties pomeloLoadBalancerProperties) {
        this(serviceInstanceListSupplierProvider, serviceId, new Random().nextInt(1000),
                pomeloLoadBalancerProperties);
    }

    /**
     * Creates a load balancer with an explicit starting cursor.
     *
     * @param serviceInstanceListSupplierProvider provider of the instance list supplier
     * @param serviceId                           id of the service being balanced
     * @param seedPosition                        initial value of the round-robin cursor
     * @param pomeloLoadBalancerProperties        properties holding the tag header name
     */
    public PomeloLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
            String serviceId, int seedPosition,
            PomeloLoadBalancerProperties pomeloLoadBalancerProperties) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.position = new AtomicInteger(seedPosition);
        this.pomeloLoadBalancerProperties = pomeloLoadBalancerProperties;
    }

    /**
     * Chooses an instance for the request from the first instance list emitted by the
     * supplier.
     *
     * @param request the load-balancer request
     * @return the chosen instance, or an empty response when none is eligible
     */
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(serviceInstances, request));
    }

    /**
     * Applies gray routing to the candidate instances.
     *
     * <p>If the request carries a tag and at least one instance has the same tag in its
     * metadata, one of those instances is chosen. Otherwise the choice is made among the
     * instances that carry no tag at all.
     *
     * @param instances all instances of the service
     * @param request   the load-balancer request
     * @return the chosen instance, or an empty response when no instance is eligible
     */
    private Response<ServiceInstance> processInstanceResponse(List<ServiceInstance> instances, Request request) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: {}", serviceId);
            return new EmptyResponse();
        }
        String tagKey = pomeloLoadBalancerProperties.getHeader();
        String reqVersion = resolveRequestedTag(request, tagKey);
        log.debug("X-Pomelo-LoadBalancer:{}",reqVersion);

        if (!StrUtil.isEmpty(reqVersion)) {
            List<ServiceInstance> taggedInstances = instances.stream()
                    .filter(instance -> reqVersion.equals(instance.getMetadata().get(tagKey)))
                    .collect(Collectors.toList());
            if (!taggedInstances.isEmpty()) {
                return processRibbonInstanceResponse(taggedInstances);
            }
        }

        // Built only when needed: no tag on the request, or no instance matched it.
        List<ServiceInstance> untaggedInstances = instances.stream()
                .filter(instance -> !instance.getMetadata().containsKey(tagKey))
                .collect(Collectors.toList());
        if (untaggedInstances.isEmpty()) {
            // Every instance is tagged for someone else; do not leak traffic onto them.
            log.warn("No untagged servers available for service: {}", serviceId);
        }
        return processRibbonInstanceResponse(untaggedInstances);
    }

    /**
     * Extracts the tag header from the request, tolerating requests that carry no HTTP
     * client request (in which case the request is treated as untagged).
     *
     * @param request the load-balancer request
     * @param tagKey  name of the header holding the tag
     * @return the tag value, or {@code null} when it cannot be determined
     */
    private static String resolveRequestedTag(Request request, String tagKey) {
        Object context = request.getContext();
        if (!(context instanceof DefaultRequestContext)) {
            return null;
        }
        Object clientRequest = ((DefaultRequestContext) context).getClientRequest();
        if (!(clientRequest instanceof RequestData)) {
            return null;
        }
        HttpHeaders headers = ((RequestData) clientRequest).getHeaders();
        return headers == null ? null : headers.getFirst(tagKey);
    }

    /**
     * Picks the next instance round-robin.
     * Modeled on org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer#getInstanceResponse.
     *
     * @param instances the eligible instances
     * @return the chosen instance, or an empty response when the list is empty
     */
    private Response<ServiceInstance> processRibbonInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            return new EmptyResponse();
        }
        // Mask the sign bit instead of Math.abs: Math.abs(Integer.MIN_VALUE) is still
        // negative, which would yield a negative index once the counter wraps around.
        int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
        ServiceInstance instance = instances.get(pos % instances.size());
        return new DefaultResponse(instance);
    }
}