阿里 memcached 客户端 socket 连接池源码分析

本贴最后更新于 2873 天前,其中的信息可能已经渤澥桑田

上篇文章中,我们对阿里memcached客户端主要源码进行了分析,其中socket连接池部分是系统关键部分,涉及到整个客户端运行是否稳定,与服务端连接是否高效,本篇就来分析socket连接池实现的这部分源码。

连接池源码位于com.alisoft.xplatform.asf.cache.memcached.client下的SockIOPool类,该类还包括2个内部类,分别是MaintThread和SockIO。SockIOPool主要功能是用于维护与memcached服务端的持久化连接;并提供初始化连接池,获取、释放连接,以及设置服务器权重,连接池维护等功能。

初始化连接池

在系统启动时,会首先读取缓存服务设置的相关配置文件,读取成功后即开始实例化一个SockIOPool实例,再将pool的相关设置参数赋给该实例,如

//所有memcahed服务端ip地址,以及对应的权重
private String[] servers;
private Integer[] weights;

然后调用初始化逻辑,即initialize方法;该方法中,用一个支持并发的concurrenthashmap初始化一个容器socketpool,容器大小由配置的memcached服务端和初始化连接数决定。

socketPool = new ConcurrentHashMap<String, ConcurrentMap<SockIO, Integer>>(servers.length*initConn);

该容器中存放的是每个server地址,以及对应socket的一个包装内部类SockIO(在该内部类中完成所有的socket相关操作),同时该socket的状态也被记录下来,下面看看初始化代码:

public void initialize()
{
      // check to see if already initialized
      if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	{
		log.error("++++ trying to initialize an already initialized pool");
		return;
	}
        //加锁,防止多线程并发问题
	initDeadLock.lock();
	try
	{
	   // check to see if already initialized
	   if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	   {
		log.error("++++ trying to initialize an already initialized pool");
		return;
	   }
   // pools 
   socketPool = new ConcurrentHashMap&lt;String, ConcurrentMap&lt;SockIO, Integer&gt;&gt;(
				servers.length * initConn);
		
fastPool = new HashMap&lt;String,SockIO&gt;();
		
hostDeadDur = new ConcurrentHashMap&lt;String, Long&gt;();
hostDead = new ConcurrentHashMap&lt;String, Date&gt;();
maxCreate = (poolMultiplier &gt; minConn) ? minConn : minConn
		/ poolMultiplier; // only create up to maxCreate
			// connections at once
		
.......

// if servers is not set, or it empty, then
// throw a runtime exception
if (servers == null || servers.length &lt;= 0)
{
	log.error("++++ trying to initialize with no servers");
	throw new IllegalStateException("++++ trying to initialize with no servers");
}

// 初始化hash环结构的同时,创建每个server的socket,具体初始化hash算法可以参见上篇文章
if (this.hashingAlg == CONSISTENT_HASH)  //如果是hash一致性算法
	populateConsistentBuckets(); 
else
	populateBuckets();

// mark pool as initialized
this.initialized = true;

// 开始执行维护线程,该新线程会根据用户设定的时间间隔(maintsleep)进行连接池的维护工作
if (this.maintSleep &gt; 0)
this.startMaintThread();

} finally
{
   initDeadLock.unlock();
}

}

创建sokcet连接

由代码中我们可以看出,初始化hash环结构时,即为每个server初始化了socket连接,创建连接逻辑由createSocket方法负责。

如果给定的server发生故障,或者其他原因,无法创建socket的话,策略就是将其加入故障服务器队列hostDead,并且设置故障过期时间,下次再有需要对该server创建socket时,会先检测hostDead中是否包含该server,以及过期时间是否已经过了,如果包含并且未过过期时间的话,直接返回null,不在创建socket。无论创建是否成功,都会调用addSocketToPool方法将socket放入容器socketPool中。

注意:同一个server,有可能被创建多个socket。

protected <T> boolean addSocketToPool(ConcurrentMap<String, ConcurrentMap<SockIO, T>> pool, String host,SockIO socket, T oldValue,T newValue, boolean needReplace)
{
	.....
	if (!pool.containsKey(host))
	{
	sockets = new ConcurrentHashMap<SockIO, T>();
	pool.putIfAbsent(host, sockets);
	}
	sockets = pool.get(host);
	if (sockets != null)
	{
		if (needReplace)
		{       //对于同一个host,有可能创建多个socket
		    sockets.put(socket, newValue);
		    result = true;
		} 
		else{
			return sockets.replace(socket, oldValue, newValue);
		}
	}
	return result;
}

创建socket的逻辑就是构建一个SockIO对象,默认使用NIO建立socket,部分代码如下:

public SockIO(SockIOPool pool, String host, int timeout,
	           int connectTimeout, boolean noDelay) throws IOException,UnknownHostException
{
	.......
	// 创建真正的sokcet对象,默认使用NIO
 sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
if (timeout &gt;= 0)
this.sock.setSoTimeout(timeout);
// testing only
sock.setTcpNoDelay(noDelay);
// 包装输入输出流
in = new DataInputStream(sock.getInputStream());
out = new BufferedOutputStream(sock.getOutputStream());

this.host = host;

}

从SocketChannel中获取一个socket连接。

protected static Socket getSocket(String host, int port, int timeout) throws IOException
{
	SocketChannel sock = SocketChannel.open();
	sock.socket().connect(new InetSocketAddress(host, port), timeout);
	return sock.socket();
}

如何获取socket连接

连接池初始化成功,socket也创建完毕,那么下面看看如何获取一个指定server的socket连接。

public SockIO getConnection(String host)
{
      ........
	if (socketPool != null && !socketPool.isEmpty())
	{
		//该host对应的map中可能包含多个socket对象
		Map<SockIO, Integer> aSockets = socketPool.get(host);
	//fast check
	SockIO socket = fastPool.get(host);
	if (socket != null)
	{
		if (isFreeSocket(socket,aSockets))
		return socket;
	}
		
	if (aSockets != null &amp;&amp; !aSockets.isEmpty())
	{       //aSockets中可能会包含一个host的多个socket,随机指定一个
		int start = (random.nextInt() % aSockets.size());	
		if (start &lt; 0)  start*= -1;
		int count = 0;
		//下面2个for循环,是对整个host对应的所有生成的socket连接进行遍历,随机遍历	
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator(); i.hasNext();)
		{
			if (count &lt; start){
				i.next();count++;continue;
			}
			socket = i.next(); //从比起始位置start大的第一个socket开始,判断连接是否可用
			if (isFreeSocket(socket,aSockets))
				return socket;
		}
		//如果没有结果,就从第一个socket开始,逐渐到比start小的第一个socket结束,判断是否可用
		for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator();i.hasNext();)
		{
			if (count &gt; 0)
			{
				socket = i.next();
				if (isFreeSocket(socket,aSockets))
					return socket;
					count--;
			}else break;
		}		
	}
}
// create one socket -- let the maint thread take care of creating more
SockIO socket = createSocket(host);
if (socket != null)
{
   addSocketToPool(socketPool, host, socket,SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true);
}
return socket;

}

获取一个连接,希望通过高效的方式获取到一个合适的socket对象,所有采用了取余的一种算法。

线程池的维护MaintThread

前面说过,初始化连接后,即开始执行维护线程MaintThread,用于在指定时间间隔内维护socketpool,其内部实现其实也是调用SockIOPool的内部方法selfMaint,该方法维护socket连接池的步骤如下:

  • 在socketpool中找出需要建立socket的host,并且计算需要建立几个socket,其实就是根据配置参数minConn进行计算
  • 为每个host创建需要增加的socket实例,创建成功后放入socketpool中
  • 计算所有的空闲状态的活动socket,并且计算每个host的多余的空闲socket实例个数,即大于maxConn的,同时将这些socket状态置为SOCKET_STATUS_DEAD
  • 清理socketpool中所有状态为SOCKET_STATUS_DEAD的socket,从socketpool中删除,当然是先关闭socket再删除

总结

在基于NIO的基础上,该客户端实现了socket的灵活管理,使用多线程做连接池的定期维护,使的连接池始终保持在高校可用的状态;使用合理的包装,以符合分布式memcached缓存的实现需求。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • 宕机

    宕机,多指一些网站、游戏、网络应用等服务器一种区别于正常运行的状态,也叫“Down 机”、“当机”或“死机”。宕机状态不仅仅是指服务器“挂掉了”、“死机了”状态,也包括服务器假死、停用、关闭等一些原因而导致出现的不能够正常运行的状态。

    13 引用 • 82 回帖 • 51 关注
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    247 引用 • 1347 回帖 • 1 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    942 引用 • 1458 回帖 • 118 关注
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    28 引用 • 108 回帖 • 1 关注
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    106 引用 • 152 回帖
  • 电影

    这是一个不能说的秘密。

    120 引用 • 597 回帖
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 641 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 尊园地产

    昆明尊园房地产经纪有限公司,即:Kunming Zunyuan Property Agency Company Limited(简称“尊园地产”)于 2007 年 6 月开始筹备,2007 年 8 月 18 日正式成立,注册资本 200 万元,公司性质为股份经纪有限公司,主营业务为:代租、代售、代办产权过户、办理银行按揭、担保、抵押、评估等。

    1 引用 • 22 回帖 • 702 关注
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 629 关注
  • abitmean

    有点意思就行了

    33 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 1 关注
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 608 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    109 引用 • 54 回帖
  • 倾城之链
    23 引用 • 66 回帖 • 121 关注
  • Scala

    Scala 是一门多范式的编程语言,集成面向对象编程和函数式编程的各种特性。

    13 引用 • 11 回帖 • 110 关注
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    713 引用 • 1174 回帖 • 116 关注
  • Shell

    Shell 脚本与 Windows/Dos 下的批处理相似,也就是用各类命令预先放入到一个文件中,方便一次性执行的一个程序文件,主要是方便管理员进行设置或者管理用的。但是它比 Windows 下的批处理更强大,比用其他编程程序编辑的程序效率更高,因为它使用了 Linux/Unix 下的命令。

    122 引用 • 73 回帖
  • Bootstrap

    Bootstrap 是 Twitter 推出的一个用于前端开发的开源工具包。它由 Twitter 的设计师 Mark Otto 和 Jacob Thornton 合作开发,是一个 CSS / HTML 框架。

    18 引用 • 33 回帖 • 680 关注
  • Love2D

    Love2D 是一个开源的, 跨平台的 2D 游戏引擎。使用纯 Lua 脚本来进行游戏开发。目前支持的平台有 Windows, Mac OS X, Linux, Android 和 iOS。

    14 引用 • 53 回帖 • 515 关注
  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1083 引用 • 3461 回帖 • 261 关注
  • JVM

    JVM(Java Virtual Machine)Java 虚拟机是一个微型操作系统,有自己的硬件构架体系,还有相应的指令系统。能够识别 Java 独特的 .class 文件(字节码),能够将这些文件中的信息读取出来,使得 Java 程序只需要生成 Java 虚拟机上的字节码后就能在不同操作系统平台上进行运行。

    180 引用 • 120 回帖
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 58 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    207 引用 • 2031 回帖
  • SMTP

    SMTP(Simple Mail Transfer Protocol)即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。SMTP 协议属于 TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。

    4 引用 • 18 回帖 • 604 关注
  • 钉钉

    钉钉,专为中国企业打造的免费沟通协同多端平台, 阿里巴巴出品。

    15 引用 • 67 回帖 • 355 关注