0%

核心概念

战术设计(Tactic DDD): Entity, Value Object; Aggregate, Root Entity, Service, Domain Event; Factory, Repository
战略设计(Strategic DDD): Bounded Context, Context Map; Published Language, Shared Kernel, Open Host Service, Customer-Supplier, Conformist, Anti Corruption Layer (context relationship types)

Entity(实体)
  • 存在 2 点特征
    • 唯一标志:当一些对象不是由属性定义,而是由一个唯一标志定义的话,我们就可以认为它是一个实体。
    • 连续性:对象的连续性体现在对象是有生命周期的。
  • 由上2点可以看出,实体并非一定是映射到我们现实世界的某个具体事物
  • 生成实体唯一标识的 4 种方法
    • 用户提供一个或者多个初始唯一值作为输入时
    • 程序内部通过某种算法自动生成身份标识,例如UUID、雪花ID等
    • 程序依赖于持久化存储,比如数据库生成的自增主键
    • 通过其他的限界上下文决定出的唯一标识,作为程序的输入。
  • 实体不变性
    • 一个实体维护了一个或者多个不变条件
    • 不变条件主要是由聚合所关注
Value Object(值对象)
  • 当我们只关心一个模型元素的属性时,应把它归类为值对象。
  • 它度量或者描述了领域中的一件东西。
  • 它将不同的相关属性组合成了一个概念整体。
  • 它可以和其他值对象进行相等性比较。
  • 值对象应该是不可变的,不要为它分配任何标识,不要将它设计得跟实体一样复杂。
  • 值对象应该具有无副作用性
Aggregate(聚合)& Root Entity(根对象)
  • 描述
    • 每个聚合都有一个根和一个边界,边界内定义了聚合的内部有什么。”根” 是聚合所包含的一个特定的实体。
    • 外部对象可以引用根,但不能引用聚合内部的其他对象,聚合内的对象之间可以相互引用,除了根实体外,其他实体拥有本地标识。
  • 定义
    • 我们应该将实体和值对象分门别类的聚集到聚合当中,并定义聚合的边界。并通过根来控制边界内其他对象的所有访问。只允许外部对象保持对根的引用。对内部成员的临时引用可以被传递出去,但仅在一次操作中有效。
  • 不变性和一致性边界即是聚合的设计依据和精髓。
    • 这里的不变性指的是业务规则,该规则应该始终保持一致
    • 一致性边界的意思是单个事务的修改范围。 原则上我们应该在一个事务里只修改一个聚合。
  • 聚合的作用
    • 为了维护对象生命周期内的完整性
    • 通过定义清晰的所属关系和边界,在这个边界中的模型元素在生命周期内必须维护一致性,通俗的讲就是业务规则。
  • 聚合特征
    • 根实体具有全局的标识,它最终负责检查固定规则。
    • 边界内的实体具有本地标识,这些标识只在聚合内部才是唯一的。
    • 聚合外部的对象不能引用根实体之外的聚合内部对象。根实体可以将内部实体的引用传递给它们,但只能临时使用。或者传递一个值对象的副本出去,而不用关心它发生了什么变化。
    • 只有根实体才能直接通过数据库直接查询,其他对象必须通过遍历关联来发现。
    • 根实体可以保持其他根实体的引用。
    • 当对聚合边界内的任何对象做了修改时,整个聚合的所有固定规则都必须被满足。
  • 原则
    • 通过唯一标识去引用其他聚合
      • 引用聚合和被引用的聚合不可以在同一个事务中进行修改
      • 如果你在试图在单个事务中修改多个聚合,这往往意味着此时的一致性边界是错误的,发生这样的情况通常是我们遗留了某些建模点,或者尚未发现通用语言中的某个概念。
      • 当试图修改多个聚合的话,我们也应该采用最终一致性而非原子一致性。
    • 利用应用层来处理聚合内的依赖关系,避免在聚合中使用资源库或者领域服务。
    • 边界之外使用最终一致性
Service(领域服务)
  • 三个特征
    • 它是与领域相关的操作如执行一个显著的业务操作过程,但它又并不适合放入实体与值对象中。
    • 操作是无状态的。
    • 对领域对象进行转换,或以多个领域对象作为输入进行计算,结果产生一个值对象。
  • 区分不同的服务
    • 应用服务:获取输入,发送消息给领域层,监听确认消息,决定使用基础服务来发送邮件。
    • 领域服务:协调账户模型和总账模型进行交互,执行相应的领域行为。
    • 基础服务:按照应用服务的指示发送邮件。
  • 粒度
    • 我们应该尽量避免领域知识泄露到应用层当中去。那此时领域服务就不失为一种良好的处理方式,通过将细粒度的领域对象封装到领域服务当中去,将领域知识限制在领域服务当中,形成粗粒度的领域对象。
  • 转换过程

Domain Event(领域事件)
  • 领域专家所关心的发生在领域中的一些事件。将领域中所发生的活动建模成一系列的离散事件。每个事件都用领域对象来表示…领域事件是领域模型的组成部分,表示领域中所发生的事情。

  • 一个领域事件必须对业务有价值,有助于形成完整的业务闭环,也即一个领域事件将导致进一步的业务操作。

  • 事件风暴

    • 事件风暴是一项团队活动,旨在通过领域事件识别出聚合根,进而划分微服务的限界上下文。

      在活动中,团队先通过头脑风暴的形式罗列出领域中所有的领域事件,整合之后形成最终的领域事件集合,然后对于每一个事件,标注出导致该事件的命令(Command),再然后为每个事件标注出命令发起方的角色,命令可以是用户发起,也可以是第三方系统调用或者是定时器触发等。最后对事件进行分类整理出聚合根以及限界上下文。

Factory
  • why
    • 隐藏创建的细节
    • 对内,复杂对象除了本身生命周期的维护外,如果再承担自身的创建,会导致负载过重
    • 对外,不需要客户理解对象创建过程
  • 定义
    • 复杂对象的创建是领域层的职责,但这项任务并不一定属于那些用于表示模型的对象,他们没有对应模型中的事物,但又确实承担了领域层的职责。
    • 应该将创建复杂对象和聚合的职责转移给单独的对象,这个对象本身可能没有承担领域模型中的职责,但它仍然领域设计的一部分。
    • 在创建聚合时要把它作为一个整体,并确保它满足固定规则。
  • 设计要点
    • 每个创建方法都应该是原子的,并保证生成的对象处于一致的状态。
    • 可以使用独立的工厂或者在聚合根上使用工厂方法。
    • 工厂方法的参数应该是较低层的对象。
    • 非必要场景,直接构造函数即可
      • 类仅仅是一种类型,没有其他子类,没有实现多态性。
      • 客户关心的是实现类。
      • 客户可以访问对象的所有属性,因此向客户公开的构造函数中没有嵌套的对象创建。
      • 构造过程很简单。
      • 公共构造函数必须遵守与工厂相同的规则,必须是原子操作且满足所有固定规则。
      • 不要在构造函数中调用其他构造函数,应保持构造函数的简单。
Repository
  • 实现
    • 对类型抽象
    • 充分利用与客户进行解藕
    • 事务的控制权交给客户
  • 优点
    • 为客户提供了一个简单的模型,可用来获取持久化对象并管理他们的生命周期
    • 他们将应用程序和领域设计与持久化技术进行解耦
    • 它们体现了有关对象访问的设计决策
    • 很容易测试,将利用集合直接替换资源库进行测试
Bounded Context (限定上下文)
  • 领域、子域、限界上下文
    • 领域即是一个组织所做的事情以及其中所包含的一切
    • 模型只在限界上下文中变动,不影响其他限界上下文,将变动的影响范围控制在单个限界上下文中
    • 一般来说,一个子域对应一个限界上下文,但是子域并不一定与限界上下文一一对应
    • 领域种类划分
      • 核心域:公司主要的业务领域,比如生鲜的商品子域以及订单子域(核心域并不绝对)
      • 支撑子域:公司的库存帮助公司完成销售。他们就属于支撑子域。
      • 通用子域:会员子域,在许多的网上购物平台上都会使用到的会员体系。它属于通用子域。
  • 限界上下文是一个显式的边界,领域存在于这个边界之内。
Context Map(上下文映射)
image-20210311142539037
  • 继承方式
    • RPC
    • Restful等api
    • 消息队列
  • 种类
    • 共享内核-Shared Kernel
    • 客户/供应商-Customer/Supplier
    • 追随者-Conformist
    • 防腐层-Anticorruption Layer
    • 公开主机服务-Open Host Service
    • 各行其道 - Separate Way
    • 合作关系 - Partnership

设计方法

分层架构
image-20210311111555717
  • 基本原则:每层只能和位于下方的层产生耦合
    • 严格分层架构:每层只能与直接位于下方的层发生耦合
    • 松散分层架构:任意上层和任意下层发生耦合
    • 可以通过观察者等模式,让下层和上层发生耦合
六边形架构
image-20210311111942873
  • 六边形架构 其实是分层架构的一种扩展,是原来分层架构的另外一种解读,是一种端口+适配器风格的架构
  • 通过端口+适配器将领域包裹起来, 形成清晰的应用程序边界
SOA
  • SOA原则
    • 服务契约
    • 松耦合
    • 服务抽象
    • 服务重用性
    • 服务自治性
    • 服务无状态性
    • 服务可发现性
    • 服务组合性
  • SOA 精神所在
    • 业务价值高于技术策略
    • 战略目标高于项目利益
RESTful
  • 将RESTful和DDD结合的2种方式
    • 为系统接口层单独创建一个限界上下文
    • 使用标准媒体类型的时候,如果某种媒体类型不用于支持单个系统接口,我们可以可以创建一个领域模型来处理。
CQRS
事件驱动

ContextMapper

image-20210315200928557
ContextMap
  • contextMap 只有2种type
    • SYSTEM_LANDSCAPE 这个是从系统层面描述 ContextMap
    • ORGANIZATIONAL 这个是从组织团队层面描述 ContextMap
BoundedContext
  • 一个BoundedContext 可以实现多个domain
  • 一个BoundedContext 可以 refines 另外一个
  • BoundedContext 类型有4种
    • FEATURE
    • APPLICATION
    • SYSTEM
    • TEAM
  • Knowage level 有2个
    • CONCRETE
    • META
Domain and Subdomain
image-20210315201429551
  • Subdomain 类型
    • CORE_DOMAIN
    • SUPPORTING_DOMAIN
    • GENERIC_SUBDOMAIN

整体架构

image-20210228184558960

ha3本身是阿里系针对自己的场景自己研发的搜索引擎平台,也是基于自身的技术积累之上构建的,包括依赖的系统和代码库,都是自研自足的。经历了近10年的发展,也经受了核心场景双十一的考验,已经是一套非常完善成熟的系统,值得学习和研究。 图中为ha3的基本架构,比较简洁,主要分为数据源聚合(俗称 dump)、全量/增量/实时索引构建及在线服务等部分,其中数据源聚合在 tisplus 平台和 Blink 平台完成,核心有以下几个模块:

  • QRS
    • 输入的查询解析/校验,转发searcher
    • searcher 结果合并加工返回用户
  • Searcher
    • 文档召回服务,包含打分/排序/summary
  • Build Service
    • 全量/增量/实时索引构建,提供给在线服务使用
  • 其他
    • hippo: 调度系统,分配机器
    • suez/suze_ops,引擎管控/任务分发
    • deploy express,用于分发包,索引,配置等数据
    • swift,消息队列
    • cm2/vipserver,域名解析/服务发现

整个ha3作为一个完善的搜索引擎,方方面面都很涉及,本文主要围绕索引和检索两个过程进行讨论,其他的包括插件、检索语法、配置、运维等方面,不在本文叙述。

索引

索引的作用是为了增加检索速度,ha3索引主要是基于indexlib库构建的。indexlib的索引类型支持 index索引/kv索引/kkv索引/时序索引等。

在我们搜索场景中,主要使用index索引,index索引主要用于基于关键词进行文档检索召回的场景,并对召回的文档基于文档属性进行进一步的过滤、统计、排序等操作。

index索引是基于文档进行的,每个文档都会有一个docid(docid类型为int32_t,所以最多支持20亿文档)。 而每个文档都是由多个field组成,每个field会对应为:主键索引(primary key index)、倒排索引(index)、正排索引(attribute)、摘要(summary)

本章主要针对index索引,介绍其索引的结构和构建的过程,以便我们更好的理解索引的作用,以及后面的检索过程。

索引格式

image-20210228195329797

上图是整个索引文件列表,针对目录的说明如下:

结构名称 说明
generation generation_x是引擎区分不同版本全量索引的标识。
partition partition是searcher加载索引的基本单位。如果一个partition中数据过多,会导致searcher性能降低。线上数据一般通过划分多个partition的方式来保证每个searcher的检索效率。
segment segment是索引组成的基本单位。segment中包含了文档的倒排和正排结构。index builder每次dump都会生成一个segment。多个segment可以通过merge策略进行合并。一个partition中可用的segment在version文件中指明。
index 倒排索引的基本单位。
attribute 正排索引的基本单位。
deletionmap 删除的doc记录。
truncate_meta 截断索引meta数据文件(Index表倒排截断场景 )。
adaptive_bitmap_meta 自适应bitmap高频词表文件(Index表倒排应用adaptive_bitmap场景)。

涉及到的文件如下:

文件名称 存储内容
index_format_version 索引的版本信息。用于检查索引文件是否符合binary要求。
index_partition_meta 存储了全局排序的信息。包括排序字段和升降序。
schema.json 索引配置文件。主要记录fields,index, attribute 和summary等信息。引擎通过该文件来加载索引。
version.0 version文件。主要记录当前partition中引擎需要加载的segment和最新doc的时间戳。在实时build中,引擎会根据增量version的时间戳过滤旧的原始文档。
segment_info segment信息摘要。记录了当前segment中文档数目,当前segment是否merge过,locator信息和最新doc时间戳信息。

倒排索引

image-20210228205126548

倒排索引核心解决的问题是建立关键词到doc_id的映射, 通过倒排索引,我们可以快速获取相关的文档列表,以及倒排词在文档中的位置词频等信息。 整个检索流程可以参考上图,先通过词典文件查询索引词在索引文件中的位置,在对应位置获取关联的文档列表信息,以及关键词和关键词在文档中的信息。

除了上图存在的内容外,还存在一个截断索引的概念,这是一种辅助索引,对于原始索引中高频词的倒排链,按照某些feature,截取权重较高的文档形成截断索引,提高检索速度。实际应用中,可以更具多个截断方式,生成多条截断链。

此外,在ha3中,倒排索引记录的主要信息如下, 对于不同的索引(NUMBER/TEXT/PACK/EXPACK/PRIMARYKEY64/RANGE/SPATIAL),支持也不一样,具体参看文档:

信息名称 描述
ttf 全称:total term frequency, 表示检索词在所有文档中出现的总次数
df 全称:document frequency, 表示包含检索词的文档总数
tf 全称:term frequency, 表示检索词在文档中出现的次数
docid 全称:document id, 是文档在引擎中的唯一标识,可以通过docid获取到原文档的其他信息
fieldmap 全称:field map, 用于记录包含检索词的field信息
section 信息 用户可以为某些文档分段,然后为每一段添加附属信息。该信息可以在检索时取出,供后续处理使用
position 用于记录检索词在文档中的位置信息
positionpayload 全称:position payload, 用户可以为文档不同位置设置payload信息,并可以在检索时取出,供后续处理用
docpayload 全称:document payload, 用户可以为某些文档添加附属信息,并可以在检索时取出,供后续处理使用
termpayload 全称:term payload, 用户可以为某些词添加附属信息,并可以在检索时取出,供后续处理使用

正排索引

正排索引主要是简历 doc_id ->field 的映射,主要用于检索到 doc_id 后,可以根据 doc_id 快速获取关键字段的值用来统计、排序、过滤。 正派索引支持的字段类型主要包括单值类型和多值类型:

  • 单值类型
    • 只有一个data文件, 其为每一个doc分配固定大小的空间,用来存储对应正排字段的取值,可以通过docID直接定位到data文件中该doc对应信息的存储位置,完成获取信息的操作
  • 多值类型
    • 有两个文件——data文件和offset文件,其中data文件存储着对应正排字段的字段值信息,offset文件记录了doc对应在data中的偏移量,它为每个doc按照doc顺序分配固定大小的空间,来存储其在data文件中的偏移量,从而获取到对应的正排字段信息

摘要索引

摘要索引将一片文档对应的信息存储在一起,通过docID可以定位该文档信息的存储位置,从而为用户提供摘要信息的存储获取服务。

摘要索引只有两个文件——data文件和offset文件。 通过offset可以直接定位到data中doc的信息。

索引构建

从构建的角度来说,索引分为全量索引/增量索引/实时索引。ha3 通过 buildservice 来构建索引,基本的流程如下:

image-20210228214152893

全量和增量索引属于离线索引,离线索引依赖于三类BS内部的worker:

  • Processor 从数据源获取原始文档,经过文档处理插件后,转换为Processed文档写入SWIFT中转topic
  • Builder 负责从SWIFT中读取Processed文档,根据此文档构建索引写入到文件系统。
  • Merger 对Builder产出全量索引或增量索引文件,按照合并策略进行索引合并,并将产出后的索引写到文件系统中。

实时索引属于在线索引,构建在在线服务内部,通过RealtimeBuilder模块,直接从中转swift topic(s) 中读取数据(对应图中的实时数据流)、构建索引。可以达到秒级延迟。

索引加载

索引的加载方式目前只支持mmap加载和blockcache加载 2 种方式:

  • mmap 加载

    通过系统调用mmap将索引文件映射到进程内存地址空间中。加载过程支持mmap lock到内存来保证全内存场景下数据读取完全不读取磁盘数据;也支持mmap非lock场景加载,由操作系统进行内存页缓存管理(采用系统cache方式)。

  • blockcache 加载

    通过blockcache加载模式,可以将索引文件读取的热数据缓存到blockcache中,减少磁盘读取操作。同时数据淘汰策略采用了lru策略,加载和淘汰更加可控(对比mmap非lock的系统cache)。

检索

image-20210228221141255

检索流程更多是在线过程,整体流程参看照爷整理的流程图,如上图。检索过程主要分为2个阶段,一阶段负责文档的召回和排序,二阶段则是summary的信息不全。

对于多集群召回的结果,默认支持去重机制。 整体查询支持一些高级查询:

  • 一阶段/二阶段的独立查询
  • 索引分层,第一层数量不够后重查后面几层
  • 结果数不同重查机制:
    • 返回的结果数小于research_threshold,同时过程中触发了有可能影响结果数优化的逻辑,才会触发重查
    • 重查时,会关掉所有对结果数有影响的优化逻辑
  • 分层查询
  • 截断链查询

此外因为属于在线服务,在每个阶段都会有相应的指标监控,具体如下:

  • qrsSessionLatencyIndependentPhase1 : qrs 创建session -> end session

  • qrsProcessLatencyIndependentPhase1 : qrs begin session -> end session

  • searcherProcessLatencyPhase1 : searcher begin search -> end serach

  • sessionLatencyPhase1 : searcher 创建 session -> end search

  • afterRunGraphLatency :search图执行完 -> end serach

  • afterSearchLatency : searcher final sort完 -> end serach

  • beforeRunGraphLatency : searcher begin search -> 图中第一个节点IsPhaseOneOp开始

  • beforeSearchLatency : searcher begin search -> 图中seek op开始 multi layer search

  • extraRankLatency : Ha3SorterOp(final sort)耗时

  • mergeLatencyPhase1 : qrs result merge, run qrs graph耗时

  • rankLatency : multi layer search

  • rerankLatency :rerank 耗时, rankLatency和rerankLatency加起来是seek op 耗时

  • runGraphLatency :search 图执行耗时

附录

背景

Blink 其实是阿里内部基于 flink 改造的系统,学习 Blink 首先要学习 flink 相关的知识,先了解一下 flink 产生的背景和要解决的问题。

在大数据开始爆发的时代,mapreduce 作为初代的计算引擎,提供了分布式计算的核心思想,map&reduce的分阶段处理。但是 map reduce 本质上还是批处理计算框架,随着业务发展,流式数据计算处理的需求越来越旺盛,storm 等应运而生,但是作为第一代流式计算框架,存在一些设计上的缺陷,包括 exactly once等语义的支持。而且批处理和流处理作为处理同一个业务逻辑的两套系统,需要维护两套代码,并不是很友好。而这个时候 spark也发展起来了,集批处理,流处理,SQL 功能,图计算,机器学习于一身,并且支持 SparkR 和 PySpark 来做科学计算。而同时支持流处理和批处理的计算引擎,只有2个选择,除了spark 便是 flink。

spark 批流统一处理的核心思想其实是用批来模拟流的能力,而flink则完全相反,使用流计算来模拟批计算。flink在捐赠给apache后定位的主流方向便是 Streaming, 区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持 Event Time,WaterMark 对消息乱序的处理。

最后,回归到官网对它的定义:

1
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

关于应用场景, 官方主要给了3类应用和相应的事例可以参看:

设计架构

image-20210218205017997

Client 构建dataflow graph 已经数据流下放到 jobmanager 去调度 taskmanager 执行。Flink 运行时由2种进程组成,JobManager 和 TaskManager。

JobManager

JobManager 负责task的调度、失败处理、协调处理checkpoint等,主要由3个组件构成:

  • ResourceManager:负责资源的申请、分配、回收(适配 YARN、Mesos、Kubernetes等)。管理TaskManager的Task Slots
  • Dispacher:提供REST接口,用于提交Flink应用程序执行,并且为每一个Task启动一个新的JobMaster,同时提供一个WebUI反应作业执行情况。
  • JobMaster: 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。JobMaster可以有多个,其中一个事leader 其他的为Standby。JobMaster的HA有2种实现方式:Zookeeper 和 Kubernetes。

TaskManager

TaskManager 执行作业流的task,并且缓存和交换数据流。 本质上是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask,为了控制TaskManager接受多个task,也就是所谓的task slots。

Task、Subtask、Slot 是 3 个不同层面的概念

slot是独立资源,如果一个taskmanager有3个slot,那么每个slot独立分配1/3内存。同时同一个taskmanager下的slot共享同一个JVM。同一个JVM的多个task共享TCP链接和心跳信息。

默认允许 SubTask 共享 slot, 即便是不同的task的subtask,只要是一个作业就可以。也就是说,一个slot可以持有整个作业的通道。

执行过程

Flink的执行会被提交到3种类型的集群执行,其主要的差异在于执行的生命周期和资源的隔离保证:

  • Flink Session 集群
    • 生命周期:集群是长期运行的,可接受多个作业提交
    • 资源隔离:多个作业共享集群资源,如果 TaskManager 或者 JobManager 出问题,会影响所有集群运行
  • Flink Job 集群
    • 生命周期:Yarn或者k8s 为每个作业启动一个集群,作业结束,则集群结束
    • 资源隔离:作业独占资源
  • Flink Application 集群
    • 生命周期:专用的flink集群,运行不是通过作业提交的方式运行,而是以application的方式运行。
    • 资源隔离: ResourceManager 和 Dispatcher 作用于单个的 Flink 应用程序,相比于 Flink Session 集群,它提供了更好的隔离。

核心概念

Flink的四大基石:

  • Checkpoint: 快照;高容错
  • State: 状态
  • Time: 通过WaterMark支持基于Event Time的时间窗口
  • Window: 窗口机制

Checkpoint

image-20210228143710944

checkpoint是一种通过快找容错恢复的机制,这种机制保证实时程序运行的时候,即便出现异常,也可以自我恢复。 checkpoint 是flink系统自身的系统行为,用户无感知,只需要配置就可以了。

  • checkpoint 的实现参考了 Chandy-Lamport algorithm 算法, 自己实现了一套异步barrier快照的算法,在数据流中设置屏障,记录屏障点的信息(比如kafka偏移量)以及每个算子的 state 的状态。 拥有2个输入流的Operators 会执行 Barrier 对齐,保证当前快照消费了输入流barrier之前的所有events。
  • checkpoint的状态备份存储主要分为2类
    • 基于RocksDB内嵌k/v存储,将工作状态保存到磁盘上
    • 基于堆的 state backend.
      • FsStateBackend,将状态快照持久化到分布式文件系统中
      • MemoryStateBackend, 使用JobManager的堆保存状态
  • 在流处理过程中,对结果的保障分为3种
    • at most once (结果不会从快照中恢复)
    • at least once(没有任何丢失,但是会有冗余结果)
      • souce 必须可以重放
    • Exactly once (没有丢失,没有冗余)
      • souce 必须可以重放
      • sinks 必须是事务性的
  • checkpoint 状态保留策略
    • 当程序取消时,删除checkpoint存储文件
    • 当程序取消时,保存之前的checkpoint文件
  • chekpoint 配置
    • Exactly-once/at-least-once 模式配置
    • checkpoint 超时: 如果执行时间超过配置,checkpoint操作会被丢弃
    • 并发checkpoint的数目: 默认为1
    • checkpoints之间的最小时间
    • externalized checkpoints: 周期存储 checkpoint 到外部系统中
    • 在 checkpoint 出错时使 task 失败或者继续进行 task
    • 优先从 checkpoint 恢复: 在有savepoint的情况下的选择 (checkpoint 恢复的更快)

State

image-20210228151711717

有状态的计算是流处理框架要实现的重要功能,因为稍复杂的流处理场景都需要记录状态,主要用来作为中间状态来进行去重/对比/聚合/更新等操作。 而在flink中,一个算子会有多个子任务,状态是和子任务绑定的,有子任务创建和管理。对于状态的管理,flink作为计算框架,封装了一些实现,也包括状态的高效存储/checkpoint/savepoint持久化/计算资源扩缩容等等。

  • 针对不同的场景,会有状态的类型的划分
    • 托管状态(Managed State),由flink帮忙存储、恢复和优化,支持一些常见数据结构,比如ValueState/ListState/MapState
      • Keyed State
        • Keyed State是KeyedStream上的状态。假如输入流按照id为Key进行了keyBy分组,形成一个KeyedStream,数据流中所有id为1的数据共享一个状态,可以访问和更新这个状态,以此类推,每个Key对应一个自己的状态。
        • Flink提供了几种现成的数据结构供我们使用,包括ValueStateListState
      • Operator State
        • Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。
    • 原生状态(Raw State),需要开发者自己管理, 只支持字节,需要自己序列化
  • 横向扩展问题

Time

Flink 在流程序中支持不同的Time概念;

  • Time 类型
    • Processing Time, 事件被处理时机器的系统时间
    • Event Time, 事件发生的时间
    • Ingestion Time, 事件进入 Flink 的时间
  • Watermark 机制
    • 处理数据乱序的问题。在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全部到达才开始处理
    • Watermark = 进入 Flink 的最大的事件时间(mxtEventTime)— 指定的延迟时间(t)

Window

image-20210228161951552

前文提过,flink中其实是通过流来模拟批处理的,而window就是实现的机制。window可以是时间驱动的,也可以是数据驱动的 ,几种不同的窗口还行,可以参看上图。

参考文档

背景

通过把nextjs的静态文件放到github pages上,省一波流量钱,但是发现资源文件总是会404,本来以为是github pages构建完成更新cdn需要缓存,但是过了很久都没有成功,以下是静态文件目录

1
2
3
4
5
-- _next
-- chunks
-- *.js
-- css
-- *.css

后来做了猜想是目录深度原因,但是测试下来还是没有用。 后面经过验证是 下划线开头的文件或者文件夹的原因, 折腾了好半天。 实际的原因只是因为 github pages使用jeklly引擎的默认规则。

解法

  • 避免使用下划线开头的文件(需要重新)
  • 通过在根目录创建.nojekyll 空文件, 关闭jeklly引擎
1
2
3
4
5
6
-- .nojekyll
-- _next
-- chunks
-- *.js
-- css
-- *.css

背景

最近学习了一下 nextjs,用这个写了个小网站 cushiwen.cn. 过程中发现 nextjs 天然支持 amp,虽然只有css-in-js的方式支持css, 但是还是很方便的,就尝试了一下。 发现过程中总是出现以下错误:

1
2
3
4
5
6
7
8
Error: Unable to fetch https://cdn.ampproject.org/v0/validator.js - connect ETIMEDOUT 172.217.27.129:443
at ClientRequest.<anonymous> (/Users/xxx/xtestw/xxx/fe-xxx/node_modules/next/dist/compiled/amphtml-validator/index.js:1:1159)
at ClientRequest.emit (node:events:329:20)
at TLSSocket.socketErrorListener (node:_http_client:478:9)
at TLSSocket.emit (node:events:329:20)
at emitErrorNT (node:internal/streams/destroy:188:8)
at emitErrorCloseNT (node:internal/streams/destroy:153:3)
at processTicksAndRejections (node:internal/process/task_queues:80:21)

这就很尴尬了, 我自己架了个ss的代理,但是node本身的请求没有走代理,需要解决这个问题。

大概有几种解法吧:

  • VPN的方式架梯子,成本有点高
  • Charles 全局代理捕获,自定义 response (未验证,理论可行)
  • node全局代理

本文讨论最后一种的解法

实现

查找了一些资料,通过global-agent来实现。其实现主要通过系统环境变量来实现的 主要步骤如下:

  1. 安装 global-agent

    1
    yarn add global-agent
  2. 添加引用

    在需要的页面里面添加引用,我是在 _app.tsx 文件添加的

    1
    2
    3
    import 'global-agent/bootstrap';
    // or:
    // import {bootstrap} from 'global-agent';
  3. 设置环境变量

    1
    export GLOBAL_AGENT_HTTP_PROXY=http://127.0.0.1:8080

    如果指定部分域名不走代理的话,通过另外一个环境变量设置(因为我本地开了另外一个服务作为api接口, 所以这个接口不走代理):

    1
    export GLOBAL_AGENT_NO_PROXY='*.test.com,test2.com,localhost:8888'
  4. 启动

    需要找到next实际的启动地址,来启动

    1
    node -r global-agent/bootstrap /Users/xxx/xtestw/yyy/fe-yyy/node_modules/next/dist/bin/next

附录

UndeclaredThrowableException 原因

RPC 请求的时候抛出该异常,异常点是 RPC 调用的地方抛出的该异常,除此之外没有其他的异常信息。仔细排查,实际的异常应该是网络IO的TIMEOUT导致的。 那么,就有2个问题需要探讨:

  • 为啥抛出的是UndeclaredThrowableException,而不是 TimeOut 的 Exception
  • 如何抛出 TimeOut 异常

为啥抛出的是UndeclaredThrowableException

先说结论,之所以抛出这个异常,是因为在RPC调用的实现中,使用了 JDK 动态代理的原因。在 JDK Proxy 的调用中,如果实际运行时(InvocationHandler#invoke) 抛出了某个**受检异常(checked exception)**,但该受检异常并未在被代理对象接口定义中进行声明,那么这个异常就会被 JVM包装成 UndeclaredThrowableException 进行抛出。

这里面有个受检异常的概念,简单解释一下,Java 中所有异常,都继承自 java.lang.Throwable 类。

Throwable有两个直接子类,Error 类和 Exception 类。

Error 类型用于指示合理的应用程序不应该试图捕获的严重问题, 是一种 UncheckedExcepiton

Exception 可分为 RuntimeExceptionChecked Exception 两种, 而 RuntimeExceptionJVM 正常运行期间抛出的异常的超类时抛出,也是UncheckedExcepiton

  • 受检异常

    1
    2
    3
    4
    5
    6
    7
    8
    java.lang.ClassNotFoundException
    java.lang.CloneNotSupportedException
    java.lang.IllegalAccessException
    java.lang.InterruptedException
    java.lang.NoSuchFieldException
    java.lang.NoSuchMetodException
    java.io.IOException
    ...
  • 非受检异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    java.lang.ArithmeticException
    java.lang.ArrayStoreExcetpion
    java.lang.ClassCastException
    java.lang.EnumConstantNotPresentException
    java.lang.IllegalArgumentException
    java.lang.IllegalThreadStateException
    java.lang.NumberFormatException
    java.lang.IllegalMonitorStateException
    java.lang.IllegalStateException
    java.lang.IndexOutOfBoundsException
    java.lang.ArrayIndexOutOfBoundsException
    java.lang.StringIndexOutOfBoundsException
    java.lang.NegativeArraySizeException’
    java.lang.NullPointerException
    java.lang.SecurityException
    java.lang.TypeNotPresentException
    java.lang.UnsupprotedOperationException
    ...

如何抛出 TimeOut 异常

  • 可以通过UndeclaredThrowableException#getUndeclaredThrowable拿到被包装的受检异常;JDK1.4以后,通过*Throwable#getCause也可以拿到被包装的受检异常,而且这是被建议的方式,因为前者已经过时了

  • 实际上,method.invoke的方法申明如下:

    1
    public Object invoke(Object obj, Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException

    Method.invoke 的方法本身只申明了3种异常,正常调用部分的异常会被 InvocationTargetException 包裹,实际上是 2 层包裹。 另外值得注意的是 InvocationTargetException 本身是受检异常, 既可以包裹受检异常,也可以包裹非受检异常。而 UndeclaredThrowableException 本身是非受检异常, 只可以包裹受检异常

  • 参看 Spring处理方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // org.springframework.aop.support.AopUtils

    public static Object invokeJoinpointUsingReflection(Object target, Method method, Object[] args) throws Throwable {

    // Use reflection to invoke the method.
    try {
    ReflectionUtils.makeAccessible(method);
    return method.invoke(target, args);
    }
    catch (InvocationTargetException ex) {
    // Invoked method threw a checked exception.
    // We must rethrow it. The client won't see the interceptor.
    // 重点在此处:抛出被包装的原始异常
    throw ex.getTargetException();
    }
    catch (IllegalArgumentException ex) {
    throw new AopInvocationException("AOP configuration seems to be invalid: tried calling method [" +
    method + "] on target [" + target + "]", ex);
    }
    catch (IllegalAccessException ex) {
    throw new AopInvocationException("Could not access method [" + method + "]", ex);
    }
    }

去年在做内容库的时候,涉及到了多系统多数据源互相查询的情景,当时就想要将内容库的所有数据,增加统一的数据接入层,通过graphql的方式提供给上层业务使用,后面受限于人力没有实施。目前从头开始做站群相关的业务,顺带着浅尝了一下graphql,感觉还不错。

GraphQL 是什么

GraphQL is a query language for APIs and a runtime for fulfilling those queries with your existing data. GraphQL provides a complete and understandable description of the data in your API, gives clients the power to ask for exactly what they need and nothing more, makes it easier to evolve APIs over time, and enables powerful developer tools.

背景

参看: https://www.youtube.com/watch?v=Ah6GSCK5Rfs

graphql 和 rest api 是同一层的东西,都是一种基于 http 之上的数据接口协议,两者设计理念完全不同。目前来说 graphql 在很多大小厂也都开始使用,确实会方便很多。 但是 rest api 还是最广泛应用的协议,比如 AMP (T T)。 事实上任何东西都有2面性,对于graphql 来说,优缺点都有

与 REST API 相比的好处

  • 优点
    • 一次提供,多次使用
      • 作为后端,我们只需要关心数据字段的提供,不需要关心前端是怎么用这些字段,页面结构是怎样的
      • 真的不需要频繁的对接口了
    • 动态可扩展,无冗余查询,所见即所得
      • 因为前端只会请求需要的字段,增加新的字段后,不用担心老的业务请求这些冗余数据,很好的支撑了可扩展性
      • 所见即所得,能够帮助客户端代码不易出错
    • 多个字段,一次请求
      • 对于前端来说,面向schema编程,无需知道后端是多少个服务在支撑需要的这些字段,也无需频繁的更改接口去从新服务获取想要的字段,只需要在请求的时候增加一个 field 就好了
    • 代码即文档
      • 直接通过graphqli 等工具查看schema协议,不需要文档,写注释就好了
    • 类型校验
      • 在定义好schema的同时,就约束了接口类型,graphql支持强类型校验
  • 缺点
    • 基于post 请求
      • 传统方式无法监控(nginx)
      • 不能利用 http 自身的缓存机制
    • 没有充分利用http
      • 只使用了 post 请求,没有其他http的方法:方法的幂等性
      • 缺失了http状态码
    • 配套还不完善
      • 微服务
      • 监控
      • 分流

怎么用

几个基本概念

最好参看官方文档:https://graphql.github.io/graphql-spec/June2018/

中文版的可以查看:https://spec.graphql.cn/#sec-Overview-

概念很多,摘了几个出来,在实际业务场景中使用的话,还是有很多需要探的地方,这里只是大概的一个介绍

  • schema
    • 相当于协议文件,定义了所有对象的类型和查询接口
    • 一般定义的操作是2个: query/ mutation , 一个用于读,一个用于写,也存在subscription的操作
  • type
    • graphql 有自己的一套类型系统,有8种类型
      • scalars/标量
      • objects/对象
      • interfaces/接口
      • unions/联合
      • Enums/枚举
      • Input objects/输入类型
      • lists/列表
      • Non-null/非空型
    • graphql 有自己的基本类型,也可以自定义一些类型,但是需要相应的解释器
  • 内省 -> 验证 -> 执行 -> 响应
  • datafetcher
    • 实现过程中,定义了字段的获取方法

实操

具体的代码可以参看: https://github.com/xtestw/graphql-demo

  • 引入 graphql 的依赖包

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.graphql-java</groupId>
    <artifactId>graphql-java</artifactId>
    <version>13.0</version>
    </dependency>
  • 定义 schema 文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    schema {
    query: Query
    mutation: Mutation
    }
    scalar Date

    type Query {
    student(id:Int!):Student
    students(pagination:Pagination):[Student]
    }

    type Mutation {
    add(newStudent:NewStudent):Student
    }

    input Pagination{
    index: Int!
    size: Int!
    }

    input NewStudent{
    name:String!
    sex:Sex
    }

    type Student {
    id: Int
    name: String
    sex: Sex
    creation: Date
    }

    enum Sex{
    MALE,FEMALE
    }
  • 实现 schema 文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    package com.xtestw.graphql.demo.schema.wiring;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableMap;
    import com.xtestw.graphql.demo.schema.model.NewStudent;
    import com.xtestw.graphql.demo.schema.model.Pagination;
    import com.xtestw.graphql.demo.storage.Student;
    import com.xtestw.graphql.demo.storage.Student.Sex;
    import com.xtestw.graphql.demo.storage.repository.StudentRepository;
    import graphql.schema.DataFetchingEnvironment;
    import graphql.schema.idl.MapEnumValuesProvider;
    import graphql.schema.idl.TypeRuntimeWiring;
    import graphql.schema.idl.TypeRuntimeWiring.Builder;
    import java.util.Collections;
    import java.util.List;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
    * Create by xuwei on 2019/8/4
    */
    @Component
    public class StudentWiring implements Wiring {

    @Autowired
    StudentRepository studentRepository;
    ObjectMapper mapper = new ObjectMapper();

    @Override
    public List<TypeRuntimeWiring> wireTypes() {
    return Collections.singletonList(
    TypeRuntimeWiring.newTypeWiring("Sex")
    .enumValues(new MapEnumValuesProvider(
    ImmutableMap.of(
    "MALE", Sex.MALE,
    "FEMALE", Sex.FEMALE
    )))
    .build());
    }

    @Override
    public void wireQueries(Builder queryBuilder) {
    queryBuilder.dataFetcher("student", this::fetchStudentById)
    .dataFetcher("students", this::fetchStudents);
    }

    private List<Student> fetchStudents(DataFetchingEnvironment dataFetchingEnvironment) {
    Pagination pagination = mapper
    .convertValue(dataFetchingEnvironment.getArgument("pagination"), Pagination.class);
    if (pagination == null) {
    pagination = Pagination.create(0, 20);
    }
    return studentRepository.findAll(pagination.toPageable()).getContent();
    }

    private Student fetchStudentById(DataFetchingEnvironment dataFetchingEnvironment) {
    Integer id = dataFetchingEnvironment.getArgument("id");
    return studentRepository.findById(id).orElse(null);
    }

    @Override
    public void wireMutations(Builder mutationBuilder) {
    mutationBuilder.dataFetcher("add", this::addNewStudent);
    }

    private Student addNewStudent(DataFetchingEnvironment dataFetchingEnvironment) {
    NewStudent newStudent = mapper
    .convertValue(dataFetchingEnvironment.getArgument("newStudent"), NewStudent.class);
    return studentRepository.save(newStudent.toStudent());
    }
    }

  • 构建 GraphqQL 对象实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package com.xtestw.graphql.demo.config;

import com.xtestw.graphql.demo.schema.ExtendedScalars;
import com.xtestw.graphql.demo.schema.wiring.Wiring;
import graphql.GraphQL;
import graphql.GraphQLException;
import graphql.schema.GraphQLScalarType;
import graphql.schema.GraphQLSchema;
import graphql.schema.idl.RuntimeWiring;
import graphql.schema.idl.SchemaGenerator;
import graphql.schema.idl.SchemaParser;
import graphql.schema.idl.TypeDefinitionRegistry;
import graphql.schema.idl.TypeRuntimeWiring;
import graphql.schema.idl.errors.SchemaProblem;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;

/**
* Create by xuwei on 2019/8/3
*/
@Configuration
@Slf4j
public class GraphQLConfig {

@Value("classpath*:schemas/*.graphqls")
private Resource[] files;

@Bean
GraphQL graphQL(@Autowired GraphQLSchema graphQLSchema) {

return GraphQL.newGraphQL(graphQLSchema)
.build();
}


@Bean
GraphQLSchema graphQLSchema(@Autowired RuntimeWiring wiring) {
SchemaParser parser = new SchemaParser();
TypeDefinitionRegistry typeDefinitionRegistry = Arrays.stream(files).map(file -> {
try {
return file.getInputStream();
} catch (IOException e) {
log.error("Load graphql file error: {} - {}", file, e);
}
return null;
}).filter(Objects::nonNull)
.map(inputStream -> {
try {
return parser.parse(new InputStreamReader(inputStream));
} catch (SchemaProblem e) {
throw new GraphQLException(
String.format("Compile schema '%s' failed: %s", inputStream,
e.getErrors().stream().map(Object::toString).collect(Collectors.toList())), e);
}
}).reduce(new TypeDefinitionRegistry(), (all, cur) -> {
all.merge(cur);
return all;
});
return new SchemaGenerator().makeExecutableSchema(typeDefinitionRegistry, wiring);
}

@Bean
RuntimeWiring wiring(@Autowired List<GraphQLScalarType> scalarTypes,
@Autowired List<TypeRuntimeWiring> types) {
RuntimeWiring.Builder builder = RuntimeWiring.newRuntimeWiring();
if (scalarTypes != null) {
scalarTypes.forEach(builder::scalar);
}
if (types != null) {
types.forEach(builder::type);
}
return builder.build();
}

@Bean
List<GraphQLScalarType> scalarTypes() {
return Collections.singletonList(ExtendedScalars.GraphQLDate);
}

@Bean
List<TypeRuntimeWiring> types(@Autowired List<Wiring> wirings) {

TypeRuntimeWiring.Builder queryBuilder = TypeRuntimeWiring.newTypeWiring("Query");
TypeRuntimeWiring.Builder mutationBuilder = TypeRuntimeWiring.newTypeWiring("Mutation");
return wirings.stream().map(wiring -> {
wiring.wireQueries(queryBuilder);
wiring.wireMutations(mutationBuilder);
return wiring.wireTypes();
})
.reduce(new ArrayList<>(Arrays.asList(queryBuilder.build(), mutationBuilder.build())),
(all, cur) -> {
all.addAll(cur);
return all;
});
}

}

  • 定义接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    package com.xtestw.graphql.demo.controller;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.type.MapType;
    import com.xtestw.graphql.demo.schema.model.Query;
    import graphql.GraphQL;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.Resource;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.CrossOrigin;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
    * Create by xuwei on 2019/8/3
    */
    @RestController
    @Slf4j
    @RequestMapping("/graphql")
    @CrossOrigin
    public class GraphQLController {

    @Resource
    GraphQL graphQL;

    @PostMapping(path = {""})
    private Object query(@RequestBody String queryStr) throws IOException {
    Query query = getQuery(queryStr);
    return graphQL.execute(query.toExecutionInput());
    }

    private static ObjectMapper mapper = new ObjectMapper();
    private static final MapType VARIABLES_TYPE = mapper.getTypeFactory()
    .constructMapType(HashMap.class,
    String.class, Object.class);

    private Query getQuery(String queryText) throws IOException {
    String operationName = null;
    String fullQueryText = queryText;
    Map<String, Object> variables = null;
    JsonNode jsonBody = mapper.readTree(queryText);
    if (jsonBody != null) {
    JsonNode queryNode = jsonBody.get("query");
    if (queryNode != null && queryNode.isTextual()) {
    queryText = queryNode.asText();
    }
    JsonNode operationNameNode = jsonBody.get("operationName");
    if (operationNameNode != null && operationNameNode.isTextual()) {
    operationName = operationNameNode.asText();
    }
    JsonNode variablesNode = jsonBody.get("variables");
    if (variablesNode != null) {
    if (variablesNode.isTextual()) {
    String variablesJson = variablesNode.asText();
    variables = mapper.convertValue(mapper.readTree(variablesJson), VARIABLES_TYPE);
    } else if (variablesNode.isObject()) {
    variables = mapper.convertValue(variablesNode, VARIABLES_TYPE);
    }
    }
    }
    if (variables == null) {
    variables = Collections.emptyMap();
    }
    return new Query(fullQueryText, queryText, operationName, variables);
    }
    }

  • 测试

image-20190804164253298

image-20190804164354974

image-20190804164517506

总结

graphql 相比较 restapi 来说,各有优缺点。个人感觉 graphql的前景还是很大的,目前最大的问题其实还是相关的生态和基础设施还不够完善,也存在很大的迁移成本和学习成本。不过单纯从数据获取的角度来说,非常有优势!此处我们只做了一个非常浅的探索。

工程中的网络请求,有时会需要使用http代理,比较简单的方法是使用apache的httpclient 直接设置代理,但有的时候使用java自带的HttpURLConnection的时候,就需要注意多线程的问题了。 使用HttpURLConnection 实现代理的方法也很简单,在建立连接之前先设置代理:

Authenticator.setDefault(authenticator);

需要注意的是,设置代理的方法并不是使用HttpURLConnection的一个方法,而在建立请求的时候,也没有任何调用和使用 Authenticator的地方,可以猜测这里设置了代理是使用了全局量,跟进Authenticator中去,会发现: 1 2 其实setDefault 方法就是设置了一个静态变量,而这个变量被使用的地方在: 3 (三个同名函数,相同的处理)这个静态变量被全局的网络请求所使用,而不是当前连接独占的配置,一般来说,当前网络使用一个http代理的时候没有问题(比如我们就是通过elb代理多个IP出口),但是当我们有多个代理的时候,在多线程环境下就会出现问题,如果代理服务器的账号密码不同,请求的服务球对cookie和ip进行校验的时候,就会比较麻烦,所以需要想办法来让每一个HttpURLConnection独占这个代理配置,直接的方法似乎没有,但是可以折中,同步网络请求过程中,HttpURLConnection是和唯一线程绑定的,我们可以用ThreadLocal,让每个线程独占一个代理配置,从而间接的保证每个HttpURLConnection始终使用一个代理配置。 可以定一个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadLocalAuthenticator extends Authenticator {

ThreadLocal<PasswordAuthentication> auth = new ThreadLocal<>();

public void setPasswordAuthentication(PasswordAuthentication passwordAuthentication) {
auth.set(passwordAuthentication);
}

public void clearPasswordAuthentication() {
auth.remove();
}

@Override
protected PasswordAuthentication getPasswordAuthentication() {
return auth.get();
}
}

然后在http网络请求的工具类中定义一个全局的静态ThreadLocalAuthenticator的实例:

1
private static final ThreadLocalAuthenticator authenticator = new ThreadLocalAuthenticator();

然后在需要的时候使用它就OK了。

前段时间,因为一年前篡改了一个sdk的包,导致了一系列不可预知的损失,这也充分说明了技术的正确性,需要决策的正确性来支撑的。作为技术人员,不仅仅要考虑技术的可行性,也要考虑实施后的风险和结果的预期假设。 从工程角度,简单记录一下篡改sdk包的技术。

  1. 解压jar或者apk的包
  2. JD-GUI 反编译查看class的相关代码,找到修改点
  3. 自己编写相应的方法或者类,使用 javaassist 注入到class文件
  4. 重新打包

javassist 使用方法参考: https://jboss-javassist.github.io/javassist/tutorial/tutorial.html 附上测试代码,修改一个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) {
try {
javassist.tools.reflect.Loader cl = new javassist.tools.reflect.Loader();
ClassPool pool = ClassPool.getDefault();
pool.insertClassPath("/tmp/aa/");
CtClass cc = pool.get("com.facebook.ads.internal.d.g");
CtMethod ctmethod = cc.getDeclaredMethod("c");
cc.getDeclaredField("c");
// getCode()即为需要替换的的代码
ctmethod.setBody(getCode());
cc.writeFile("/tmp/aa/");
} catch (CannotCompileException e) {
e.printStackTrace();
} catch (NotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

最近的论文用到了RSA相关的东西,做一个整理。

流程图

  • 密钥生成过程: RSA
  • 加密解密过程: RSA

选取2个质数p、q

\(RSA\)算法的主要就是基于一个十分简单的数论事实:将两个大素数相乘十分容易,但是想要对其乘积进行因式分解却极其困难。 同时为了增强强度,\(p-1\)和\(q-1\)的最大公因子要小 质数的选取方法:

  • 随机搜索法随机产生一个奇数 \(p_1\) 进行素数测试,若是素数,则结束;否则,重新随机产生一个奇数 \(p_2\) 进行素性测试,直至找到一个素数 \(p_t\)。
  • 随机递增搜索法随机产生一个奇数,对以该数为起点的奇数依次进行测试,直至找到一个素数。这种方法相对于随机搜索法,在速度上有一定的提高,但是并没有本质上的区别

设:

\(p=61,q=53\)

计算n = p * q

\(n\)是公钥对,密钥对都需要的一个值,为提高保密强度,RSA密钥至少为500位长,一般推荐使用1024位,也有2048位的。

\( n = p*q=61*53 = 3233 \)

这个部分是可以直接求解的。

计算欧拉函数 ϕ(n)

\( \phi(n) = (p-1)(q-1) = 60*52 = 3120 \)

选择加密密钥 e

加密密钥 \(e\) 需要满足以下条件:

\(1 < e < \phi(n), gcd(e, \phi(n)) = 1\)

这个条件是为了确保\(e\)在模\(\phi(n)\)的情况下有逆元。出于安全性考虑 \(e=2\) 永远不该被使用 一般生成这种\(e\)的方法有2种:

  • 随机生成法
  • 穷举法

我们令

\(e = 17 \)

即我们得到公钥对:

\(PU\ = \ \{\ e,n\ \} = \{\ 17,3233\ \} \)

计算解密密钥 d

解密密钥 \(d\) 需要满足:

\(d \equiv\) \(e\)-1 \( \ \ (\ mod\ \phi(n) \ )\)

其实求的就是\(e\)的逆元,求逆元的方法有3种:

  • 循环求解法枚举所有的数,来求解\(e\)的逆元
  • 扩展欧几里

    扩展欧几里得算法可以在求得a、b的最大公约数的同时,能找到整数\(x、y\)(其中一个很可能是负数),使它们满足贝祖等式\(ax + by = gcd(a, b)\)。

当\(gcd(a, b)=1\),那么\((ax + by = 1)\),此时可以看出\(m\)是\(a\)模\(b\)的乘法逆元,\(n\)是\(b\)模\(a\)的乘法逆元。

扩展欧几里德计算过程:

\(a = q_1b + r_1 \ \ \ r_1 = ax_1+by_1\)

 

\(b = q_2r_1 + r_2 \ \ \ r_2 = ax_2+by_2\)

 

\(r_1 = q_3b + r_3 \ \ \ r_3 = ax_3+by_3\)

 

 

\(r_{n-2}= q_nr_{n-1} + r_n \ \ \ r_n = ax_n+by_n\)

 

\(r_{n-1}= q_{n+1}r_{n} + 0 \)

通过移项,得到:

\(r_i = r_{i-2}-r_{i-1}q_i\)

同样,从i-1和i-2行,也可以得到:

\(r_{i-2} = ax_{i-2}+by_{i-2}\ \ \ \ \ r_{i-1}=ax_{i-1}+by_{i-1}\)

代入:

\(r_i =a(x_{i-2} - q_ix_{i-1})+b(y_{i-2}-q_iy_{i-1})\)

因为我们已经假设\(r_i=ax_i+by_i\),因此

\(x_i =x_{i-2} - q_ix_{i-1}\ \ \ \ \ y_i=y_{i-2} - q_iy_{i-1}\)

#include
#include
#define LL __int64
using namespace std;
LL extend_gcd(LL a, LL b, LL &x, LL &y)//ax+by=1返回a,b的gcd
{
LL ans, t;
if(b == 0)
{
x = 1;
y = 0;
return a;
}
ans = extend_gcd(b, a%b, x, y);
t = x;
x = y;
y = t - ( a / b ) * y;
return ans;
}
int main()
{
LL a, b, c, x, y;
while(~scanf(“%I64d%I64d%I64d”, &a, &b, &c))//ax+by = c
{
LL gcd = extend_gcd(a,b,x,y);
while(x < 0)//x 为正
x += b,y -= a;
printf(“ax+by = 1的最小正整数解:%I64d %I64d\n”, x, y);//ax+by = 1
//x即为a%b的逆元,y为b%a的逆元
printf(“a mod b的逆元:%I64d\n”, x);
if(c % gcd)
{
printf(“无解!\n”);
continue;
}//ax+by = c
printf(“x=%I64d,y=%I64d\n”, x*c/gcd, y*c/gcd);
}
return 0;
}

  • 费马小定理成立的前提是\(\phi(n)\)为质数,否则无法使用。 假如\(a\)是整数,\(p\)是质数,且\(a,p\)互质(即两者只有一个公约数1),那么\(a\)的\((p-1)\)次方除以\(p\)的余数恒等于1。那么逆元为 \(a\)\(m-2\)\(\ mod\ m \)

利用扩展欧几里得算法可以求得:

\(d=2753\)

即我们得到密钥对:

\(PR\ =\ \{\ d,n\ \} = \{\ 2753,3233\ \}\)

加密解密

  • 加密 \(C = M^e\ (\ mod\ n\ )\)

假设对65加密,即\(M=65\)则:

\(C=65\)17 \(mod \ 3233 = 2790\)

  • 解密 \(M = C^d\ (\ mod\ n\ )\)

针对密文\(C=2790\)的情况,我们进行解密:

\(M = 2790\)2753 \(\ mod\ 3233 = 65.\)

求高次幂的快速幂算法: 比如求解:

\(a^b\ mod\ n\)

把b转换成二进制数,该二进制数第\(i\)位的权为 2\(i\)-1 例如, \(b=11\)的二进制是1011

11 = 23 ×1 + 22 ×0 + 21 ×1 + 20 ×1

因此,我们将\(a\)11 转化为算

\( a\)11 =\(a\)20 *\(a\)21 * \(a\)23

int modexp(int a,int b,int n)
{
int ret=1;
while(b)
{
if(b&1) ret=reta%n;
a=a
a%n;
b>>=1;
}
return ret;
}

// <![CDATA[ if (typeof MathJaxListener !== ‘undefined’) { MathJax.Hub.Register.StartupHook(‘End’, function () { MathJaxListener.invokeCallbackForKey_(‘End’); }); } // ]]>