阿里 memcached 客户端 socket 连接池源码分析

本贴最后更新于 3095 天前,其中的信息可能已经渤澥桑田

上篇文章中,我们对阿里memcached客户端主要源码进行了分析,其中socket连接池部分是系统关键部分,涉及到整个客户端运行是否稳定,与服务端连接是否高效,本篇就来分析socket连接池实现的这部分源码。

连接池源码位于com.alisoft.xplatform.asf.cache.memcached.client下的SockIOPool类,该类还包括2个内部类,分别是MaintThread和SockIO。SockIOPool主要功能是用于维护与memcached服务端的持久化连接;并提供初始化连接池,获取、释放连接,以及设置服务器权重,连接池维护等功能。

初始化连接池

在系统启动时,会首先读取缓存服务设置的相关配置文件,读取成功后即开始实例化一个SockIOPool实例,再将pool的相关设置参数赋给该实例,如

//所有memcahed服务端ip地址,以及对应的权重
private String[] servers;
private Integer[] weights;

然后调用初始化逻辑,即initialize方法;该方法中,用一个支持并发的concurrenthashmap初始化一个容器socketpool,容器大小由配置的memcached服务端和初始化连接数决定。

socketPool = new ConcurrentHashMap<String, ConcurrentMap<SockIO, Integer>>(servers.length*initConn);

该容器中存放的是每个server地址,以及对应socket的一个包装内部类SockIO(在该内部类中完成所有的socket相关操作),同时该socket的状态也被记录下来,下面看看初始化代码:

public void initialize()
{
      // check to see if already initialized
      if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	{
		log.error("++++ trying to initialize an already initialized pool");
		return;
	}
        //加锁,防止多线程并发问题
	initDeadLock.lock();
	try
	{
	   // check to see if already initialized
	   if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	   {
		log.error("++++ trying to initialize an already initialized pool");
		return;
	   }
// pools socketPool = new ConcurrentHashMap&lt;String, ConcurrentMap&lt;SockIO, Integer&gt;&gt;( servers.length * initConn); fastPool = new HashMap&lt;String,SockIO&gt;(); hostDeadDur = new ConcurrentHashMap&lt;String, Long&gt;(); hostDead = new ConcurrentHashMap&lt;String, Date&gt;(); maxCreate = (poolMultiplier &gt; minConn) ? minConn : minConn / poolMultiplier; // only create up to maxCreate // connections at once ....... // if servers is not set, or it empty, then // throw a runtime exception if (servers == null || servers.length &lt;= 0) { log.error("++++ trying to initialize with no servers"); throw new IllegalStateException("++++ trying to initialize with no servers"); } // 初始化hash环结构的同时,创建每个server的socket,具体初始化hash算法可以参见上篇文章 if (this.hashingAlg == CONSISTENT_HASH) //如果是hash一致性算法 populateConsistentBuckets(); else populateBuckets(); // mark pool as initialized this.initialized = true; // 开始执行维护线程,该新线程会根据用户设定的时间间隔(maintsleep)进行连接池的维护工作 if (this.maintSleep &gt; 0) this.startMaintThread(); } finally { initDeadLock.unlock(); }

}

创建sokcet连接

由代码中我们可以看出,初始化hash环结构时,即为每个server初始化了socket连接,创建连接逻辑由createSocket方法负责。

如果给定的server发生故障,或者其他原因,无法创建socket的话,策略就是将其加入故障服务器队列hostDead,并且设置故障过期时间,下次再有需要对该server创建socket时,会先检测hostDead中是否包含该server,以及过期时间是否已经过了,如果包含并且未过过期时间的话,直接返回null,不在创建socket。无论创建是否成功,都会调用addSocketToPool方法将socket放入容器socketPool中。

注意:同一个server,有可能被创建多个socket。

protected <T> boolean addSocketToPool(ConcurrentMap<String, ConcurrentMap<SockIO, T>> pool, String host,SockIO socket, T oldValue,T newValue, boolean needReplace)
{
	.....
	if (!pool.containsKey(host))
	{
	sockets = new ConcurrentHashMap<SockIO, T>();
	pool.putIfAbsent(host, sockets);
	}
	sockets = pool.get(host);
	if (sockets != null)
	{
		if (needReplace)
		{       //对于同一个host,有可能创建多个socket
		    sockets.put(socket, newValue);
		    result = true;
		} 
		else{
			return sockets.replace(socket, oldValue, newValue);
		}
	}
	return result;
}

创建socket的逻辑就是构建一个SockIO对象,默认使用NIO建立socket,部分代码如下:

public SockIO(SockIOPool pool, String host, int timeout,
	           int connectTimeout, boolean noDelay) throws IOException,UnknownHostException
{
	.......
	// 创建真正的sokcet对象,默认使用NIO
 sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
if (timeout &gt;= 0) this.sock.setSoTimeout(timeout); // testing only sock.setTcpNoDelay(noDelay); // 包装输入输出流 in = new DataInputStream(sock.getInputStream()); out = new BufferedOutputStream(sock.getOutputStream()); this.host = host;

}

从SocketChannel中获取一个socket连接。

protected static Socket getSocket(String host, int port, int timeout) throws IOException
{
	SocketChannel sock = SocketChannel.open();
	sock.socket().connect(new InetSocketAddress(host, port), timeout);
	return sock.socket();
}

如何获取socket连接

连接池初始化成功,socket也创建完毕,那么下面看看如何获取一个指定server的socket连接。

public SockIO getConnection(String host)
{
      ........
	if (socketPool != null && !socketPool.isEmpty())
	{
		//该host对应的map中可能包含多个socket对象
		Map<SockIO, Integer> aSockets = socketPool.get(host);
//fast check SockIO socket = fastPool.get(host); if (socket != null) { if (isFreeSocket(socket,aSockets)) return socket; } if (aSockets != null &amp;&amp; !aSockets.isEmpty()) { //aSockets中可能会包含一个host的多个socket,随机指定一个 int start = (random.nextInt() % aSockets.size()); if (start &lt; 0) start*= -1; int count = 0; //下面2个for循环,是对整个host对应的所有生成的socket连接进行遍历,随机遍历 for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator(); i.hasNext();) { if (count &lt; start){ i.next();count++;continue; } socket = i.next(); //从比起始位置start大的第一个socket开始,判断连接是否可用 if (isFreeSocket(socket,aSockets)) return socket; } //如果没有结果,就从第一个socket开始,逐渐到比start小的第一个socket结束,判断是否可用 for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator();i.hasNext();) { if (count &gt; 0) { socket = i.next(); if (isFreeSocket(socket,aSockets)) return socket; count--; }else break; } } } // create one socket -- let the maint thread take care of creating more SockIO socket = createSocket(host); if (socket != null) { addSocketToPool(socketPool, host, socket,SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true); } return socket;

}

获取一个连接,希望通过高效的方式获取到一个合适的socket对象,所有采用了取余的一种算法。

线程池的维护MaintThread

前面说过,初始化连接后,即开始执行维护线程MaintThread,用于在指定时间间隔内维护socketpool,其内部实现其实也是调用SockIOPool的内部方法selfMaint,该方法维护socket连接池的步骤如下:

  • 在socketpool中找出需要建立socket的host,并且计算需要建立几个socket,其实就是根据配置参数minConn进行计算
  • 为每个host创建需要增加的socket实例,创建成功后放入socketpool中
  • 计算所有的空闲状态的活动socket,并且计算每个host的多余的空闲socket实例个数,即大于maxConn的,同时将这些socket状态置为SOCKET_STATUS_DEAD
  • 清理socketpool中所有状态为SOCKET_STATUS_DEAD的socket,从socketpool中删除,当然是先关闭socket再删除

总结

在基于NIO的基础上,该客户端实现了socket的灵活管理,使用多线程做连接池的定期维护,使的连接池始终保持在高校可用的状态;使用合理的包装,以符合分布式memcached缓存的实现需求。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • danl
    159 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 182 关注
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 751 关注
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 376 关注
  • AWS
    11 引用 • 28 回帖 • 3 关注
  • 印象笔记
    3 引用 • 16 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    107 引用 • 153 回帖
  • 深度学习

    深度学习(Deep Learning)是机器学习的分支,是一种试图使用包含复杂结构或由多重非线性变换构成的多个处理层对数据进行高层抽象的算法。

    54 引用 • 40 回帖
  • Flume

    Flume 是一套分布式的、可靠的,可用于有效地收集、聚合和搬运大量日志数据的服务架构。

    9 引用 • 6 回帖 • 650 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖 • 3 关注
  • frp

    frp 是一个可用于内网穿透的高性能的反向代理应用,支持 TCP、UDP、 HTTP 和 HTTPS 协议。

    20 引用 • 7 回帖 • 2 关注
  • 链滴

    链滴是一个记录生活的地方。

    记录生活,连接点滴

    163 引用 • 3820 回帖 • 1 关注
  • 创业

    你比 99% 的人都优秀么?

    82 引用 • 1395 回帖 • 1 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    943 引用 • 1460 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    17 引用 • 236 回帖 • 298 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3192 引用 • 8214 回帖
  • InfluxDB

    InfluxDB 是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及实时分析。

    2 引用 • 86 关注
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖 • 1 关注
  • SMTP

    SMTP(Simple Mail Transfer Protocol)即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。SMTP 协议属于 TCP/IP 协议簇,它帮助每台计算机在发送或中转信件时找到下一个目的地。

    4 引用 • 18 回帖 • 626 关注
  • jsoup

    jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据。

    6 引用 • 1 回帖 • 492 关注
  • gRpc
    11 引用 • 9 回帖 • 81 关注
  • Openfire

    Openfire 是开源的、基于可拓展通讯和表示协议 (XMPP)、采用 Java 编程语言开发的实时协作服务器。Openfire 的效率很高,单台服务器可支持上万并发用户。

    6 引用 • 7 回帖 • 101 关注
  • Sphinx

    Sphinx 是一个基于 SQL 的全文检索引擎,可以结合 MySQL、PostgreSQL 做全文搜索,它可以提供比数据库本身更专业的搜索功能,使得应用程序更容易实现专业化的全文检索。

    1 引用 • 211 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    5 引用 • 7 回帖 • 1 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖
  • Linux

    Linux 是一套免费使用和自由传播的类 Unix 操作系统,是一个基于 POSIX 和 Unix 的多用户、多任务、支持多线程和多 CPU 的操作系统。它能运行主要的 Unix 工具软件、应用程序和网络协议,并支持 32 位和 64 位硬件。Linux 继承了 Unix 以网络为核心的设计思想,是一个性能稳定的多用户网络操作系统。

    949 引用 • 943 回帖 • 1 关注