阿里 memcached 客户端 socket 连接池源码分析

本贴最后更新于 3042 天前,其中的信息可能已经渤澥桑田

上篇文章中,我们对阿里memcached客户端主要源码进行了分析,其中socket连接池部分是系统关键部分,涉及到整个客户端运行是否稳定,与服务端连接是否高效,本篇就来分析socket连接池实现的这部分源码。

连接池源码位于com.alisoft.xplatform.asf.cache.memcached.client下的SockIOPool类,该类还包括2个内部类,分别是MaintThread和SockIO。SockIOPool主要功能是用于维护与memcached服务端的持久化连接;并提供初始化连接池,获取、释放连接,以及设置服务器权重,连接池维护等功能。

初始化连接池

在系统启动时,会首先读取缓存服务设置的相关配置文件,读取成功后即开始实例化一个SockIOPool实例,再将pool的相关设置参数赋给该实例,如

//所有memcahed服务端ip地址,以及对应的权重
private String[] servers;
private Integer[] weights;

然后调用初始化逻辑,即initialize方法;该方法中,用一个支持并发的concurrenthashmap初始化一个容器socketpool,容器大小由配置的memcached服务端和初始化连接数决定。

socketPool = new ConcurrentHashMap<String, ConcurrentMap<SockIO, Integer>>(servers.length*initConn);

该容器中存放的是每个server地址,以及对应socket的一个包装内部类SockIO(在该内部类中完成所有的socket相关操作),同时该socket的状态也被记录下来,下面看看初始化代码:

public void initialize()
{
      // check to see if already initialized
      if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	{
		log.error("++++ trying to initialize an already initialized pool");
		return;
	}
        //加锁,防止多线程并发问题
	initDeadLock.lock();
	try
	{
	   // check to see if already initialized
	   if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	   {
		log.error("++++ trying to initialize an already initialized pool");
		return;
	   }
   // pools 
   socketPool = new ConcurrentHashMap&lt;String, ConcurrentMap&lt;SockIO, Integer&gt;&gt;(
				servers.length * initConn);
		
fastPool = new HashMap&lt;String,SockIO&gt;();
		
hostDeadDur = new ConcurrentHashMap&lt;String, Long&gt;();
hostDead = new ConcurrentHashMap&lt;String, Date&gt;();
maxCreate = (poolMultiplier &gt; minConn) ? minConn : minConn
		/ poolMultiplier; // only create up to maxCreate
			// connections at once
		
.......

// if servers is not set, or it empty, then
// throw a runtime exception
if (servers == null || servers.length &lt;= 0)
{
	log.error("++++ trying to initialize with no servers");
	throw new IllegalStateException("++++ trying to initialize with no servers");
}

// 初始化hash环结构的同时,创建每个server的socket,具体初始化hash算法可以参见上篇文章
if (this.hashingAlg == CONSISTENT_HASH)  //如果是hash一致性算法
	populateConsistentBuckets(); 
else
	populateBuckets();

// mark pool as initialized
this.initialized = true;

// 开始执行维护线程,该新线程会根据用户设定的时间间隔(maintsleep)进行连接池的维护工作
if (this.maintSleep &gt; 0)
this.startMaintThread();

} finally
{
   initDeadLock.unlock();
}

}

创建sokcet连接

由代码中我们可以看出,初始化hash环结构时,即为每个server初始化了socket连接,创建连接逻辑由createSocket方法负责。

如果给定的server发生故障,或者其他原因,无法创建socket的话,策略就是将其加入故障服务器队列hostDead,并且设置故障过期时间,下次再有需要对该server创建socket时,会先检测hostDead中是否包含该server,以及过期时间是否已经过了,如果包含并且未过过期时间的话,直接返回null,不在创建socket。无论创建是否成功,都会调用addSocketToPool方法将socket放入容器socketPool中。

注意:同一个server,有可能被创建多个socket。

protected <T> boolean addSocketToPool(ConcurrentMap<String, ConcurrentMap<SockIO, T>> pool, String host,SockIO socket, T oldValue,T newValue, boolean needReplace)
{
	.....
	if (!pool.containsKey(host))
	{
	sockets = new ConcurrentHashMap<SockIO, T>();
	pool.putIfAbsent(host, sockets);
	}
	sockets = pool.get(host);
	if (sockets != null)
	{
		if (needReplace)
		{       //对于同一个host,有可能创建多个socket
		    sockets.put(socket, newValue);
		    result = true;
		} 
		else{
			return sockets.replace(socket, oldValue, newValue);
		}
	}
	return result;
}

创建socket的逻辑就是构建一个SockIO对象,默认使用NIO建立socket,部分代码如下:

public SockIO(SockIOPool pool, String host, int timeout,
	           int connectTimeout, boolean noDelay) throws IOException,UnknownHostException
{
	.......
	// 创建真正的sokcet对象,默认使用NIO
 sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
if (timeout &gt;= 0)
this.sock.setSoTimeout(timeout);
// testing only
sock.setTcpNoDelay(noDelay);
// 包装输入输出流
in = new DataInputStream(sock.getInputStream());
out = new BufferedOutputStream(sock.getOutputStream());

this.host = host;

}

从SocketChannel中获取一个socket连接。

protected static Socket getSocket(String host, int port, int timeout) throws IOException
{
	SocketChannel sock = SocketChannel.open();
	sock.socket().connect(new InetSocketAddress(host, port), timeout);
	return sock.socket();
}

如何获取socket连接

连接池初始化成功,socket也创建完毕,那么下面看看如何获取一个指定server的socket连接。

public SockIO getConnection(String host)
{
      ........
	if (socketPool != null && !socketPool.isEmpty())
	{
		//该host对应的map中可能包含多个socket对象
		Map<SockIO, Integer> aSockets = socketPool.get(host);
	//fast check
	SockIO socket = fastPool.get(host);
	if (socket != null)
	{
		if (isFreeSocket(socket,aSockets))
		return socket;
	}
		
	if (aSockets != null &amp;&amp; !aSockets.isEmpty())
	{       //aSockets中可能会包含一个host的多个socket,随机指定一个
		int start = (random.nextInt() % aSockets.size());	
		if (start &lt; 0)  start*= -1;
		int count = 0;
		//下面2个for循环,是对整个host对应的所有生成的socket连接进行遍历,随机遍历	
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator(); i.hasNext();)
		{
			if (count &lt; start){
				i.next();count++;continue;
			}
			socket = i.next(); //从比起始位置start大的第一个socket开始,判断连接是否可用
			if (isFreeSocket(socket,aSockets))
				return socket;
		}
		//如果没有结果,就从第一个socket开始,逐渐到比start小的第一个socket结束,判断是否可用
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator();i.hasNext();)
		{
			if (count &gt; 0)
			{
				socket = i.next();
				if (isFreeSocket(socket,aSockets))
					return socket;
					count--;
			}else break;
		}		
	}
}
// create one socket -- let the maint thread take care of creating more
SockIO socket = createSocket(host);
if (socket != null)
{
   addSocketToPool(socketPool, host, socket,SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true);
}
return socket;

}

获取一个连接,希望通过高效的方式获取到一个合适的socket对象,所有采用了取余的一种算法。

线程池的维护MaintThread

前面说过,初始化连接后,即开始执行维护线程MaintThread,用于在指定时间间隔内维护socketpool,其内部实现其实也是调用SockIOPool的内部方法selfMaint,该方法维护socket连接池的步骤如下:

  • 在socketpool中找出需要建立socket的host,并且计算需要建立几个socket,其实就是根据配置参数minConn进行计算
  • 为每个host创建需要增加的socket实例,创建成功后放入socketpool中
  • 计算所有的空闲状态的活动socket,并且计算每个host的多余的空闲socket实例个数,即大于maxConn的,同时将这些socket状态置为SOCKET_STATUS_DEAD
  • 清理socketpool中所有状态为SOCKET_STATUS_DEAD的socket,从socketpool中删除,当然是先关闭socket再删除

总结

在基于NIO的基础上,该客户端实现了socket的灵活管理,使用多线程做连接池的定期维护,使的连接池始终保持在高校可用的状态;使用合理的包装,以符合分布式memcached缓存的实现需求。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 559 关注
  • 百度

    百度(Nasdaq:BIDU)是全球最大的中文搜索引擎、最大的中文网站。2000 年 1 月由李彦宏创立于北京中关村,致力于向人们提供“简单,可依赖”的信息获取方式。“百度”二字源于中国宋朝词人辛弃疾的《青玉案·元夕》词句“众里寻他千百度”,象征着百度对中文信息检索技术的执著追求。

    63 引用 • 785 回帖 • 164 关注
  • 微软

    微软是一家美国跨国科技公司,也是世界 PC 软件开发的先导,由比尔·盖茨与保罗·艾伦创办于 1975 年,公司总部设立在华盛顿州的雷德蒙德(Redmond,邻近西雅图)。以研发、制造、授权和提供广泛的电脑软件服务业务为主。

    8 引用 • 44 回帖
  • 友情链接

    确认过眼神后的灵魂连接,站在链在!

    24 引用 • 373 回帖
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    12 引用 • 54 回帖 • 159 关注
  • Postman

    Postman 是一款简单好用的 HTTP API 调试工具。

    4 引用 • 3 回帖 • 7 关注
  • 电影

    这是一个不能说的秘密。

    121 引用 • 604 回帖 • 1 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 464 关注
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    36 引用 • 37 回帖 • 535 关注
  • ReactiveX

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的 API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

    1 引用 • 2 回帖 • 161 关注
  • CongSec

    本标签主要用于分享网络空间安全专业的学习笔记

    1 引用 • 1 回帖 • 15 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖
  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 683 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 53 关注
  • 工具

    子曰:“工欲善其事,必先利其器。”

    288 引用 • 734 回帖 • 2 关注
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 159 关注
  • Rust

    Rust 是一门赋予每个人构建可靠且高效软件能力的语言。Rust 由 Mozilla 开发,最早发布于 2014 年 9 月。

    58 引用 • 22 回帖
  • 负能量

    上帝为你关上了一扇门,然后就去睡觉了....努力不一定能成功,但不努力一定很轻松 (° ー °〃)

    88 引用 • 1235 回帖 • 410 关注
  • Pipe

    Pipe 是一款小而美的开源博客平台。Pipe 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    132 引用 • 1114 回帖 • 125 关注
  • 持续集成

    持续集成(Continuous Integration)是一种软件开发实践,即团队开发成员经常集成他们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。

    15 引用 • 7 回帖
  • Bug

    Bug 本意是指臭虫、缺陷、损坏、犯贫、窃听器、小虫等。现在人们把在程序中一些缺陷或问题统称为 bug(漏洞)。

    76 引用 • 1737 回帖 • 1 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    53 引用 • 37 回帖 • 3 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    91 引用 • 751 回帖 • 1 关注
  • 自由行
    4 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    77 引用 • 430 回帖 • 1 关注
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    171 引用 • 512 回帖