Reading and Writing MySQL, Hive, and HBase from Java and Spark

Tags: database connections  basic  big data

1. Connecting from Java

1.1 Java to MySQL

public static void main(String[] args) throws Exception {
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://192.168.56.101:3306/eat?useUnicode=true&characterEncoding=utf8&useSSL=true";
    String username = "root";
    String password = "ok";

    // Load the JDBC driver and open the connection
    Class.forName(driver);
    Connection con = DriverManager.getConnection(url, username, password);

    // Query the foods table and print every row
    PreparedStatement pst = con.prepareStatement("select * from foods");
    ResultSet res = pst.executeQuery();
    while (res.next()) {
        int food_id = res.getInt("food_id");
        int shop_id = res.getInt("shoppe_id");
        String food_name = res.getString("food_name");
        int food_price = res.getInt("food_price");
        String result = food_id + ":" + shop_id + ":" + food_name + ":" + food_price;
        System.out.println(result);
    }

    // Release resources
    res.close();
    pst.close();
    con.close();
}
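
The same query can also be written with try-with-resources, so the connection, statement, and result set are closed even if an exception is thrown partway through. A minimal standalone sketch, reusing the host, database, and credentials from above:

import java.sql.*;

public class MysqlReadDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.56.101:3306/eat?useUnicode=true&characterEncoding=utf8&useSSL=true";
        Class.forName("com.mysql.jdbc.Driver");
        // try-with-resources closes con, pst and res automatically, in reverse order
        try (Connection con = DriverManager.getConnection(url, "root", "ok");
             PreparedStatement pst = con.prepareStatement("select * from foods");
             ResultSet res = pst.executeQuery()) {
            while (res.next()) {
                System.out.println(res.getInt("food_id") + ":" + res.getInt("shoppe_id")
                        + ":" + res.getString("food_name") + ":" + res.getInt("food_price"));
            }
        }
    }
}
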
public static void main(String[] args) throws Exception {
    String driver = "com.mysql.jdbc.Driver";
    String url = "jdbc:mysql://192.168.56.101:3306/eat?useUnicode=true&characterEncoding=utf8&useSSL=true";
    String username = "root";
    String password = "ok";

    Class.forName(driver);
    Connection con = DriverManager.getConnection(url, username, password);

    // Insert one row into foods (MySQL uses "insert into <table>", without the TABLE keyword)
    PreparedStatement pst = con.prepareStatement("insert into foods(food_id,shoppe_id,food_name,food_price) values (?,?,?,?)");
    pst.setInt(1, 100);
    pst.setInt(2, 33);
    pst.setString(3, "红烧带鱼");
    pst.setInt(4, 25);
    int i = pst.executeUpdate();
    System.out.println(i);   // number of rows affected

    pst.close();
    con.close();
}
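
Inserting rows one at a time costs one round trip per row; for larger writes, JDBC batching groups them. A minimal sketch against the same table, with made-up sample rows just to show the pattern:

import java.sql.*;

public class MysqlBatchInsertDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.56.101:3306/eat?useUnicode=true&characterEncoding=utf8&useSSL=true";
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection con = DriverManager.getConnection(url, "root", "ok");
             PreparedStatement pst = con.prepareStatement(
                     "insert into foods(food_id,shoppe_id,food_name,food_price) values (?,?,?,?)")) {
            con.setAutoCommit(false);          // commit the whole batch in one transaction
            // hypothetical sample rows, only to illustrate the batching calls
            Object[][] rows = {{101, 33, "糖醋排骨", 30}, {102, 33, "清蒸鲈鱼", 45}};
            for (Object[] r : rows) {
                pst.setInt(1, (Integer) r[0]);
                pst.setInt(2, (Integer) r[1]);
                pst.setString(3, (String) r[2]);
                pst.setInt(4, (Integer) r[3]);
                pst.addBatch();                // queue the row instead of executing it
            }
            int[] counts = pst.executeBatch(); // send all queued rows together
            con.commit();
            System.out.println("inserted " + counts.length + " rows");
        }
    }
}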

1.2 Java to Hive

public static void main(String[] args) throws Exception {
    String driver = "org.apache.hive.jdbc.HiveDriver";
    String url = "jdbc:hive2://192.168.56.101:10000/mydemo";
    String username = "";
    String password = "";

    Class.forName(driver);
    Connection con = DriverManager.getConnection(url, username, password);

    // Query the scores table through HiveServer2 and print every row
    PreparedStatement pst = con.prepareStatement("select * from scores");
    ResultSet res = pst.executeQuery();
    while (res.next()) {
        int scid = res.getInt("scid");
        int userid = res.getInt("userid");
        int score = res.getInt("score");
        String classname = res.getString("classname");
        String result = scid + ":" + userid + ":" + score + ":" + classname;
        System.out.println(result);
    }

    // Release resources
    res.close();
    pst.close();
    con.close();
}
public static void main(String[] args) throws Exception {
    String driver = "org.apache.hive.jdbc.HiveDriver";
    String url = "jdbc:hive2://192.168.56.101:10000/mydemo";
    String username = "";
    String password = "";

    Class.forName(driver);
    Connection con = DriverManager.getConnection(url, username, password);

    // Insert one row into scores (each insert runs as its own Hive job)
    PreparedStatement pst = con.prepareStatement("insert into table scores(scid,userid,score,classname) values (?,?,?,?)");
    pst.setInt(1, 4);
    pst.setInt(2, 4);
    pst.setInt(3, 99);
    pst.setString(4, "kb11");

    int i = pst.executeUpdate();
    System.out.println(i);

    pst.close();
    con.close();
}
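
Because each JDBC insert launches a separate Hive job, this path only makes sense for a few rows. The usual bulk path is to place a delimited file on HDFS and load it with LOAD DATA over the same connection. A minimal sketch; the path /tmp/scores.txt is a placeholder and its field delimiter must match the table definition:

import java.sql.*;

public class HiveLoadDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection("jdbc:hive2://192.168.56.101:10000/mydemo", "", "");
             Statement stmt = con.createStatement()) {
            // /tmp/scores.txt is a hypothetical HDFS path with rows laid out like the scores table
            stmt.execute("load data inpath '/tmp/scores.txt' into table scores");
        }
    }
}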

1.3 Java to HBase

public class java_hbase {

//    Create a table
    public void createTable() throws IOException {
//      Configuration: ZooKeeper quorum and client port
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","clz");
        conf.set("hbase.zookeeper.property.clientPort","2181");

//        Open a connection and get the Admin client
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();

//        Describe table "kk" with two column families and create it
        HTableDescriptor kk = new HTableDescriptor(TableName.valueOf("kk"));
        kk.addFamily(new HColumnDescriptor("kk1"));
        kk.addFamily(new HColumnDescriptor("kk2"));
        admin.createTable(kk);

        admin.close();
        con.close();
    }

//    Insert data
    public void putData() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","clz");
        conf.set("hbase.zookeeper.property.clientPort","2181");

        Connection con = ConnectionFactory.createConnection(conf);
        Table student = con.getTable(TableName.valueOf("kk"));
//        Put row key 1001, column family kk1, column "name", value "zs"
        Put put = new Put(Bytes.toBytes("1001"));
        put.addColumn(Bytes.toBytes("kk1"),Bytes.toBytes("name"),Bytes.toBytes("zs"));
        student.put(put);

        student.close();
        con.close();
    }

//    Read data
    public void getData() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","clz");
        conf.set("hbase.zookeeper.property.clientPort","2181");

        Connection con = ConnectionFactory.createConnection(conf);
        Table student = con.getTable(TableName.valueOf("kk"));
//        Get the row with key 1001 and print every cell
        Get get = new Get(Bytes.toBytes("1001"));
        Result result = student.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("row key: " + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value: " + Bytes.toString(CellUtil.cloneValue(cell)));
        }

        student.close();
        con.close();
    }

//    Drop the table
    public void dropTable() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","clz");
        conf.set("hbase.zookeeper.property.clientPort","2181");
        // Open a connection and get the Admin client
        Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        // A table must be disabled before it can be deleted
        admin.disableTable(TableName.valueOf("kk"));
        admin.deleteTable(TableName.valueOf("kk"));

        admin.close();
        conn.close();
    }
}
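
The class above only fetches a single row with Get; scanning the whole table follows the same connection pattern. A minimal standalone sketch against the same quorum and the kk table:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseScanDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "clz");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Connection con = ConnectionFactory.createConnection(conf);
        Table table = con.getTable(TableName.valueOf("kk"));

        // An empty Scan walks every row of the table; each Result is one row
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneRow(cell)) + " "
                        + Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        scanner.close();
        table.close();
        con.close();
    }
}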

2. Connecting from Spark

2.1 Spark to MySQL

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mysql").master("local[*]").getOrCreate()

    // JDBC connection properties for MySQL
    val url = "jdbc:mysql://jzy1:3306/mydemo"
    val prop = new Properties()
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("user","root")
    prop.setProperty("password","ok")

    // Read the course table into a DataFrame and display it
    val frame = spark.read.jdbc(url,"course",prop)
    frame.show()
}
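
The same read can go through the DataFrameReader options API, which also makes it easy to push a filter down to MySQL as a subquery instead of pulling the whole table. A minimal sketch with the same connection details; the subquery in dbtable is only an illustration:

import org.apache.spark.sql.SparkSession

object MysqlReadWithOptions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mysql").master("local[*]").getOrCreate()

    val frame = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://jzy1:3306/mydemo")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "ok")
      // a subquery in dbtable is executed on the MySQL side, so only matching rows are transferred
      .option("dbtable", "(select * from course where t_id = '02') t")
      .load()

    frame.show()
    spark.stop()
  }
}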

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mysql").master("local[*]").getOrCreate()
    import spark.implicits._

    val url = "jdbc:mysql://jzy1:3306/mydemo"
    val prop = new Properties()
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("user","root")
    prop.setProperty("password","ok")

    // Build a one-row DataFrame and append it to the course table
    val courseadd = Seq(("04","化学","02")).toDF("c_id","c_name","t_id")
    courseadd.write.mode("append").jdbc(url,"course",prop)
}

2.2 Spark to Hive

def main(args: Array[String]): Unit = {
    // enableHiveSupport plus the metastore URI lets Spark query Hive tables directly
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dwd_action")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.56.21:9083")
      .getOrCreate()

    val usr = spark.sql("select * from mydemo.userinfos").cache()
    usr.show()
}

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dwd_action")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.56.21:9083")
      .getOrCreate()

    // Read the table, then append the same rows back into it
    val usr = spark.sql("select * from mydemo.userinfos").cache()
    usr.write.mode("append").saveAsTable("mydemo.userinfos")
}

2.3 Spark to HBase

Read:

    val spark = SparkSession.builder().appName("HBaseTest").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val tablename = "myexam:ord1"
    val conf = HBaseConfiguration.create()

    conf.set("hbase.zookeeper.quorum","192.168.56.21")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    conf.set(TableInputFormat.INPUT_TABLE,tablename)

    // Read the HBase table as an RDD of (row key, Result) pairs
    val rdd1 = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    ).cache()

    println("count="+rdd1.count())
    import spark.implicits._
    // Iterate over the RDD and print each row
    rdd1.foreach({
    case (_,result) =>
      // result.getRow returns the row key
      val key = Bytes.toString(result.getRow)
      // result.getValue(family, qualifier) returns a cell value;
      // getBytes converts the String names to byte arrays
      val buynum = Bytes.toString(result.getValue("one".getBytes,"buynum".getBytes))
      val cust_id = Bytes.toString(result.getValue("one".getBytes,"cust_id".getBytes))
      val dt = Bytes.toString(result.getValue("one".getBytes,"dt".getBytes))
      val good_id = Bytes.toString(result.getValue("one".getBytes,"good_id".getBytes))

      println("Row key:"+key+" buynum:"+buynum+" cust_id:"+cust_id+" dt:"+dt+" good_id:"+good_id)
    })
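
Since spark.implicits._ is already imported, the (key, Result) RDD can also be mapped into a DataFrame, which is usually easier to work with than printed rows. A short sketch that continues from the read code above and reuses rdd1:

    // continues from the read code above: turn each Result into a tuple, then into a DataFrame
    val ordDF = rdd1.map { case (_, result) =>
      (Bytes.toString(result.getRow),
        Bytes.toString(result.getValue("one".getBytes, "buynum".getBytes)),
        Bytes.toString(result.getValue("one".getBytes, "cust_id".getBytes)),
        Bytes.toString(result.getValue("one".getBytes, "dt".getBytes)),
        Bytes.toString(result.getValue("one".getBytes, "good_id".getBytes)))
    }.toDF("row_key", "buynum", "cust_id", "dt", "good_id")

    ordDF.show()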

Write:

    val spark = SparkSession.builder().appName("HBaseTest").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val tablename = "myexam:ord1"
    val conf = HBaseConfiguration.create()

    conf.set("hbase.zookeeper.quorum","192.168.56.21")
    conf.set("hbase.zookeeper.property.clientPort","2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE,tablename)

    // Use the (old mapred API) TableOutputFormat as the job's output format
    val job = new JobConf(conf)
    job.setOutputFormat(classOf[TableOutputFormat])

    // One sample record: rowkey,buynum,cust_id,dt,good_id
    val indataRDD = sc.makeRDD(Array("11,1,6,20200807,7"))

    // Turn each record into a (row key, Put) pair
    val rdd = indataRDD.map(_.split(",")).map{
    arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("one"),Bytes.toBytes("buynum"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("one"),Bytes.toBytes("cust_id"),Bytes.toBytes(arr(2)))
      put.addColumn(Bytes.toBytes("one"),Bytes.toBytes("dt"),Bytes.toBytes(arr(3)))
      put.addColumn(Bytes.toBytes("one"),Bytes.toBytes("good_id"),Bytes.toBytes(arr(4)))
      (new ImmutableBytesWritable,put)
    }
    rdd.saveAsHadoopDataset(job)
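
The write above uses the old mapred API (JobConf plus saveAsHadoopDataset). The same (ImmutableBytesWritable, Put) pairs can also be written through the newer mapreduce API; a short sketch that continues from the write code above, reusing conf, tablename and rdd:

    // continues from the write code above, switching to the mapreduce TableOutputFormat
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}

    val newJob = Job.getInstance(conf)
    newJob.getConfiguration.set(NewTableOutputFormat.OUTPUT_TABLE, tablename)
    newJob.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])
    newJob.setOutputKeyClass(classOf[ImmutableBytesWritable])
    newJob.setOutputValueClass(classOf[Put])

    rdd.saveAsNewAPIHadoopDataset(newJob.getConfiguration)
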
Copyright notice: this is an original article by the author, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/weixin_42487460/article/details/108620822
