聊聊eureka的delta配置

聊聊eureka的delta配置

本文主要研究一下eureka的delta配置

client端配置

    {
      "sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean",
      "defaultValue": false,
      "name": "eureka.client.disable-delta",
      "description": "Indicates whether the eureka client should disable fetching of delta and should\n rather resort to getting the full registry information.\n\n Note that the delta fetches can reduce the traffic tremendously, because the rate\n of change with the eureka server is normally much lower than the rate of fetches.\n\n The changes are effective at runtime at the next registry fetch cycle as specified\n by registryFetchIntervalSeconds",
      "type": "java.lang.Boolean"
    },
    {
      "sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean",
      "defaultValue": false,
      "name": "eureka.client.log-delta-diff",
      "description": "Indicates whether to log differences between the eureka server and the eureka\n client in terms of registry information.\n\n Eureka client tries to retrieve only delta changes from eureka server to minimize\n network traffic. After receiving the deltas, eureka client reconciles the\n information from the server to verify it has not missed out some information.\n Reconciliation failures could happen when the client has had network issues\n communicating to server.If the reconciliation fails, eureka client gets the full\n registry information.\n\n While getting the full registry information, the eureka client can log the\n differences between the client and the server and this setting controls that.\n\n The changes are effective at runtime at the next registry fetch cycle as specified\n by registryFetchIntervalSecondsr",
      "type": "java.lang.Boolean"
    }
复制代码

DiscoveryClient.fetchRegistry

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

    /**
     * Fetches the registry information.
     *
     * <p>
     * This method tries to get only deltas after the first fetch unless there
     * is an issue in reconciling eureka server and client registry information.
     * </p>
     *
     * @param forceFullRegistryFetch Forces a full registry fetch.
     *
     * @return true if the registry was fetched
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }
复制代码

如果禁用了delta,则client端则调用getAndStoreFullRegistry()全量接口;开启的话,则调用getAndUpdateDelta(applications),对本地region的applications进行delta更新。

getAndUpdateDelta

    /**
     * Get the delta registry information from the eureka server and update it locally.
     * When applying the delta, the following flow is observed:
     *
     * if (update generation have not advanced (due to another thread))
     *   atomically try to: update application with the delta and get reconcileHashCode
     *   abort entire processing otherwise
     *   do reconciliation if reconcileHashCode clash
     * fi
     *
     * @return the client response
     * @throws Throwable on error
     */
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
复制代码

这里可以看到,如果开启了logDeltaDiff的话,则会调用reconcileAndLogDifference方法 另外这里底层请求的是queryClient.getDelta(remoteRegionsRef.get())方法

DiscoveryClient.updateDelta

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

    /**
     * Updates the delta information fetches from the eureka server into the
     * local cache.
     *
     * @param delta
     *            the delta information received from eureka server in the last
     *            poll cycle.
     */
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        for (Application app : delta.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }

                ++deltaCount;
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());

                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

                } else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

        getApplications().setVersion(delta.getVersion());
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
复制代码

可以看到这里根据获取的delta数据的ActionType来修改本地的数据

queryClient.getDelta

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/jersey/AbstractJerseyEurekaHttpClient.java

    @Override
    public EurekaHttpResponse<Applications> getDelta(String... regions) {
        return getApplicationsInternal("apps/delta", regions);
    }
复制代码

可以看到请求的是/apps/delta接口

server端配置

    {
      "sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean",
      "defaultValue": false,
      "name": "eureka.server.disable-delta",
      "type": "java.lang.Boolean"
    },
    {
      "sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean",
      "defaultValue": 0,
      "name": "eureka.server.delta-retention-timer-interval-in-ms",
      "type": "java.lang.Long"
    },
    {
      "sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean",
      "defaultValue": 0,
      "name": "eureka.server.retention-time-in-m-s-in-delta-queue",
      "type": "java.lang.Long"
    }
复制代码

ApplicationsResource.getContainerDifferential

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/ApplicationsResource.java

    /**
     * Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.
     *
     * <p>
     * The delta changes represent the registry information change for a period
     * as configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The
     * changes that can happen in a registry include
     * <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally
     * the changes to the registry are infrequent and hence getting just the
     * delta will be much more efficient than getting the complete registry.
     * </p>
     *
     * <p>
     * Since the delta information is cached over a period of time, the requests
     * may return the same data multiple times within the window configured by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients
     * are expected to handle this duplicate information.
     * <p>
     *
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve  JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo  the {@link java.net.URI} information of the request made.
     * @return response containing the delta information of the
     *         {@link AbstractInstanceRegistry}.
     */
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } } 复制代码

server端提供/apps/delta接口

ResponseCacheImpl.generatePayload

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/ResponseCacheImpl.java

    /*
     * Generate pay load for the given key.
     */
    private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeAllAppsTimer.start();
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }
复制代码

这里提供了缓存不存在时触发的方法,具体重点看ALL_APPS_DELTA的方法,重点调用了registry.getApplicationDeltas()及registry.getApplicationDeltasFromMultipleRegions方法

registry.getApplicationDeltas()

    /**
     * Get the registry information about the delta changes. The deltas are
     * cached for a window specified by
     * {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. Subsequent
     * requests for delta information may return the same information and client
     * must make sure this does not adversely affect them.
     *
     * @return all application deltas.
     * @deprecated use {@link #getApplicationDeltasFromMultipleRegions(String[])} instead. This method has a
     * flawed behavior of transparently falling back to a remote region if no instances for an app is available locally.
     * The new behavior is to explicitly specify if you need a remote region.
     */
    @Deprecated
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            write.lock();
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is : {}",
                    this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
复制代码

可以看到这里主要是从recentlyChangedQueue取数据

recentlyChangedQueue

    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
复制代码

这里存放的是RecentlyChangedItem AbstractInstanceRegistry在register、cancel、statusUpdate、deleteStatus等操作里头都会往该队列添加RecentlyChangedItem

deltaRetentionTimer

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.java

    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }
复制代码

这里初始化了DeltaRetentionTask,调度间隔是serverConfig.getDeltaRetentionTimerIntervalInMs(),默认是30 * 1000

DeltaRetentionTask

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/ResponseCacheImpl.java

    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }
复制代码

该任务主要是距离上次更新时间超过serverConfig.getRetentionTimeInMSInDeltaQueue()的值给remove掉 serverConfig.getRetentionTimeInMSInDeltaQueue()的值默认为3 * MINUTES

小结

eureka提供了delta参数,在client端及server端都有。client端主要是控制刷新registry的时候,是否使用调用/apps/delta接口,然后根据返回数据的ActionType来作用于本地数据。而server端则提供/apps/delta接口,它的主要逻辑是在registry的修改操作都会放recentlyChangedQueue存放RecentlyChangedItem事件,然后有个定时任务去剔除距离上次更新时间超过指定阈值的item;而查询接口则是从recentlyChangedQueue获取数据然后返回。

client端主要是eureka.client.disable-delta、eureka.client.log-delta-diff两个参数;server端主要是eureka.server.disable-delta、eureka.server.delta-retention-timer-interval-in-ms、eureka.server.retention-time-in-m-s-in-delta-queue三个参数。

doc

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/101644.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • #Photoshop#_pdf文档解析失败

    #Photoshop#_pdf文档解析失败来源:导入AdobePhotoshop(.psd)图像AdobePhotoshop档案格式规格:https://www.adobe.com/devnet-apps/photoshop/fileformatashtml/#50577409_89817vs2017案例

  • keras提供的网络_kubernetes网络

    keras提供的网络_kubernetes网络GoogleNet网络详解与keras实现GoogleNet网络详解与keras实现GoogleNet系列网络的概览Pascal_VOC数据集第一层目录第二层目录第三层目录InceptionV1模块介绍Inception的架构GoogleNet的图片Keras代码实现为了搭建Inception网络我们使用了以下策略整个代码的流程如下实验结果实验结果分析本博客相关引用本

  • spring aop面向切面原理,用处和实力讲解

    spring aop面向切面原理,用处和实力讲解spring aop面向切面原理,用处和实力讲解

  • 十一、代理模式 —专注,做最好的自己!#和设计模式一起旅行#[通俗易懂]

    专注,把更多的时间放到提示自己核心竞争能力上面来!其他的事情交给别人去做吧。故事背景我和设计模式MM开的奶茶店火了,一个是设计模式MM长的好看,波涛汹涌,还有一个是我们的饮品的确好喝,并且还特别在乎用户的体验,上一篇的模板方法模式中,我们最后的例子就询问顾客是否要加入调味料,而不是强制加入。俗话说,人怕出名,猪怕壮!出名后好多的媒体想让我参加他们的节目说一下我这个创业的…

  • MATLAB(2)–MATLAB矩阵的表示

    MATLAB(2)–MATLAB矩阵的表示MATLAB–MATLAB矩阵的表示矩阵的建立冒号表达式linspace结构矩阵单元矩阵最后矩阵的建立利用直接输入法建立矩阵:将矩阵的元素用中括号括起来,按矩阵的顺序输入各元素,同一行的各元素之间用逗号或者空格分隔,不同的元素之间用分号分隔。利用已建好的矩阵建立更大的矩阵:一个大矩阵可以由已经建立好的小矩阵拼接而成。可以用实部矩阵和虚部矩阵构成复数矩阵。冒号表达式冒号是一个重要的运算符,利用它可以产生行向量。冒号表达式的一般格式为:e1:e2:e3其中,e1为初始值,e2为步长,e3为终

  • Java设计模式之结构型:适配器模式

    Java设计模式之结构型:适配器模式

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号