您现在的位置是:亿华云 > 系统运维
Nacos2.0配置灰度发布原理源码解析
亿华云2025-10-03 03:08:33【系统运维】6人已围观
简介今天分享的是我们组的一个实习生写的一篇源码解析文章,小伙子实习期间在社区Nacos2.0的基础上对灰度发布的能力进行了增强,并完成了MSE Nacos2.0上从管控到内核的灰度发布能力的研发。以下是他
今天分享的置灰是我们组的一个实习生写的一篇源码解析文章,小伙子实习期间在社区Nacos2.0的布原基础上对灰度发布的能力进行了增强,并完成了MSE Nacos2.0上从管控到内核的理源灰度发布能力的研发。以下是码解他对配置发布流程的代码解析,相信看完之后你会感叹:现在的置灰实习生都有这个水平了吗?
说到灰度发布,就不得不提到阿里的布原安全生产三板斧:可监控、可灰度、理源可回滚。码解在阿里内部,置灰对于安全生产是布原高度重视的,灰度可以说是理源发布之前的必备流程。因此,码解作为阿里的置灰配置中心,Nacos同样支持了配置灰度的布原功能,可以通过控制台进行配置的理源灰度推送、云服务器提供商回滚,从而实现安全的配置发布。一般来说,我们按照下图所示流程进行配置的安全修改。只有在小规模机器上验证配置按预期生效之后才会正式发布配置,否则就回滚灰度配置。
发布流程
配置灰度发布流程
社区Nacos的灰度是基于IP的方式进行的,用户需要在控制台,选择需要灰度的配置,然后新建灰度配置,选择灰度机器的IP进行配置推送。整个交互流程如下图所示。
IP灰度机制
具体的使用方法,如果使用的是自建的社区Nacos,可以访问http://ip:port/nacos进入控制台,在配置管理的编辑页面进行配置灰度发布,如下图。
社区Nacos控制台
如果使用的是阿里云的服务器托管MSE微服务引擎,可以查看MSE配置灰度发布帮助文档了解使用方法,目前在Nacos2.0专业版上已经支持灰度功能,在MSE控制台打开Beta按钮即可,如下图所示。
MSE Beta发布
Nacos灰度原理
Nacos的灰度发布原理其实并不复杂,本质就如同下面这张流程图。
灰度原理
乍一看,这个流程好复杂,实际上定睛一看,好像也没啥。整个过程就是Client、Server和Console之间的交互。Client端监听Server上的配置,建立长连接并上报自己的客户端信息,例如IP地址。Console负责进行配置灰度的调用,将用户所需要的灰度配置请求发送到Server端。然后Server端根据用户的灰度配置请求中的IP地址,过滤与客户端的长连接,然后将灰度配置定向推送到对应IP的客户端中即可。下面笔者从长连接的亿华云建立到配置灰度,进行详细的源码分析。
长连接建立
在Nacos2.0版本之前,Nacos主要采用长轮询的方式在客户端拉取服务端的配置信息。而在Nacos2.0版本中,引入了基于gRPC的长连接模型来提升配置监听的性能,客户端和服务端会建立长连接来监听配置的变更,一旦服务端有配置变更,就会将配置信息推送到客户端中。在Nacos源码中,这一过程主要涉及到两个组件之间的交互,即com.alibaba.nacos.common.remote.client.grpc包下的GrpcSdkClient类和com.alibaba.nacos.core.remote.grpc包下的GrpcBiStreamRequestAcceptor类。然而,GrpcSdkClient中没有定义具体的连接逻辑,其主要逻辑在其父类GrpcClient中。下面这段代码就是客户端连接服务端的核心代码,位于GrpcClient的connectToServer方法。
@Override public Connection connectToServer(ServerInfo serverInfo) { try { // ...... int port = serverInfo.getServerPort() + rpcPortOffset(); // 创建一个Grpc的Stub RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port); if (newChannelStubTemp != null) { // 检查服务端是否可用 Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); if (response == null || !(response instanceof ServerCheckResponse)) { shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel()); return null; } // 创建一个Grpc的Stream BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc .newStub(newChannelStubTemp.getChannel()); // 创建连接信息,保存Grpc的连接信息,也就是长连接的一个holder GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId()); // 创建stream请求同时绑定到当前连接中 StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); // 绑定Grpc相关连接信息 grpcConn.setPayloadStreamObserver(payloadStreamObserver); grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); // 发送一个初始化连接请求,用于上报客户端的一些信息,例如标签、客户端版本等 ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); conSetupRequest.setLabels(super.getLabels()); conSetupRequest.setAbilities(super.clientAbilities); conSetupRequest.setTenant(super.getTenant()); grpcConn.sendRequest(conSetupRequest); // 等待连接建立成功 Thread.sleep(100L); return grpcConn; } return null; } catch (Exception e) { LOGGER.error("[{ }]Fail to connect to server!,error={ }", GrpcClient.this.getName(), e); } return null; }上面这段代码主要功能有两个,一个是与服务端建立gRPC的长连接,另一个功能主要是初始化连接,后者是实现配置灰度发布的前提。在上文中有提到,配置灰度发布的过程中,需要根据控制台的灰度配置请求中的IP信息过滤长连接,在服务端就是根据连接建立初始化时上报的信息实现的过滤。从上面的代码中可以看到,ConnectionSetupRequest作为一个初始化请求,携带着客户端版本、标签等信息,但是好像并没有携带IP地址的信息。实际上,ConnectionSetupRequest也确实没有携带IP地址信息。因为在Nacos设计中,采用Request来表明客户端的请求信息,而IP地址更像是属于连接层的信息,应该属于连接的元信息,因此并没有放在Request中进行显式的设置,而是在发送请求时自动的作为Metadata信息发送到服务端中。可以看一下com.alibaba.nacos.common.remote.client.grpc包下的GrpcConnection的sendRequest方法,该方法接收一个Request请求作为参数,将请求发送给服务端。
public void sendRequest(Request request) { // 将request转换为Grpc的Payload Payload convert = GrpcUtils.convert(request); // 通过Grpc的流发送请求 payloadStreamObserver.onNext(convert); }IP地址的设置,就在com.alibaba.nacos.common.remote.client.grpc包下的GrpcUtils的convert方法中,该方法主要将一个Request转换为gRPC的Payload。
/** * convert request to payload. * * @param request request. * @return payload. */ public static Payload convert(Request request) { // 设置元信息 Metadata newMeta = Metadata.newBuilder().setType(request.getClass().getSimpleName()) .setClientIp(NetUtils.localIP()).putAllHeaders(request.getHeaders()).build(); request.clearHeaders(); // 转换为json String jsonString = toJson(request); Payload.Builder builder = Payload.newBuilder(); // 创建Payload return builder .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) .setMetadata(newMeta).build(); }可以看到,这里通过NetUtils.localIP()方法获取客户端的IP信息,并存入到Metadata中,跟随Payload一起上报给服务端。到这里,客户端这里的连接过程就暂时完成了,下面介绍一下服务端接收到连接请求的响应过程。
在服务端,主要通过GrpcBiStreamRequestAcceptor的requestBiStream方法接收客户端请求,如下所示。
@Override public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) { StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() { final String connectionId = CONTEXT_KEY_CONN_ID.get(); final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get(); final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get(); String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get(); String clientIp = ""; @Override public void onNext(Payload payload) { // 获取客户端IP clientIp = payload.getMetadata().getClientIp(); traceDetailIfNecessary(payload); Object parseObj; try { parseObj = GrpcUtils.parse(payload); } catch (Throwable throwable) { Loggers.REMOTE_DIGEST .warn("[{ }]Grpc request bi stream,payload parse error={ }", connectionId, throwable); return; } if (parseObj == null) { Loggers.REMOTE_DIGEST .warn("[{ }]Grpc request bi stream,payload parse null ,body={ },meta={ }", connectionId, payload.getBody().getValue().toStringUtf8(), payload.getMetadata()); return; } // 处理初始化请求 if (parseObj instanceof ConnectionSetupRequest) { ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj; Map<String, String> labels = setUpRequest.getLabels(); String appName = "-"; if (labels != null && labels.containsKey(Constants.APPNAME)) { appName = labels.get(Constants.APPNAME); } ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(), remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(), setUpRequest.getClientVersion(), appName, setUpRequest.getLabels()); metaInfo.setTenant(setUpRequest.getTenant()); // 服务端的长连接信息holder Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); connection.setAbilities(setUpRequest.getAbilities()); boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted(); // 注册connection到connectionManager中 if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) { //Not register to the connection manager if current server is over limit or server is starting. try { Loggers.REMOTE_DIGEST.warn("[{ }]Connection register fail,reason:{ }", connectionId, rejectSdkOnStarting ? " server is not started" : " server is over limited."); connection.request(new ConnectResetRequest(), 3000L); connection.close(); } catch (Exception e) { //Do nothing. if (connectionManager.traced(clientIp)) { Loggers.REMOTE_DIGEST .warn("[{ }]Send connect reset request error,error={ }", connectionId, e); } } } } else if (parseObj instanceof Response) { Response response = (Response) parseObj; if (connectionManager.traced(clientIp)) { Loggers.REMOTE_DIGEST .warn("[{ }]Receive response of server request ,response={ }", connectionId, response); } RpcAckCallbackSynchronizer.ackNotify(connectionId, response); connectionManager.refreshActiveTime(connectionId); } else { Loggers.REMOTE_DIGEST .warn("[{ }]Grpc request bi stream,unknown payload receive ,parseObj={ }", connectionId, parseObj); } } // ...... }; return streamObserver; }这里我们主要看onNext方法,其负责处理客户端的请求信息,即Payload信息。如果是初始化连接的请求ConnectionSetupRequest,就会记录与客户端之间的长连接信息,并注册到ConnectionManager中。ConnectionManager是服务端维护所有客户端连接信息的类,持有所有的长连接信息,后续的配置推送等都需要通过ConnectionManager获取长连接信息。可以简单看一下ConnectionManager的源码,在com.alibaba.nacos.core.remote包下,如下所示。
/** * connect manager. * * @author liuzunfei * @version $Id: ConnectionManager.java, v 0.1 2020年07月13日 7:07 PM liuzunfei Exp $ */ @Service public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> { // ...... Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>(); // ...... /** * register a new connect. * * @param connectionId connectionId * @param connection connection */ public synchronized boolean register(String connectionId, Connection connection) { if (connection.isConnected()) { if (connections.containsKey(connectionId)) { return true; } if (!checkLimit(connection)) { return false; } if (traced(connection.getMetaInfo().clientIp)) { connection.setTraced(true); } // 注册connection connections.put(connectionId, connection); connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement(); clientConnectionEventListenerRegistry.notifyClientConnected(connection); Loggers.REMOTE_DIGEST .info("new connection registered successfully, connectionId = { },connection={ } ", connectionId, connection); return true; } return false; } // ...... }可以看到,在ConnectionManager中,维护了一个Map。在调用register方法时,将Connection注册到Map中,以供后续的逻辑使用。这里有一个细节,注册到ConnectionManager中的GrpcConnection与客户端持有的GrpcConnection不是一个类。这里的GrpcConnection位于com.alibaba.nacos.core.remote.grpc包,而客户端的GrpcConnection位于com.alibaba.nacos.common.remote.client.grpc包。事实上与客户端有关的gRPC相关的类都在com.alibaba.nacos.common.remote.client.grpc。com.alibaba.nacos.core.remote.grpc则是服务端的相关实现。
到这里,长连接建立的核心流程已经介绍完了,接下来笔者将详细介绍一下配置灰度的推送过程,由于Nacos在这里使用了发布订阅模式以及异步的方法调用,理解起来可能稍微要麻烦一点。
灰度推送
在Nacos中,提供了一组OpenAPI进行配置的管理,配置灰度发布也是其中一个功能,可以在com.alibaba.nacos.config.server.controller包下的ConfigController中查看,包括了BetaConfig的发布、停止和查询,接下来笔者将会一一介绍他们的原理。
创建BetaConfig
创建BetaConfig的API代码如下,一个简单的Web的API。
/** * Adds or updates non-aggregated data. * * @throws NacosException NacosException. */ @PostMapping @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { final String srcIp = RequestUtil.getRemoteIp(request); final String requestIpApp = RequestUtil.getAppName(request); srcUser = RequestUtil.getSrcUserName(request); //check type if (!ConfigType.isValidType(type)) { type = ConfigType.getDefaultType().getType(); } // check tenant ParamUtils.checkTenant(tenant); ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); MapUtil.putIfValNoNull(configAdvanceInfo, "config_tags", configTags); MapUtil.putIfValNoNull(configAdvanceInfo, "desc", desc); MapUtil.putIfValNoNull(configAdvanceInfo, "use", use); MapUtil.putIfValNoNull(configAdvanceInfo, "effect", effect); MapUtil.putIfValNoNull(configAdvanceInfo, "type", type); MapUtil.putIfValNoNull(configAdvanceInfo, "schema", schema); ParamUtils.checkParam(configAdvanceInfo); if (AggrWhitelist.isAggrDataId(dataId)) { LOGGER.warn("[aggr-conflict] { } attempt to publish single data, { }, { }", RequestUtil.getRemoteIp(request), dataId, group); throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr"); } final Timestamp time = TimeUtils.getCurrentTime(); // 目标灰度机器的IP地址。 String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); configInfo.setType(type); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // 发布Beta 配置 persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); // 通知配置变更 ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }该方法接收一个创建配置的请求,包括配置的data-id、content等信息。从代码中可以看出,该方法是通过判断请求的Header中有无betaIps的值来确定是发布正式配置还是Beta配置的。如果betaIps的值不为空,则表明待发布的配置是一个Beta配置。而配置发布的过程,实际上就是把配置插入或者更新到数据库中。在Nacos中,正式配置和灰度配置是分别存储在不同的表中的,一旦发布就会通过ConfigChangePublisher发布一个ConfigDataChangeEvent事件,然后由订阅了该事件的监听者推送配置信息到客户端。ConfigDataChangeEvent的监听者是AsyncNotifyService类,位于com.alibaba.nacos.config.server.service.notify包下,该类主要用作执行集群之间的数据Dump操作。该类在初始化的时候,会向事件中心NotifyCenter注册一个监听者,用以监听数据变更事件并异步执行数据的Dump操作,如下所示。
/** * Async notify service. * * @author Nacos */ @Service public class AsyncNotifyService { private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class); private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate(); private static final int MIN_RETRY_INTERVAL = 500; private static final int INCREASE_STEPS = 1000; private static final int MAX_COUNT = 6; @Autowired private DumpService dumpService; @Autowired private ConfigClusterRpcClientProxy configClusterRpcClientProxy; private ServerMemberManager memberManager; @Autowired public AsyncNotifyService(ServerMemberManager memberManager) { this.memberManager = memberManager; // Register ConfigDataChangeEvent to NotifyCenter. NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); // Register A Subscriber to subscribe ConfigDataChangeEvent. NotifyCenter.registerSubscriber(new Subscriber() { @Override public void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; Collection<Member> ipList = memberManager.allMembers(); // In fact, any type of queue here can be Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>(); Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>(); for (Member member : ipList) { // 判断是否是长轮询 if (!MemberUtil.isSupportedLongCon(member)) { // 添加一个长轮询的异步dump任务 httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } else { // 添加一个长连接的异步dump任务 rpcQueue.add( new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); } } // 判断并执行长轮询的异步dump任务 if (!httpQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } // 判断并执行长连接的异步dump任务 if (!rpcQueue.isEmpty()) { ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); } } } @Override public Class<? extends Event> subscribeType() { return ConfigDataChangeEvent.class; } }); } }在接收到ConfigDataChangeEvent之后,如果Nacos2.0以上的版本,会创建一个RpcTask用以执行配置变更的通知,由内部类AsyncRpcTask执行,AsyncRpcTask具体逻辑如下所示。
class AsyncRpcTask implements Runnable { private Queue<NotifySingleRpcTask> queue; public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) { this.queue = queue; } @Override public void run() { while (!queue.isEmpty()) { NotifySingleRpcTask task = queue.poll(); // 创建配置变更请求 ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); syncRequest.setDataId(task.getDataId()); syncRequest.setGroup(task.getGroup()); syncRequest.setBeta(task.isBeta); syncRequest.setLastModified(task.getLastModified()); syncRequest.setTag(task.tag); syncRequest.setTenant(task.getTenant()); Member member = task.member; // 如果是自身的数据变更,直接执行dump操作 if (memberManager.getSelf().equals(member)) { if (syncRequest.isBeta()) { // 同步Beta配置 dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true); } else { // 同步正式配置 dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP()); } continue; } // 通知其他服务端进行dump if (memberManager.hasMember(member.getAddress())) { // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); if (unHealthNeedDelay) { // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, member.getAddress()); // get delay time and set fail count to the task asyncTaskExecute(task); } else { if (!MemberUtil.isSupportedLongCon(member)) { asyncTaskExecute( new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, task.getLastModified(), member.getAddress(), task.isBeta)); } else { try { configClusterRpcClientProxy .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); } catch (Exception e) { MetricsMonitor.getConfigNotifyException().increment(); asyncTaskExecute(task); } } } } else { //No nothig if member has offline. } } } }这里首先创建了一个ConfigChangeClusterSyncRequest,并将配置信息写入。然后获取集群信息,通知相应的Server处理的数据同步请求。同步配置变更信息的核心逻辑由DumpService来执行。我们主要查看同步Beta配置的操作,DumpService的dump方法如下所示。
/** * Add DumpTask to TaskManager, it will execute asynchronously. */ public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={ }, taskKey={ }", groupKey, taskKey); }在该方法中,这里会根据配置变更信息,提交一个异步的DumpTask任务,后续会由DumpProcessor类的process方法进行处理,该方法如下所示。
/** * dump processor. * * @author Nacos * @date 2020/7/5 12:19 PM */ public class DumpProcessor implements NacosTaskProcessor { final DumpService dumpService; public DumpProcessor(DumpService dumpService) { this.dumpService = dumpService; } @Override public boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); DumpTask dumpTask = (DumpTask) task; String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); String dataId = pair[0]; String group = pair[1]; String tenant = pair[2]; long lastModified = dumpTask.getLastModified(); String handleIp = dumpTask.getHandleIp(); boolean isBeta = dumpTask.isBeta(); String tag = dumpTask.getTag(); ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId) .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp); if (isBeta) { // 更新Beta配置的缓存 ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant); build.remove(Objects.isNull(cf)); build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps()); build.content(Objects.isNull(cf) ? null : cf.getContent()); return DumpConfigHandler.configDump(build.build()); } if (StringUtils.isBlank(tag)) { ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); build.type(Objects.isNull(cf) ? null : cf.getType()); } else { ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag); build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); } return DumpConfigHandler.configDump(build.build()); } }可以看到,如果是Beta配置,则获取最新的Beta配置信息,然后触发DumpConfigHandler的configDump方法。进入configDump可以看到,该方法主要用来更新缓存的配置信息,调用ConfigCacheService的相关操作进行配置的更新。
/** * Dump config subscriber. * * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> */ public class DumpConfigHandler extends Subscriber<ConfigDumpEvent> { /** * trigger config dump event. * * @param event { @link ConfigDumpEvent} * @return { @code true} if the config dump task success , else { @code false} */ public static boolean configDump(ConfigDumpEvent event) { final String dataId = event.getDataId(); final String group = event.getGroup(); final String namespaceId = event.getNamespaceId(); final String content = event.getContent(); final String type = event.getType(); final long lastModified = event.getLastModifiedTs(); if (event.isBeta()) { boolean result = false; // 删除操作 if (event.isRemove()) { result = ConfigCacheService.removeBeta(dataId, group, namespaceId); if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } return result; } else { // 更新或者发布 result = ConfigCacheService .dumpBeta(dataId, group, namespaceId, content, lastModified, event.getBetaIps()); if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, content.length()); } } return result; } // ...... } @Override public void onEvent(ConfigDumpEvent event) { configDump(event); } @Override public Class<? extends Event> subscribeType() { return ConfigDumpEvent.class; } }在ConfigCacheService中,会对比配置信息,如果配置有变化,则发布事件LocalDataChangeEvent,触发RpcConfigChangeNotifier的configDataChanged方法来推送配置,configDataChanged方法代码如下。
/** * ConfigChangeNotifier. * * @author liuzunfei * @version $Id: ConfigChangeNotifier.java, v 0.1 2020年07月20日 3:00 PM liuzunfei Exp $ */ @Component(value = "rpcConfigChangeNotifier") public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> { // ...... @Autowired ConfigChangeListenContext configChangeListenContext; @Autowired private RpcPushService rpcPushService; @Autowired private ConnectionManager connectionManager; /** * adaptor to config module ,when server side config change ,invoke this method. * * @param groupKey groupKey */ public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta, List<String> betaIps, String tag) { // 获取配置的所有监听者 Set<String> listeners = configChangeListenContext.getListeners(groupKey); if (CollectionUtils.isEmpty(listeners)) { return; } int notifyClientCount = 0; // 遍历所有监听者 for (final String client : listeners) { // 获取长连接信息 Connection connection = connectionManager.getConnection(client); if (connection == null) { continue; } String clientIp = connection.getMetaInfo().getClientIp(); String clientTag = connection.getMetaInfo().getTag(); // 判断是否是Beta的Ip if (isBeta && betaIps != null && !betaIps.contains(clientIp)) { continue; } // tag check if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) { continue; } // 配置变更推送请求 ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant); // 执行推送任务 RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, connection.getMetaInfo().getAppName()); push(rpcPushRetryTask); notifyClientCount++; } Loggers.REMOTE_PUSH.info("push [{ }] clients ,groupKey=[{ }]", notifyClientCount, groupKey); } @Override public void onEvent(LocalDataChangeEvent event) { String groupKey = event.groupKey; boolean isBeta = event.isBeta; List<String> betaIps = event.betaIps; String[] strings = GroupKey.parseKey(groupKey); String dataId = strings[0]; String group = strings[1]; String tenant = strings.length > 2 ? strings[2] : ""; String tag = event.tag; configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag); } // ...... }到这里,基本上就是配置变更推送的最后一个步骤了,如代码中注释所示,通过调用ConnectionManager的getConnection方法,遍历所有监听者的连接,根据其中的Meta信息判断是否是Beta推送的目标,然后执行推送任务,也就是执行push方法,如下所示。
private void push(RpcPushTask retryTask) { ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest; // 判断是否重试次数达到限制 if (retryTask.isOverTimes()) { Loggers.REMOTE_PUSH .warn("push callback retry fail over times .dataId={ },group={ },tenant={ },clientId={ },will unregister client.", notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(), retryTask.connectionId); // 主动注销连接 connectionManager.unregister(retryTask.connectionId); } else if (connectionManager.getConnection(retryTask.connectionId) != null) { // first time :delay 0s; sencond time:delay 2s ;third time :delay 4s // 尝试执行配置推送 ConfigExecutor.getClientConfigNotifierServiceExecutor() .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS); } else { // client is already offline,ingnore task. } }这里实际上也是一个异步执行的过程,推送任务RpcPushTask会被提交到ClientConfigNotifierServiceExecutor来计划执行,第一次会立即推送配置,即调用RpcPushTask的run方法,如果失败则延迟重试次数x2的秒数再次执行,直到超过重试次数,主动注销当前连接。其中,RpcPushTask的定义如下。
class RpcPushTask implements Runnable { ConfigChangeNotifyRequest notifyRequest; int maxRetryTimes = -1; int tryTimes = 0; String connectionId; String clientIp; String appName; public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId, String clientIp, String appName) { this.notifyRequest = notifyRequest; this.maxRetryTimes = maxRetryTimes; this.connectionId = connectionId; this.clientIp = clientIp; this.appName = appName; } public boolean isOverTimes() { return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes; } @Override public void run() { tryTimes++; if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) { push(this); } else { // 推送配置 rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) { @Override public void onSuccess() { tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp); } @Override public void onFail(Throwable e) { tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp); Loggers.REMOTE_PUSH.warn("Push fail", e); push(RpcPushTask.this); } }, ConfigExecutor.getClientConfigNotifierServiceExecutor()); } } }可以看到,在RpcPushTask的run方法中,调用了RpcPushService的pushWithCallback方法,如下所示。
/** * push response to clients. * * @author liuzunfei * @version $Id: PushService.java, v 0.1 2020年07月20日 1:12 PM liuzunfei Exp $ */ @Service public class RpcPushService { @Autowired private ConnectionManager connectionManager; /** * push response with no ack. * * @param connectionId connectionId. * @param request request. * @param requestCallBack requestCallBack. */ public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, Executor executor) { Connection connection = connectionManager.getConnection(connectionId); if (connection != null) { try { // 执行配置推送 connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) { @Override public Executor getExecutor() { return executor; } @Override public void onResponse(Response response) { if (response.isSuccess()) { requestCallBack.onSuccess(); } else { requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage())); } } @Override public void onException(Throwable e) { requestCallBack.onFail(e); } }); } catch (ConnectionAlreadyClosedException e) { connectionManager.unregister(connectionId); requestCallBack.onSuccess(); } catch (Exception e) { Loggers.REMOTE_DIGEST .error("error to send push response to connectionId ={ },push response={ }", connectionId, request, e); requestCallBack.onFail(e); } } else { requestCallBack.onSuccess(); } } }其持有ConnectionManager对象,当需要推送配置到客户端时,会获取相应的Connection,然后执行asyncRequest将配置推送到客户端中。如果连接已经关闭,则注销连接。在asyncRequest底层即是调用Grpc建立的Stream的onNext方法,将配置推送给客户端,如下。
/** * grpc connection. * * @author liuzunfei * @version $Id: GrpcConnection.java, v 0.1 2020年07月13日 7:26 PM liuzunfei Exp $ */ public class GrpcConnection extends Connection { private StreamObserver streamObserver; private Channel channel; public GrpcConnection(ConnectionMeta metaInfo, StreamObserver streamObserver, Channel channel) { super(metaInfo); this.streamObserver = streamObserver; this.channel = channel; } @Override public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { sendRequestInner(request, requestCallBack); } private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException { final String requestId = String.valueOf(PushAckIdGenerator.getNextId()); request.setRequestId(requestId); DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId, callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId)); RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture); sendRequestNoAck(request); return defaultPushFuture; } private void sendRequestNoAck(Request request) throws NacosException { try { //StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak. synchronized (streamObserver) { Payload payload = GrpcUtils.convert(request); traceIfNecessary(payload); streamObserver.onNext(payload); } } catch (Exception e) { if (e instanceof StatusRuntimeException) { throw new ConnectionAlreadyClosedException(e); } throw e; } } }主要推送逻辑的代码如上所示,调用asyncRequest之后,会将请求交给sendRequestInner处理,sendRequestInner又会调用sendRequestNoAck将推送请求推入gRPC的流中,客户端收到配置更新的请求,就会更新客户端的配置了。至此,一个灰度配置就发布成功了。
删除/查询BetaConfig
删除和查询BetaConfig的方法都很简单,都是简单的操作数据库即可。如果是删除配置,则会触发ConfigDataChangeEvent来告知客户端更新配置,这里笔者就不多加赘述了。
/** * Execute to remove beta operation. * * @param dataId dataId string value. * @param group group string value. * @param tenant tenant string value. * @return Execute to operate result. */ @DeleteMapping(params = "beta=true") @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) public RestResult<Boolean> stopBeta(@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) { try { persistService.removeConfigInfo4Beta(dataId, group, tenant); } catch (Throwable e) { LOGGER.error("remove beta data error", e); return RestResultUtils.failed(500, false, "remove beta data error"); } ConfigChangePublisher .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, System.currentTimeMillis())); return RestResultUtils.success("stop beta ok", true); } /** * Execute to query beta operation. * * @param dataId dataId string value. * @param group group string value. * @param tenant tenant string value. * @return RestResult for ConfigInfo4Beta. */ @GetMapping(params = "beta=true") @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) public RestResult<ConfigInfo4Beta> queryBeta(@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) { try { ConfigInfo4Beta ci = persistService.findConfigInfo4Beta(dataId, group, tenant); return RestResultUtils.success("stop beta ok", ci); } catch (Throwable e) { LOGGER.error("remove beta data error", e); return RestResultUtils.failed("remove beta data error"); } }总结
Nacos2.0使用长连接代替了短连接的长轮询,性能几乎提升了10倍。在阿里内部,也在逐渐推进Nacos2作为统一的配置中心。目前在微服务引擎(Micro Service Engine,简称 MSE),Nacos作为注册配置中心,提供了纯托管的服务,只需要购买Nacos专业版即可享受到10倍的性能提升。
此外,MSE微服务引擎顾名思义,是一个面向业界主流开源微服务生态的一站式微服务平台, 帮助微服务用户更稳定、更便捷、更低成本的使用开源微服务技术构建微服务体系。不但提供注册中心、配置中心全托管(兼容 Nacos/ZooKeeper/Eureka),而且提供网关(兼容 Ingress/Enovy)和无侵入的开源增强服务治理能力。
在阿里,MSE微服务引擎已经被大规模的接入使用,经历阿里内部生产考验以及反复淬炼,其中微服务服务治理能力支撑了大量的微服务系统,对包括Spring Cloud、Dubbo等微服务框架的治理功能增强,提供了无损上下线、金丝雀发布、离群摘除以及无损滚动升级的功能。
如果有快速搭建高性能微服务以及大规模服务治理的需求,相比于从零搭建和运维,MSE微服务引擎是一个不错的选择。
很赞哦!(8)