Spark 分组 TOPN 排序

本贴最后更新于 3154 天前,其中的信息可能已经时移世改
/**
 * Created by zhangshuai on 2016/9/22.
 */
//        输入文件
//        Spark 100
//        Hadoop 65
//        Spark 99
//        Hadoop 61
//        Spark 195
//        Hadoop 60
//        Spark 98
//        Hadoop 69
//        Spark 91
//        Hadoop 64
//        Spark 89
//        Hadoop 98
//        Spark 88
//        Hadoop 99
//        Spark 68
//        Hadoop 60
//        Spark 79
//        Hadoop 97
//        Spark 69
//        Hadoop 96



//        结果输出
//        Group key :Spark
//        195
//        100
//        99
//        98
//        91
//        *********************
//        Group key :Hadoop
//        99
//        98
//        97
//        96
//        69
//        **********************

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;


public class TopNGroupJava {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("TopNGroupJava")
                .setMaster("local");

        JavaSparkContext sc = new JavaSparkContext(conf); //其底层实际上就是Scala的SparkContext
        JavaRDD<String> lines = sc.textFile(
                "E://topn.txt");

        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String line)
                    throws Exception {
                // TODO Auto-generated method stub
                String[] splitedLine = line.split(" ");
                System.out.println(splitedLine[0]);

                return new Tuple2<String, Integer>(splitedLine[0],Integer.valueOf(splitedLine[1]));
            }
        });

        JavaPairRDD<String, Iterable<Integer>> groupedPairs = pairs.groupByKey();

        JavaPairRDD<String, Iterable<Integer>> top5 = groupedPairs.mapToPair(new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            /**
             *
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedData) throws Exception {
                Integer[] top5 = new Integer[5]; //保存top5本身
                String groupedKey = groupedData._1(); //获取分组组名
                Iterator<Integer> groupedValue = groupedData._2().iterator(); //获取每组的内容集合

                while (groupedValue.hasNext()) { //查看下一个元素,如果有继续循环
                    Integer value = groupedValue.next(); //获取当前循环的元素本身内容

                    for (int i = 0; i < 5; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;

                            break;
                        } else if (value > top5[i]) {
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j - 1];
                            }

                            top5[i] = value;

                            break;
                        }
                    }
                }

                return new Tuple2<String, Iterable<Integer>>(groupedKey,
                        Arrays.asList(top5));
            }
        });

        top5.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            @Override
            public void call(Tuple2<String, Iterable<Integer>> topped)
                    throws Exception {
                System.out.println("Group key :" + topped._1());

                Iterator<Integer> toppedValue = topped._2().iterator();

                while (toppedValue.hasNext()) {
                    Integer value = toppedValue.next();
                    System.out.println(value);
                }

                System.out.println("**********************");
            }
        });
    }
}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by zhangshuai on 2016/9/22.
 */
//输入文件
//Spark,100
//Hadoop,62
//Flink,77
//Kafka,91
//Hadoop,93
//Spark,78
//Hadoop,69
//Spark,98
//Hadoop,62
//Spark,99
//Hadoop,61
//Spark,70
//Hadoop,75
//Spark,88
//Hadoop,68
//Spark,90
//Hadoop,61



//结果输出
//Flink:
//77
//Hadoop:
//61
//61
//62
//62
//68
//Kafka:
//91
//Spark:
//70
//78
//88
//90
//98
object TopNGroupScala {
  def main(args: Array[String]) {
     val conf=new SparkConf().setAppName("TopNGroupScala").setMaster("local")

     val sc=new SparkContext(conf)
     sc.setLogLevel("WARN")
     val lines=sc.textFile("E://topn.txt",1)

     val pairs=lines.map{(line =>(line.split(",")(0),line.split(",")(1).toInt))}

     val grouped=pairs.groupByKey()

     val groupedTop5=grouped.map(grouped =>
      {
        (grouped._1,grouped._2.toList.sortWith(_<_).take(5))
      }
     )
     val groupedKeySorted=groupedTop5.sortByKey()

     groupedKeySorted.collect().foreach(pair =>
     {
       println(pair._1+":")
       pair._2.foreach{println}
     }
     )
    sc.stop()

  }
}

 

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 563 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1441 引用 • 10069 回帖 • 494 关注
  • 运维

    互联网运维工作,以服务为中心,以稳定、安全、高效为三个基本点,确保公司的互联网业务能够 7×24 小时为用户提供高质量的服务。

    150 引用 • 257 回帖
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    239 引用 • 224 回帖
  • OneNote
    1 引用 • 3 回帖
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 680 关注
  • 七牛云

    七牛云是国内领先的企业级公有云服务商,致力于打造以数据为核心的场景化 PaaS 服务。围绕富媒体场景,七牛先后推出了对象存储,融合 CDN 加速,数据通用处理,内容反垃圾服务,以及直播云服务等。

    28 引用 • 226 回帖 • 128 关注
  • 面试

    面试造航母,上班拧螺丝。多面试,少加班。

    325 引用 • 1395 回帖 • 2 关注
  • Caddy

    Caddy 是一款默认自动启用 HTTPS 的 HTTP/2 Web 服务器。

    12 引用 • 54 回帖 • 177 关注
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    412 引用 • 3588 回帖
  • 域名

    域名(Domain Name),简称域名、网域,是由一串用点分隔的名字组成的 Internet 上某一台计算机或计算机组的名称,用于在数据传输时标识计算机的电子方位(有时也指地理位置)。

    43 引用 • 208 回帖 • 1 关注
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 529 关注
  • Vue.js

    Vue.js(读音 /vju ː/,类似于 view)是一个构建数据驱动的 Web 界面库。Vue.js 的目标是通过尽可能简单的 API 实现响应的数据绑定和组合的视图组件。

    268 引用 • 666 回帖
  • Angular

    AngularAngularJS 的新版本。

    26 引用 • 66 回帖 • 548 关注
  • 旅游

    希望你我能在旅途中找到人生的下一站。

    96 引用 • 901 回帖
  • Webswing

    Webswing 是一个能将任何 Swing 应用通过纯 HTML5 运行在浏览器中的 Web 服务器,详细介绍请看 将 Java Swing 应用变成 Web 应用

    1 引用 • 15 回帖 • 639 关注
  • Mac

    Mac 是苹果公司自 1984 年起以“Macintosh”开始开发的个人消费型计算机,如:iMac、Mac mini、Macbook Air、Macbook Pro、Macbook、Mac Pro 等计算机。

    168 引用 • 597 回帖
  • Kafka

    Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是现代系统中许多功能的基础。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。

    36 引用 • 35 回帖 • 1 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    29 引用 • 202 回帖 • 29 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    49 引用 • 192 回帖
  • Office

    Office 现已更名为 Microsoft 365. Microsoft 365 将高级 Office 应用(如 Word、Excel 和 PowerPoint)与 1 TB 的 OneDrive 云存储空间、高级安全性等结合在一起,可帮助你在任何设备上完成操作。

    5 引用 • 34 回帖
  • Vim

    Vim 是类 UNIX 系统文本编辑器 Vi 的加强版本,加入了更多特性来帮助编辑源代码。Vim 的部分增强功能包括文件比较(vimdiff)、语法高亮、全面的帮助系统、本地脚本(Vimscript)和便于选择的可视化模式。

    29 引用 • 66 回帖
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖
  • Telegram

    Telegram 是一个非盈利性、基于云端的即时消息服务。它提供了支持各大操作系统平台的开源的客户端,也提供了很多强大的 APIs 给开发者创建自己的客户端和机器人。

    5 引用 • 35 回帖
  • GraphQL

    GraphQL 是一个用于 API 的查询语言,是一个使用基于类型系统来执行查询的服务端运行时(类型系统由你的数据定义)。GraphQL 并没有和任何特定数据库或者存储引擎绑定,而是依靠你现有的代码和数据支撑。

    4 引用 • 3 回帖 • 13 关注
  • 互联网

    互联网(Internet),又称网际网络,或音译因特网、英特网。互联网始于 1969 年美国的阿帕网,是网络与网络之间所串连成的庞大网络,这些网络以一组通用的协议相连,形成逻辑上的单一巨大国际网络。

    99 引用 • 367 回帖
  • Laravel

    Laravel 是一套简洁、优雅的 PHP Web 开发框架。它采用 MVC 设计,是一款崇尚开发效率的全栈框架。

    20 引用 • 23 回帖 • 740 关注
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    52 引用 • 190 回帖