为什么要自定义负载均衡
在实际的开发中总是会遇到如下的场景:A,B两个成员一起开发,A在开发C服务,B在开发D服务,测试环境中也部署有C服务和D服务,A和B的诉求是不互相干扰对方。但是实际中nacos中注册了服务之后默认的负载策略是根据权重来决定访问哪个服务,当权重一致时就是各50%的访问,很容易就对其他成员造成干扰,所以我们需要一个自定义负载均衡器来做一个分流。
负载均衡的实现
在springcloud的解决方案中负载均衡器使用的LoadBalancer框架,我们需要实现ReactorServiceInstanceLoadBalancer接口来完成自定义负载均衡。所以我们的思路就是在网关中进行打标,然后通过自定义负载均衡器来确定具体调用的服务。
网关处理打标
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Component @Slf4j public class LoadBalancerFilter implements GlobalFilter, Ordered { @Resource private PomeloLoadBalancerProperties pomeloLoadBalancerProperties; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); //灰度标识分两种情况,1.通过header传过来,2.通过url传过来 String gray = request.getHeaders().getFirst("gray"); if (StrUtil.isNotEmpty(gray)) { HeaderUtils.addHeader(mutate, pomeloLoadBalancerProperties.getHeader(), gray, true); return chain.filter(exchange.mutate().request(mutate.build()).build()); } String grayUrl = request.getQueryParams().getFirst("gray"); if (StrUtil.isNotEmpty(grayUrl)) { HeaderUtils.addHeader(mutate, pomeloLoadBalancerProperties.getHeader(), grayUrl, true); return chain.filter(exchange.mutate().request(mutate.build()).build()); } return chain.filter(exchange); }
@Override public int getOrder() { return 0; } }
|
自定义负载均衡处理器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| public class PomeloLoadBalancer implements ReactorServiceInstanceLoadBalancer { private static final Logger log = LoggerFactory.getLogger(PomeloLoadBalancer.class); @Resource private PomeloLoadBalancerProperties pomeloLoadBalancerProperties;
private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider; private final String serviceId;
private final AtomicInteger position;
public PomeloLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider , String serviceId,PomeloLoadBalancerProperties pomeloLoadBalancerProperties) { this(serviceInstanceListSupplierProvider,serviceId,new Random().nextInt(1000),pomeloLoadBalancerProperties); }
public PomeloLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, int seedPosition, PomeloLoadBalancerProperties pomeloLoadBalancerProperties) { this.serviceId = serviceId; this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider; this.position = new AtomicInteger(seedPosition); this.pomeloLoadBalancerProperties = pomeloLoadBalancerProperties; }
@Override public Mono<Response<ServiceInstance>> choose(Request request) { ServiceInstanceListSupplier supplier = this.serviceInstanceListSupplierProvider .getIfAvailable(NoopServiceInstanceListSupplier::new); return supplier.get(request).next() .map(serviceInstances -> processInstanceResponse(serviceInstances,request));
}
/** * 灰度处理 * @param instances * @param request * @return */ private Response<ServiceInstance> processInstanceResponse(List<ServiceInstance> instances, Request request) { if (instances.isEmpty()) { return new EmptyResponse(); } DefaultRequestContext requestContext = (DefaultRequestContext) request.getContext(); RequestData clientRequest = (RequestData) requestContext.getClientRequest(); HttpHeaders headers = clientRequest.getHeaders(); // get Request Header String reqVersion = headers.getFirst(pomeloLoadBalancerProperties.getHeader());
List<ServiceInstance> normalServiceInstances = instances.stream() .filter(instance -> !instance.getMetadata().containsKey(pomeloLoadBalancerProperties.getHeader())) .collect(Collectors.toList()); log.debug("X-Pomelo-LoadBalancer:{}",reqVersion); if (!StrUtil.isEmpty(reqVersion)) { // filter service instances List<ServiceInstance> gsServiceInstances = instances.stream() .filter(instance -> reqVersion.equals(instance.getMetadata().get(pomeloLoadBalancerProperties.getHeader()))) .collect(Collectors.toList());
if (gsServiceInstances.size() > 0) { return processRibbonInstanceResponse(gsServiceInstances); } } return processRibbonInstanceResponse(normalServiceInstances); }
/** * 负载均衡器 * 参考 org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer#getInstanceResponse */ private Response<ServiceInstance> processRibbonInstanceResponse(List<ServiceInstance> instances) { int pos = Math.abs(this.position.incrementAndGet()); ServiceInstance instance = instances.get(pos % instances.size()); return new DefaultResponse(instance); } }
|