阿里 memcached 客户端 socket 连接池源码分析

本贴最后更新于 3012 天前,其中的信息可能已经渤澥桑田

上篇文章中,我们对阿里memcached客户端主要源码进行了分析,其中socket连接池部分是系统关键部分,涉及到整个客户端运行是否稳定,与服务端连接是否高效,本篇就来分析socket连接池实现的这部分源码。

连接池源码位于com.alisoft.xplatform.asf.cache.memcached.client下的SockIOPool类,该类还包括2个内部类,分别是MaintThread和SockIO。SockIOPool主要功能是用于维护与memcached服务端的持久化连接;并提供初始化连接池,获取、释放连接,以及设置服务器权重,连接池维护等功能。

初始化连接池

在系统启动时,会首先读取缓存服务设置的相关配置文件,读取成功后即开始实例化一个SockIOPool实例,再将pool的相关设置参数赋给该实例,如

//所有memcahed服务端ip地址,以及对应的权重
private String[] servers;
private Integer[] weights;

然后调用初始化逻辑,即initialize方法;该方法中,用一个支持并发的concurrenthashmap初始化一个容器socketpool,容器大小由配置的memcached服务端和初始化连接数决定。

socketPool = new ConcurrentHashMap<String, ConcurrentMap<SockIO, Integer>>(servers.length*initConn);

该容器中存放的是每个server地址,以及对应socket的一个包装内部类SockIO(在该内部类中完成所有的socket相关操作),同时该socket的状态也被记录下来,下面看看初始化代码:

public void initialize()
{
      // check to see if already initialized
      if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	{
		log.error("++++ trying to initialize an already initialized pool");
		return;
	}
        //加锁,防止多线程并发问题
	initDeadLock.lock();
	try
	{
	   // check to see if already initialized
	   if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	   {
		log.error("++++ trying to initialize an already initialized pool");
		return;
	   }
   // pools 
   socketPool = new ConcurrentHashMap&lt;String, ConcurrentMap&lt;SockIO, Integer&gt;&gt;(
				servers.length * initConn);
		
fastPool = new HashMap&lt;String,SockIO&gt;();
		
hostDeadDur = new ConcurrentHashMap&lt;String, Long&gt;();
hostDead = new ConcurrentHashMap&lt;String, Date&gt;();
maxCreate = (poolMultiplier &gt; minConn) ? minConn : minConn
		/ poolMultiplier; // only create up to maxCreate
			// connections at once
		
.......

// if servers is not set, or it empty, then
// throw a runtime exception
if (servers == null || servers.length &lt;= 0)
{
	log.error("++++ trying to initialize with no servers");
	throw new IllegalStateException("++++ trying to initialize with no servers");
}

// 初始化hash环结构的同时,创建每个server的socket,具体初始化hash算法可以参见上篇文章
if (this.hashingAlg == CONSISTENT_HASH)  //如果是hash一致性算法
	populateConsistentBuckets(); 
else
	populateBuckets();

// mark pool as initialized
this.initialized = true;

// 开始执行维护线程,该新线程会根据用户设定的时间间隔(maintsleep)进行连接池的维护工作
if (this.maintSleep &gt; 0)
this.startMaintThread();

} finally
{
   initDeadLock.unlock();
}

}

创建sokcet连接

由代码中我们可以看出,初始化hash环结构时,即为每个server初始化了socket连接,创建连接逻辑由createSocket方法负责。

如果给定的server发生故障,或者其他原因,无法创建socket的话,策略就是将其加入故障服务器队列hostDead,并且设置故障过期时间,下次再有需要对该server创建socket时,会先检测hostDead中是否包含该server,以及过期时间是否已经过了,如果包含并且未过过期时间的话,直接返回null,不在创建socket。无论创建是否成功,都会调用addSocketToPool方法将socket放入容器socketPool中。

注意:同一个server,有可能被创建多个socket。

protected <T> boolean addSocketToPool(ConcurrentMap<String, ConcurrentMap<SockIO, T>> pool, String host,SockIO socket, T oldValue,T newValue, boolean needReplace)
{
	.....
	if (!pool.containsKey(host))
	{
	sockets = new ConcurrentHashMap<SockIO, T>();
	pool.putIfAbsent(host, sockets);
	}
	sockets = pool.get(host);
	if (sockets != null)
	{
		if (needReplace)
		{       //对于同一个host,有可能创建多个socket
		    sockets.put(socket, newValue);
		    result = true;
		} 
		else{
			return sockets.replace(socket, oldValue, newValue);
		}
	}
	return result;
}

创建socket的逻辑就是构建一个SockIO对象,默认使用NIO建立socket,部分代码如下:

public SockIO(SockIOPool pool, String host, int timeout,
	           int connectTimeout, boolean noDelay) throws IOException,UnknownHostException
{
	.......
	// 创建真正的sokcet对象,默认使用NIO
 sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
if (timeout &gt;= 0)
this.sock.setSoTimeout(timeout);
// testing only
sock.setTcpNoDelay(noDelay);
// 包装输入输出流
in = new DataInputStream(sock.getInputStream());
out = new BufferedOutputStream(sock.getOutputStream());

this.host = host;

}

从SocketChannel中获取一个socket连接。

protected static Socket getSocket(String host, int port, int timeout) throws IOException
{
	SocketChannel sock = SocketChannel.open();
	sock.socket().connect(new InetSocketAddress(host, port), timeout);
	return sock.socket();
}

如何获取socket连接

连接池初始化成功,socket也创建完毕,那么下面看看如何获取一个指定server的socket连接。

public SockIO getConnection(String host)
{
      ........
	if (socketPool != null && !socketPool.isEmpty())
	{
		//该host对应的map中可能包含多个socket对象
		Map<SockIO, Integer> aSockets = socketPool.get(host);
	//fast check
	SockIO socket = fastPool.get(host);
	if (socket != null)
	{
		if (isFreeSocket(socket,aSockets))
		return socket;
	}
		
	if (aSockets != null &amp;&amp; !aSockets.isEmpty())
	{       //aSockets中可能会包含一个host的多个socket,随机指定一个
		int start = (random.nextInt() % aSockets.size());	
		if (start &lt; 0)  start*= -1;
		int count = 0;
		//下面2个for循环,是对整个host对应的所有生成的socket连接进行遍历,随机遍历	
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator(); i.hasNext();)
		{
			if (count &lt; start){
				i.next();count++;continue;
			}
			socket = i.next(); //从比起始位置start大的第一个socket开始,判断连接是否可用
			if (isFreeSocket(socket,aSockets))
				return socket;
		}
		//如果没有结果,就从第一个socket开始,逐渐到比start小的第一个socket结束,判断是否可用
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator();i.hasNext();)
		{
			if (count &gt; 0)
			{
				socket = i.next();
				if (isFreeSocket(socket,aSockets))
					return socket;
					count--;
			}else break;
		}		
	}
}
// create one socket -- let the maint thread take care of creating more
SockIO socket = createSocket(host);
if (socket != null)
{
   addSocketToPool(socketPool, host, socket,SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true);
}
return socket;

}

获取一个连接,希望通过高效的方式获取到一个合适的socket对象,所有采用了取余的一种算法。

线程池的维护MaintThread

前面说过,初始化连接后,即开始执行维护线程MaintThread,用于在指定时间间隔内维护socketpool,其内部实现其实也是调用SockIOPool的内部方法selfMaint,该方法维护socket连接池的步骤如下:

  • 在socketpool中找出需要建立socket的host,并且计算需要建立几个socket,其实就是根据配置参数minConn进行计算
  • 为每个host创建需要增加的socket实例,创建成功后放入socketpool中
  • 计算所有的空闲状态的活动socket,并且计算每个host的多余的空闲socket实例个数,即大于maxConn的,同时将这些socket状态置为SOCKET_STATUS_DEAD
  • 清理socketpool中所有状态为SOCKET_STATUS_DEAD的socket,从socketpool中删除,当然是先关闭socket再删除

总结

在基于NIO的基础上,该客户端实现了socket的灵活管理,使用多线程做连接池的定期维护,使的连接池始终保持在高校可用的状态;使用合理的包装,以符合分布式memcached缓存的实现需求。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 资讯

    资讯是用户因为及时地获得它并利用它而能够在相对短的时间内给自己带来价值的信息,资讯有时效性和地域性。

    55 引用 • 85 回帖
  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1434 引用 • 10054 回帖 • 490 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 16 关注
  • 钉钉

    钉钉,专为中国企业打造的免费沟通协同多端平台, 阿里巴巴出品。

    15 引用 • 67 回帖 • 339 关注
  • Bug

    Bug 本意是指臭虫、缺陷、损坏、犯贫、窃听器、小虫等。现在人们把在程序中一些缺陷或问题统称为 bug(漏洞)。

    75 引用 • 1737 回帖 • 5 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 136 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖 • 1 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    190 引用 • 1057 回帖
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    85 引用 • 139 回帖 • 1 关注
  • 博客

    记录并分享人生的经历。

    273 引用 • 2388 回帖
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • FlowUs

    FlowUs.息流 个人及团队的新一代生产力工具。

    让复杂的信息管理更轻松、自由、充满创意。

    1 引用 • 1 关注
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    75 引用 • 258 回帖 • 617 关注
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 344 回帖
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • 创业

    你比 99% 的人都优秀么?

    84 引用 • 1399 回帖
  • SVN

    SVN 是 Subversion 的简称,是一个开放源代码的版本控制系统,相较于 RCS、CVS,它采用了分支管理系统,它的设计目标就是取代 CVS。

    29 引用 • 98 回帖 • 680 关注
  • Latke

    Latke 是一款以 JSON 为主的 Java Web 框架。

    71 引用 • 535 回帖 • 787 关注
  • 心情

    心是产生任何想法的源泉,心本体会陷入到对自己本体不能理解的状态中,因为心能产生任何想法,不能分出对错,不能分出自己。

    59 引用 • 369 回帖
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖
  • IPFS

    IPFS(InterPlanetary File System,星际文件系统)是永久的、去中心化保存和共享文件的方法,这是一种内容可寻址、版本化、点对点超媒体的分布式协议。请浏览 IPFS 入门笔记了解更多细节。

    21 引用 • 245 回帖 • 241 关注
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 724 关注
  • wolai

    我来 wolai:不仅仅是未来的云端笔记!

    2 引用 • 14 回帖
  • TGIF

    Thank God It's Friday! 感谢老天,总算到星期五啦!

    287 引用 • 4484 回帖 • 669 关注
  • SEO

    发布对别人有帮助的原创内容是最好的 SEO 方式。

    35 引用 • 200 回帖 • 22 关注
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    8 引用 • 26 回帖
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 595 关注