阿里 memcached 客户端 socket 连接池源码分析

本贴最后更新于 3093 天前,其中的信息可能已经渤澥桑田

上篇文章中,我们对阿里memcached客户端主要源码进行了分析,其中socket连接池部分是系统关键部分,涉及到整个客户端运行是否稳定,与服务端连接是否高效,本篇就来分析socket连接池实现的这部分源码。

连接池源码位于com.alisoft.xplatform.asf.cache.memcached.client下的SockIOPool类,该类还包括2个内部类,分别是MaintThread和SockIO。SockIOPool主要功能是用于维护与memcached服务端的持久化连接;并提供初始化连接池,获取、释放连接,以及设置服务器权重,连接池维护等功能。

初始化连接池

在系统启动时,会首先读取缓存服务设置的相关配置文件,读取成功后即开始实例化一个SockIOPool实例,再将pool的相关设置参数赋给该实例,如

//所有memcahed服务端ip地址,以及对应的权重
private String[] servers;
private Integer[] weights;

然后调用初始化逻辑,即initialize方法;该方法中,用一个支持并发的concurrenthashmap初始化一个容器socketpool,容器大小由配置的memcached服务端和初始化连接数决定。

socketPool = new ConcurrentHashMap<String, ConcurrentMap<SockIO, Integer>>(servers.length*initConn);

该容器中存放的是每个server地址,以及对应socket的一个包装内部类SockIO(在该内部类中完成所有的socket相关操作),同时该socket的状态也被记录下来,下面看看初始化代码:

public void initialize()
{
      // check to see if already initialized
      if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	{
		log.error("++++ trying to initialize an already initialized pool");
		return;
	}
        //加锁,防止多线程并发问题
	initDeadLock.lock();
	try
	{
	   // check to see if already initialized
	   if (initialized && (buckets != null || consistentBuckets != null)&& (socketPool != null))
	   {
		log.error("++++ trying to initialize an already initialized pool");
		return;
	   }
// pools socketPool = new ConcurrentHashMap&lt;String, ConcurrentMap&lt;SockIO, Integer&gt;&gt;( servers.length * initConn); fastPool = new HashMap&lt;String,SockIO&gt;(); hostDeadDur = new ConcurrentHashMap&lt;String, Long&gt;(); hostDead = new ConcurrentHashMap&lt;String, Date&gt;(); maxCreate = (poolMultiplier &gt; minConn) ? minConn : minConn / poolMultiplier; // only create up to maxCreate // connections at once ....... // if servers is not set, or it empty, then // throw a runtime exception if (servers == null || servers.length &lt;= 0) { log.error("++++ trying to initialize with no servers"); throw new IllegalStateException("++++ trying to initialize with no servers"); } // 初始化hash环结构的同时,创建每个server的socket,具体初始化hash算法可以参见上篇文章 if (this.hashingAlg == CONSISTENT_HASH) //如果是hash一致性算法 populateConsistentBuckets(); else populateBuckets(); // mark pool as initialized this.initialized = true; // 开始执行维护线程,该新线程会根据用户设定的时间间隔(maintsleep)进行连接池的维护工作 if (this.maintSleep &gt; 0) this.startMaintThread(); } finally { initDeadLock.unlock(); }

}

创建sokcet连接

由代码中我们可以看出,初始化hash环结构时,即为每个server初始化了socket连接,创建连接逻辑由createSocket方法负责。

如果给定的server发生故障,或者其他原因,无法创建socket的话,策略就是将其加入故障服务器队列hostDead,并且设置故障过期时间,下次再有需要对该server创建socket时,会先检测hostDead中是否包含该server,以及过期时间是否已经过了,如果包含并且未过过期时间的话,直接返回null,不在创建socket。无论创建是否成功,都会调用addSocketToPool方法将socket放入容器socketPool中。

注意:同一个server,有可能被创建多个socket。

protected <T> boolean addSocketToPool(ConcurrentMap<String, ConcurrentMap<SockIO, T>> pool, String host,SockIO socket, T oldValue,T newValue, boolean needReplace)
{
	.....
	if (!pool.containsKey(host))
	{
	sockets = new ConcurrentHashMap<SockIO, T>();
	pool.putIfAbsent(host, sockets);
	}
	sockets = pool.get(host);
	if (sockets != null)
	{
		if (needReplace)
		{       //对于同一个host,有可能创建多个socket
		    sockets.put(socket, newValue);
		    result = true;
		} 
		else{
			return sockets.replace(socket, oldValue, newValue);
		}
	}
	return result;
}

创建socket的逻辑就是构建一个SockIO对象,默认使用NIO建立socket,部分代码如下:

public SockIO(SockIOPool pool, String host, int timeout,
	           int connectTimeout, boolean noDelay) throws IOException,UnknownHostException
{
	.......
	// 创建真正的sokcet对象,默认使用NIO
 sock = getSocket(host.substring(0,index), Integer.parseInt(host.substring(index+1)), connectTimeout);
if (timeout &gt;= 0) this.sock.setSoTimeout(timeout); // testing only sock.setTcpNoDelay(noDelay); // 包装输入输出流 in = new DataInputStream(sock.getInputStream()); out = new BufferedOutputStream(sock.getOutputStream()); this.host = host;

}

从SocketChannel中获取一个socket连接。

protected static Socket getSocket(String host, int port, int timeout) throws IOException
{
	SocketChannel sock = SocketChannel.open();
	sock.socket().connect(new InetSocketAddress(host, port), timeout);
	return sock.socket();
}

如何获取socket连接

连接池初始化成功,socket也创建完毕,那么下面看看如何获取一个指定server的socket连接。

public SockIO getConnection(String host)
{
      ........
	if (socketPool != null && !socketPool.isEmpty())
	{
		//该host对应的map中可能包含多个socket对象
		Map<SockIO, Integer> aSockets = socketPool.get(host);
//fast check SockIO socket = fastPool.get(host); if (socket != null) { if (isFreeSocket(socket,aSockets)) return socket; } if (aSockets != null &amp;&amp; !aSockets.isEmpty()) { //aSockets中可能会包含一个host的多个socket,随机指定一个 int start = (random.nextInt() % aSockets.size()); if (start &lt; 0) start*= -1; int count = 0; //下面2个for循环,是对整个host对应的所有生成的socket连接进行遍历,随机遍历 for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator(); i.hasNext();) { if (count &lt; start){ i.next();count++;continue; } socket = i.next(); //从比起始位置start大的第一个socket开始,判断连接是否可用 if (isFreeSocket(socket,aSockets)) return socket; } //如果没有结果,就从第一个socket开始,逐渐到比start小的第一个socket结束,判断是否可用 for (Iterator&lt;SockIO&gt; i = aSockets.keySet().iterator();i.hasNext();) { if (count &gt; 0) { socket = i.next(); if (isFreeSocket(socket,aSockets)) return socket; count--; }else break; } } } // create one socket -- let the maint thread take care of creating more SockIO socket = createSocket(host); if (socket != null) { addSocketToPool(socketPool, host, socket,SOCKET_STATUS_BUSY,SOCKET_STATUS_BUSY, true); } return socket;

}

获取一个连接,希望通过高效的方式获取到一个合适的socket对象,所有采用了取余的一种算法。

线程池的维护MaintThread

前面说过,初始化连接后,即开始执行维护线程MaintThread,用于在指定时间间隔内维护socketpool,其内部实现其实也是调用SockIOPool的内部方法selfMaint,该方法维护socket连接池的步骤如下:

  • 在socketpool中找出需要建立socket的host,并且计算需要建立几个socket,其实就是根据配置参数minConn进行计算
  • 为每个host创建需要增加的socket实例,创建成功后放入socketpool中
  • 计算所有的空闲状态的活动socket,并且计算每个host的多余的空闲socket实例个数,即大于maxConn的,同时将这些socket状态置为SOCKET_STATUS_DEAD
  • 清理socketpool中所有状态为SOCKET_STATUS_DEAD的socket,从socketpool中删除,当然是先关闭socket再删除

总结

在基于NIO的基础上,该客户端实现了socket的灵活管理,使用多线程做连接池的定期维护,使的连接池始终保持在高校可用的状态;使用合理的包装,以符合分布式memcached缓存的实现需求。

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 753 关注
  • 书籍

    宋真宗赵恒曾经说过:“书中自有黄金屋,书中自有颜如玉。”

    78 引用 • 391 回帖
  • SOHO

    为成为自由职业者在家办公而努力吧!

    7 引用 • 55 回帖 • 2 关注
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    98 引用 • 345 回帖
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖 • 1 关注
  • Git

    Git 是 Linux Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。

    211 引用 • 358 回帖
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 109 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • 持续集成

    持续集成(Continuous Integration)是一种软件开发实践,即团队开发成员经常集成他们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。

    15 引用 • 7 回帖 • 1 关注
  • Chrome

    Chrome 又称 Google 浏览器,是一个由谷歌公司开发的网页浏览器。该浏览器是基于其他开源软件所编写,包括 WebKit,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。

    62 引用 • 289 回帖
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 601 关注
  • 钉钉

    钉钉,专为中国企业打造的免费沟通协同多端平台, 阿里巴巴出品。

    15 引用 • 67 回帖 • 316 关注
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 403 关注
  • OpenResty

    OpenResty 是一个基于 NGINX 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。

    17 引用 • 39 关注
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖 • 2 关注
  • HTML

    HTML5 是 HTML 下一个的主要修订版本,现在仍处于发展阶段。广义论及 HTML5 时,实际指的是包括 HTML、CSS 和 JavaScript 在内的一套技术组合。

    107 引用 • 295 回帖 • 1 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 39 关注
  • OneDrive
    2 引用
  • Notion

    Notion - The all-in-one workspace for your notes, tasks, wikis, and databases.

    10 引用 • 74 回帖
  • 外包

    有空闲时间是接外包好呢还是学习好呢?

    26 引用 • 232 回帖
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    30 引用 • 108 回帖
  • Mac

    Mac 是苹果公司自 1984 年起以“Macintosh”开始开发的个人消费型计算机,如:iMac、Mac mini、Macbook Air、Macbook Pro、Macbook、Mac Pro 等计算机。

    166 引用 • 595 回帖
  • Android

    Android 是一种以 Linux 为基础的开放源码操作系统,主要使用于便携设备。2005 年由 Google 收购注资,并拉拢多家制造商组成开放手机联盟开发改良,逐渐扩展到到平板电脑及其他领域上。

    334 引用 • 323 回帖
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • ZooKeeper

    ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,是 Google 的 Chubby 一个开源的实现,是 Hadoop 和 HBase 的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

    59 引用 • 29 回帖 • 9 关注
  • 开源中国

    开源中国是目前中国最大的开源技术社区。传播开源的理念,推广开源项目,为 IT 开发者提供了一个发现、使用、并交流开源技术的平台。目前开源中国社区已收录超过两万款开源软件。

    7 引用 • 86 回帖 • 1 关注