在结识 solo 的时候,就被它的同步到 sym 的思想所吸引,所以对这个同步的代码有必要研究一下,直入正题。
同步代码根源
通过查看代码不难找到一个 org.b3log.solo.event.rhythm.ArticleSender
,这是发送代码的根源,就是将一个封装好的文章 JSONObject
,封装成 post
请求的参数,然后对着 solo.properties
中的 rhythm.servePath
发送一个 HTTP 请求
通过反推找出调用逻辑
我的做法比较简单直接了,直接全项目搜索 org.b3log.solo.event.rhythm.ArticleSender
的关键字 ArticleSender
,会发现在 org.b3log.solo.SoloServletListener
中这样用了一下:
private void registerEventProcessor() {
Stopwatchs.start("Register Event Processors");
LOGGER.debug("Registering event processors....");
try {
final EventManager eventManager = beanManager.getReference(EventManager.class);
//省略代码
// Sync
eventManager.registerListener(new ArticleSender());
} catch (final Exception e) {
LOGGER.log(Level.ERROR, "Register event processors error", e);
throw new IllegalStateException(e);
}
LOGGER.debug("Registering event processors....");
Stopwatchs.end();
}
进这个 registerListener(new AarticleSender());
方法看看,在类 org.b3log.latke.event.EventManager
中
public void registerListener(final AbstractEventListener<?> eventListener) {
synchronizedEventQueue.addListener(eventListener);
}
进去看看
synchronized void addListener(
final AbstractEventListener<?> listener) {
if (null == listener) {
throw new NullPointerException();
}
final String eventType = listener.getEventType();
if (null == eventType) {
throw new NullPointerException();
}
List<AbstractEventListener<?>> listenerList = listeners.get(eventType);
if (null == listenerList) {
listenerList = new ArrayList<AbstractEventListener<?>>();
listeners.put(eventType, listenerList);
}
listenerList.add(listener);
}
看上面这段代码,简单点理解,就是将传进来的
AbstractEventListener
的getEventType()
作为 key,传进来的AbstractEventListener
放进一个List
中存起来。再反过来看看上文说的同步新增博客的源码是在org.b3log.solo.event.rhythm.ArticleSender
,看看它是什么继承结构:public final class ArticleSender extends AbstractEventListener<JSONObject>
,这就前后呼应了,它是AbstractEventListener
的子类,所以可以上面这样用。
观察者模式从通知入手
很明显,上面的这种用法是观察者模式,就跟微信公众号订阅一样,公众号发一个消息后可以通知到各个订阅者,直接在新增博客中的代码找,在 org.b3log.solo.processor.console.ArticleConsole.addArticle(HttpServletRequest, HttpServletResponse, HTTPRequestContext, JSONObject)
,仔细看看会进到 org.b3log.solo.service.ArticleMgmtService.addArticleInternal(JSONObject)
,这个方法中在新增完 solo 的博客后,会执行下面的代码:
if (article.optBoolean(Article.ARTICLE_IS_PUBLISHED)) {
// Fire add article event
final JSONObject eventData = new JSONObject();
eventData.put(Article.ARTICLE, article);
eventManager.fireEventSynchronously(new Event<>(EventTypes.ADD_ARTICLE, eventData));
}
调用的是 org.b3log.latke.event.EventManager.fireEventSynchronously(Event<?>)
,在它里面又调 org.b3log.latke.event.SynchronizedEventQueue.fireEvent(Event<?>)
如下:
synchronized void fireEvent(final Event<?> event) throws EventException {
final String eventType = event.getType();
List<Event<?>> events = synchronizedEvents.get(eventType);
if (null == events) {
events = new ArrayList<Event<?>>();
synchronizedEvents.put(eventType, events);
}
events.add(event);
setChanged();
notifyListeners(event);
}
最后一步调用的是 org.b3log.latke.event.AbstractEventQueue.notifyListeners(Event<?>)
,来看看
public void notifyListeners(final Event<?> event) throws EventException {
AbstractEventListener<?>[] arrLocal = null;
synchronized (this) {
if (!changed) {
return;
}
final String eventType = event.getType();
final List<AbstractEventListener<?>> listenerList = listeners.get(eventType);
final AbstractEventListener<?>[] types = new AbstractEventListener<?>[1];
if (null != listenerList && !listenerList.isEmpty()) {
arrLocal = listenerList.<AbstractEventListener<?>>toArray(types);
clearChanged();
}
}
if (null != arrLocal) {
for (int i = arrLocal.length - 1; i >= 0; i--) {
arrLocal[i].performAction(this, event);
}
}
}
这里看不懂的话,得结合上文中提到的:
org.b3log.latke.event.AbstractEventQueue.addListener(AbstractEventListener<?>)
来看,就是在容器启动的时候,这些被存起来,现在新增一篇文章观察者模式被通知了,通知的代码从上面也可以看出,它事件的KEY
是叫Add Article
,通过该KEY
就能取到对应的org.b3log.solo.event.rhythm.ArticleSender
,取到后想都不用想了,肯定是执行了啊
看 org.b3log.latke.event.AbstractEventListener.performAction(AbstractEventQueue, Event<?>)
方法就可以看到执行
final void performAction(final AbstractEventQueue eventQueue, final Event<?> event) throws EventException {
@SuppressWarnings("unchecked")
final Event<T> eventObject = (Event<T>) event;
try {
action(eventObject);
} catch (final Exception e) {
LOGGER.log(Level.WARN, "Event perform failed", e);
} finally { // remove event from event queue
if (eventQueue instanceof SynchronizedEventQueue) {
final SynchronizedEventQueue synchronizedEventQueue = (SynchronizedEventQueue) eventQueue;
synchronizedEventQueue.removeEvent(eventObject);
}
}
}
这个时候的 action 执行的不就是
org.b3log.solo.event.rhythm.ArticleSender
中的 action 吗,因为是它在调用嘛。
总结
这篇文章是心血来潮,根据 solo 中新增博客同步的代码解析同步过程,其他的同步过程类似,有兴趣可以看看,如若解析的有什么不妥的地方,望指出。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于