`
aine_pan
  • 浏览: 43842 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

Mule 基于源码实现控制台效果

 
阅读更多
概述
最近研究了ESB,重点研究了Mule的实现。现在分享一下学习的结果,也备自己查看。
Mule的简介就不多说了,有社区版和企业版两个版本,我们自己研究就是用社区版了,重点关注的是Mule的控制台实现。
前期准备
本文介绍知识需要基于对Mule和ESB有基本的认知,关于Mule的基本实现,以及源码的解读不在本文范畴,如果有需要,另外开一篇博客介绍。
目标
在准备将Mule移植到我们的项目上时,发现了一个问题,就是控制台的实现,具体的是对每个服务的起停操作,流量统计/清空,log记录等功能实现。
其中log功能可以使用Mule自带的Log组件实现,给特殊的功能可以使用filter来实现,本文不再赘述。
流量统计不管是写表还是记在文件中,或者开一片内存区域都可以实现,本文不再赘述。
下文重点实现对具体的服务控制的实现。
实现步骤
一 服务是什么
Mule中对服务的定义最小单元是FlowConstruct,Mule中可以对FlowConstruct进行编排,形成一套完整的服务。所以如果我们需要对具体的服务进行控制,就需要定位到具体的哪个FlowConstruct。
在看FlowConstruct之前,我们先回顾一下Mule的几大对象:
1 FlowConstruct: 就是我们说的最小的服务定义单元
2 Model:在 FlowConstruct之前,定义最小的服务模块,现在已经很少用
3 Agent:代理,用来代理服务
4 Connector:各种协议的连接对象抽象,是我们服务定义的底层实现者
5 QueueManager:内置的通知管理器,备用JMS
6 Stoppable/Startable类:其他实现这两个接口的资源信息


FlowConstruct是一个接口,我们来看看他的接口定义:
public interface FlowConstruct extends NamedObject, LifecycleStateEnabled
{

    /**
     * @return The exception listener that will be used to handle exceptions that may be thrown at different
     *         points during the message flow defined by this construct.
     */
    MessagingExceptionHandler getExceptionListener();

    /**
     * @return The statistics holder used by this flow construct to keep track of its activity.
     */
    FlowConstructStatistics getStatistics();

    /**
     * @return This implementation of {@link MessageInfoMapping} used to control how Important message
     *         information is pulled from the current message.
     */
    MessageInfoMapping getMessageInfoMapping();

    /**
     * @return This muleContext that this flow construct belongs to and runs in the context of.
     */
    MuleContext getMuleContext();

}

既然是接口,我们操作的肯定是他的实现类,他有哪些实现类呢?(怎么传图片到这里?)
直接说了,我们常用的是实现类Flow:
public Flow extends AbstractPipeline implements MessageProcessor, StageNameSourceProvider, DynamicPipeline

public abstract class AbstractPipeline extends AbstractFlowConstruct implements Pipeline

public abstract class AbstractFlowConstruct implements FlowConstruct, Lifecycle, AnnotatedObject

public interface FlowConstruct extends NamedObject, LifecycleStateEnabled

不知道这样看官能否了解这个继承结构。

二 误区1 从服务启动入手
想要启动或者停止某个单独的服务,第一反应是想到,看看框架是怎么在启动的时候加载这些服务的,看明白启动的步骤和逻辑,应该就能知道怎么实现个别服务的启动和停止了。

服务启动逻辑/停止逻辑
MuleContextLifecycleManager muleContext.getRegistry().fireLifecycle(phaseName);
MuleRegistryHelper fireLifecycle(String phase);
DefaultRegistryBroker(AbstractRegistryBroker fireLifecycle(String phase));
RegistryBrokerLifecycleManager(RegistryLifecycleManager)
	fireLifecycle(String destinationPhase);
	invokePhase(String phase, Object object, LifecycleCallback callback);
内部类:RegistryLifecycleCallback onTransition()
	 Collection<?> targetsObj = getLifecycleObject().lookupObjectsForLifecycle(lo.getType());


这里的lookupObjectsForLifecycle(lo.getType())方法会把Mule注册的所有类型都遍历一边,根据上面类聚的几大类对象,根据类型找到以后再次遍历。
最后会在查找到的所有资源中循环调用 phase.applyLifecycle(o);方法。通过这一连串的代码跳转,相信看官也跟我一样云里雾里的找不清方向,我具体解释一下Stop的逻辑,start的逻辑跟他是一样的,得益于状态模式。
Stop逻辑
首先查找到的是FlowConstruct类型的对象:
Stop的时候,实际上调用的是AbstractFlowConstruct.stop();方法,具体的stop方法被其子类AbstractPipeline复写了。
1 首先stop message source:
  在DefaultInboundEndpoint. stop()中, getConnector().unregisterListener(this, flowConstruct);,即已经将对象中的connector上的监听解除了。具体方法在 AbstractConnector.unregisterListener()中。
2 Stop MessageProcessor:
  从上面的结构中可以看出来,processor有很多个,只要实现了Stoppable接口的,逐个调用Stop()方法。
3 Stop MessageProcessorPollingMessageReceiver,基本没有polling的。
4 回到pipeline 类中,调用stopIfStoppable(pipeline):
  实际上是stop InterceptingChainLifecycleWrapper。同样从上面的对象中可以看到,wrapper有很多个,只要实现stoppable接口的都会调用,重点关注CxfInboundMessageProcessor对象的stop,因为他代理了org.apache.cxf.endpoint.ServerImpl,实际上是调用server的stop方法。至此,CXF实际上已经停止了,对于其他类型的协议,到这里也应该就停止了。
5 回到pipeline 类中,调用doStop方法:可惜是个空方法,我操!
6 回到外围基类时,调用DefaultMessagingExceptionStrategy的stop。结束后完成所有stop操作。
其次查找到connector类型的对象:
根据前面的分析,在停止Flow的时候,实际上已经停了相应的connector,那这里的停止是做什么事情呢?
同样,先来看看connector对象的结构,从分析flow 中可以看出,这个对象异常复杂,因为他需要封装所有的连接协议,我们看看他的类继承关系图吧:(不好意思,不会贴图)
正式停止connector
停止从AbstractConnector.stop()中开始,在onTransition回调函数中:
1 shutdownScheduler():
调用ScheduledThreadPoolExecutor.stop to Disable new tasks from being submitted。等一段时间后,彻底关闭ScheduleExecutor shutdownNow。
2 doStop():
到子类HttpConnector中HttpConnectionManager. Dispose();清除socket数据,并且调用MuleWorkManager.dispose();
MuleWorkManager 中关闭ThreadPoolExecutor.shutdown(); to Disable new tasks from being submitted,然后等一段时间后彻底关闭Executor:workExecutorService.shutdownNow()。
3 disposeWorkManagers:
清除dispatcherWorkManager,requesterWorkManager,receiverWorkManager中的所有workManager。
4 清除transport中的Dispatchers和Requesters
DefaultConfigurableKeyedObjectPool和GenericKeyedObjectPool
接着查找所有实现stoppable接口的资源:
前面已经提到很多资源使用stoppable接口停止了,比如MessageProcessor,InterceptingChainLifecycleWrapper等,在最后,Mule会把所以资源统一梳理一遍,再做一次Dispose。
1 NotificationLifecycleObject
DefaultInboundEndpoint:在flow中已经stop过endpoint了吗?答曰,正解。在stop message source的时候已经通过endpoint找到connector stop了所有的endpoint 监听了。看看这里做的是什么操作。直接忽略了,我操!
DefaultProcessingTimeWatcher:清除守护线程?watcherThread:ProcessingTimeChecker
申明:忽略了两个选项,一个是MessageQ,还有一个是modle(已经基本不用了):
至此,stop的操作就结束了,这个stop不是将将框架停止,不是MuleServer的stop,只是停止了所有的服务,没有调用框架的Dispose方法,所以属于轻量的停止。
重启所有的服务,基本上是按照stop的流程,将所有的资源重新启动一遍,这里就可以看出来状态模式的好处,基本 不用太多的特殊处理。
问题:可否实现单个服务的起停?
单个服务是什么?FlowConstruct。与之对应的是connector,如果能做到将FlowConstruct和对应的connector起停了,是否可以实现对单个服务的起停?从Mule的框架来看是不具备这样的设计的,他的所有通知都是基于所有的类型。
结论:看了这么多代码,其实发现还是不能实现从FlowConstruct找到对应的所有资源,比如connector,还有workManager等等。

三 误区2 从registry入手

先解释一下registry的定义:registry是注册管理器,Mule用来注册所有的资源,Mule在context中通过registry broker来维护spring registry,transient registry,guice registry。
spring registry的功能是维护通过spring注入的所有实例;
transient registry的功能是维护默认的处理函数链表_muleContextProcessor,_muleExpressionEvaluatorProcessor,_muleExpressionEnricherProcessor,_muleLifecycleStateInjectorProcessor,_muleLifecycleManager
还有mule配置注入的connector,endpoint等对象,对象在创建时会主动注册其监听的事件。
guice registry维护通过guice框架注入的对象。
mule访问registry中具体对象的流程为:
muleContext->muleRegistry(muleRegistryHelper)-->DefaultRegistryBroker-->registry array--> registry --> object。
目前在spring启动的方式下,就包括了spring 和transient 两个registry。

在框架refresh之后,调用firelifecycle,进入RegistryLifecycleManager,对启动阶段的模块调用start进行初始化操作。
从这里分析,我们是否可以从registry入手来移除注册的服务呢?这个看似可行的逻辑却犯了原则行的错误。
很想把registryBroker和几个manager的类图贴上来,很清晰的表达了几个对象之间的关系,很抱歉,大家自己去看类图,这里大家看一下框架是怎么实现注册的。
public void registerObject(String key, Object value, Object metadata) throws RegistrationException
    {
        Iterator it = getRegistries().iterator();
        Registry reg;
        while (it.hasNext())
        {
            reg = (Registry) it.next();//第一次取到的是TransientRegistry
            if (!reg.isReadOnly())
            {
                reg.registerObject(key, value, metadata);
                break;
            }
        }
    }

最终的结果就final Object previousObject = registry.put(key, object);

对于注册机制的几点疑问:
1 注册transformer时默认将所有transformer都注册了,一共31个。是否可以简化之?
2 注册registerObjects时默认注册了所有的Parser,约20个,是否可以简化之?
根据上面解释的,spring启动的具体的对象都是注册在spring registry中,问题就处在:
TransientRegistry底层就是维护一个map,可以通过查找的方式unregistry。
SpringRegistry底层是通过application查找维护的bean对象的。
但是只读的,不可以通过lookup的方式查找之后去unregistry,他妈的是只读的!

总结:好了,到这里可以看出来这条路是走不下去的,根本问题是registry注册的是框架上的注册,而我们要起停的服务,不是从框架上将其移除,只是为了暂停运行而已,说白了,可能就是把connector关了,相关资源释放了,然后可以重启就OK了!

四 stopMessageProcessing的启发
Mule ESB的消息在流服务中传递时如果发生异常,Mule ESB服务器会将异常信息输出到本地日志文件中,如果希望捕获该异常进行处理,可以在Flow中编排异常策。
在异常策略节点内可编排一个出口端点,Mule ESB将把异常错误信息以数据传给该出口端点(数据类型为org.mule.message.ExceptionMessage)。缺省的异常处理策略节点具有以下两个属性:
1 enableNotifications 默认为true,是否触发异常处理策略
2 stopMessageProcessing 默认为flase,是否停止flow服务,如果为true则流程服务将被停止,服务器重启前不能再相应请求。
我操啊,这个正是我想要的结果,这个尼玛的踏破铁鞋无觅处啊!来看看源码中的实现:
 protected void stopFlow(FlowConstruct flow)
    {
        if (flow instanceof Stoppable)
        {
            logger.info("Stopping flow '" + flow.getName() + "' due to exception");

            try
            {
                ((Lifecycle) flow).stop();
            }
            catch (MuleException e)
            {
                logger.error("Unable to stop flow '" + flow.getName() + "'", e);
            }
        }
        else
        {
            logger.warn("Flow is not stoppable");
        }
    }

这样就简单了,不用细说了,自己把代码挖过来,直接实现了这个效果。
Mule中有很多这样风格的代码:
 if (flow [color=red]instanceof Stoppable[/color])
        {
            try
            {
                ([color=red](Lifecycle)[/color] flow).stop();
            }
            catch (MuleException e)
            {
               ...
            }
        }

值得学习一下。
五 总结
至此把mule的源码翻了一遍,比较奇怪的是,为什么Mule不提供一个这样的控制台效果的功能,另外对于Mule的源码还需要继续挖,还有不少有疑问的地方,今天就不写了,已经很长了。

学会插图片了,以后再改吧!
  • 大小: 47.3 KB
分享到:
评论
3 楼 canghaizhiyue 2016-01-19  
大牛呀,能不能加下QQ交流下 25464978,小弟正在学习这块。多谢了!
2 楼 逐客叫我 2015-01-14  
太牛了,我只能膜拜了。
1 楼 helei050 2014-11-12  
兄弟写的很好,希望能加q交流一下 1056466499

相关推荐

Global site tag (gtag.js) - Google Analytics