spark 读取hive 计算后写入hive

2022-11-01,,,,

package com.grady

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SparkSession} object HiveTableToTable { def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("StuToStu2")
val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() //tableToTable1(spark)
tableToTable2(spark)
} /**
* spark sql 方式
* @param spark
*/
def tableToTable1(spark: SparkSession): Unit = {
spark.sql("select * from jiang.student").show()
spark.sql("create table if not exists jiang.student_male like jiang.student;")
spark.sql("insert overwrite table jiang.student_male select * from jiang.student where sex = 'male'")
} /**
* 编程方式
* @param spark
*/
def tableToTable2(spark: SparkSession):Unit = {
spark.sql("create table if not exists jiang.student_female like jiang.student")
val dataFrame = spark.sql("select * from jiang.student")
val femaleDataSet = dataFrame.where("sex = 'female'")
// 有它和 case class Student 才能toDF,直接定义写成类文件不行
import spark.implicits._
val studentsDF = femaleDataSet.rdd.map( r =>
Student(r(0).toString.toInt, r(1).toString, r(2).toString, r(3).toString.toInt, r(4).toString)
).map(s => {
Student(s.id, s.name, s.sex, 18, "FemaleFt")
}).toDF()
studentsDF.write.mode("overwrite").insertInto("jiang.student_female") // 方法二
// val schema = SchemaType.getStudentSchema()
// 这里studentsRDD 需要转换成RDD[Row] 才可以使用
// val femaleStudentDF = spark.createDataFrame(studentsRDD, schema)
}
}
case class Student(id: Int, name: String, sex: String, age: Int, department: String)

执行:

spark-submit --master local[2] --num-executors 10 --class com.grady.HiveTableToTable /app/data/appdeploy/usehive1-1.0-SNAPSHOT.jar

日志:

hive> select * from student_female;
2 xiaochen female 18 FemaleFt
Time taken: 2.838 seconds, Fetched: 1 row(s)

spark 读取hive 计算写入hive的相关教程结束。

《spark 读取hive 计算后写入hive.doc》

下载本文的Word格式文档,以方便收藏与打印。