Scala: how to save a DataFrame directly to Hive?

Note: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. If you use or share this content, you must likewise follow the CC BY-SA license and attribute it to the original authors (not this site). Original question: http://stackoverflow.com/questions/30664008/


How to save DataFrame directly to Hive?

Tags: scala, apache-spark, hive, apache-spark-sql

Asked by Gourav

Is it possible to save a DataFrame in Spark directly to Hive?

I have tried converting the DataFrame to an RDD, saving it as a text file, and then loading it into Hive. But I am wondering whether I can save the dataframe directly to Hive.

Answered by Vinay Kumar

You can create an in-memory temporary table and store it in a Hive table using sqlContext.

Let's say your data frame is myDf. You can create a temporary table using:

myDf.createOrReplaceTempView("mytempTable") 

Then you can use a simple Hive statement to create the table and dump the data from your temp table.

sqlContext.sql("create table mytable as select * from mytempTable");
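
If the target table already exists, a variation (just a sketch; it assumes the column order of mytempTable matches the table) is to insert into it instead of recreating it:

sqlContext.sql("insert into table mytable select * from mytempTable")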

Answered by Daniel Darabos

Use DataFrameWriter.saveAsTable (df.write.saveAsTable(...)). See the Spark SQL and DataFrame Guide.

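A minimal Scala sketch of the same idea (assuming a SparkSession named spark; the input path, database, and table names are made up):

import org.apache.spark.sql.SaveMode

val df = spark.read.parquet("/path/to/input")   // any DataFrame
df.write
  .mode(SaveMode.Overwrite)                     // or Append / Ignore / ErrorIfExists
  .saveAsTable("my_db.my_table")                // table is registered in the Hive metastore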

Answered by Alex

I don't see df.write.saveAsTable(...) deprecated in the Spark 2.0 documentation. It has worked for us on Amazon EMR. We were perfectly able to read data from S3 into a dataframe, process it, create a table from the result, and read it with MicroStrategy. Vinay's answer has also worked, though.

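A rough sketch of that workflow (all paths and names here are hypothetical, assuming a SparkSession named spark):

import org.apache.spark.sql.functions.col

val raw = spark.read.option("header", "true").csv("s3://my-bucket/input/")   // hypothetical bucket
val cleaned = raw.filter(col("amount") > 0)                                  // any processing step
cleaned.write.mode("overwrite").saveAsTable("reporting.orders_clean")        // hypothetical db.table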

Answered by Anandkumar

You need to have/create a HiveContext:

import org.apache.spark.sql.hive.HiveContext;

HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());

Then save the dataframe directly, or select the columns to store as a Hive table.

df is the dataframe:

df.write().mode("overwrite").saveAsTable("schemaName.tableName");

or


df.select(df.col("col1"), df.col("col2"), df.col("col3")).write().mode("overwrite").saveAsTable("schemaName.tableName");

or


df.write().mode(SaveMode.Overwrite).saveAsTable("dbName.tableName");

SaveModes are Append/Ignore/Overwrite/ErrorIfExists


I have added here the definition of HiveContext from the Spark documentation:

In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality provided by the basic SQLContext. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available. HiveContext is only packaged separately to avoid including all of Hive's dependencies in the default Spark build.




On Spark version 1.6.2, using "dbName.tableName" gives this error:

org.apache.spark.sql.AnalysisException: Specifying database name or other qualifiers are not allowed for temporary tables. If the table name has dots (.) in it, please quote the table name with backticks (`).

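A workaround that has been used on 1.6.x (a sketch, assuming the target database already exists and sqlContext is a HiveContext) is to go through a temporary table and plain HiveQL instead of saveAsTable:

df.registerTempTable("df_tmp")
sqlContext.sql("CREATE TABLE dbName.tableName AS SELECT * FROM df_tmp")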

Answered by Raktotpal Bordoloi

Saving to Hive is just a matter of using the write() method of your SQLContext:

df.write.saveAsTable(tableName)

See https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameWriter.html#saveAsTable(java.lang.String)


From Spark 2.2: use DataSet instead of DataFrame.

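A minimal sketch of that (assuming a SparkSession named spark; the case class and table name are made up for illustration):

case class Person(name: String, age: Int)

import spark.implicits._
val ds = Seq(Person("ann", 30), Person("bob", 25)).toDS()   // a Dataset[Person]
ds.write.mode("overwrite").saveAsTable("people")            // persisted through the Hive metastore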

Answered by kartik

Here is a PySpark version to create a Hive table from a Parquet file. You may have generated Parquet files using an inferred schema and now want to push the definition to the Hive metastore. You can also push the definition to systems like AWS Glue or AWS Athena, not just to the Hive metastore. Here I am using spark.sql to push/create a permanent table.

    # Location where my parquet files are present.
    df = spark.read.parquet("s3://my-location/data/")

    buf = []
    buf.append('CREATE EXTERNAL TABLE test123 (')
    keyanddatatypes = df.dtypes      # list of (column name, data type) pairs
    sizeof = len(df.dtypes)
    print("size----------", sizeof)
    count = 1
    for eachvalue in keyanddatatypes:
        print(count, sizeof, eachvalue)
        if count == sizeof:
            # last column: no trailing comma
            total = str(eachvalue[0]) + ' ' + str(eachvalue[1])
        else:
            total = str(eachvalue[0]) + ' ' + str(eachvalue[1]) + ','
        buf.append(total)
        count = count + 1

    buf.append(' )')
    buf.append(' STORED as parquet ')
    buf.append("LOCATION")
    buf.append(" 's3://my-location/data/' ")
    ##partition by pt
    tabledef = ''.join(buf)

    print("---------print definition ---------")
    print(tabledef)
    ## create a table using spark.sql. Assuming you are using spark 2.1+
    spark.sql(tabledef);
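
Once the definition has been created, a quick sanity check from the same session could be:

    spark.sql("select * from test123 limit 10").show()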

Answered by Shadowtrooper

For Hive external tables I use this function in PySpark:


import re

def save_table(sparkSession, dataframe, database, table_name, save_format="PARQUET"):
    print("Saving result in {}.{}".format(database, table_name))
    # Map Spark types to Hive-compatible column types
    output_schema = "," \
        .join(["{} {}".format(x.name.lower(), x.dataType) for x in list(dataframe.schema)]) \
        .replace("StringType", "STRING") \
        .replace("IntegerType", "INT") \
        .replace("DateType", "DATE") \
        .replace("LongType", "INT") \
        .replace("TimestampType", "INT") \
        .replace("BooleanType", "BOOLEAN") \
        .replace("FloatType", "FLOAT") \
        .replace("DoubleType", "FLOAT")
    output_schema = re.sub(r'DecimalType[(][0-9]+,[0-9]+[)]', 'FLOAT', output_schema)

    sparkSession.sql("DROP TABLE IF EXISTS {}.{}".format(database, table_name))

    query = "CREATE EXTERNAL TABLE IF NOT EXISTS {}.{} ({}) STORED AS {} LOCATION '/user/hive/{}/{}'" \
        .format(database, table_name, output_schema, save_format, database, table_name)
    sparkSession.sql(query)
    dataframe.write.insertInto('{}.{}'.format(database, table_name), overwrite=True)

Answered by MD Rijwan

In my case this works fine:


from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("DatabaseName")
df = spark.read.format("csv").option("Header",True).load("/user/csvlocation.csv")
df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table",<tablename>).save()

Done!!


You can then read the data back; suppose you named the table "Employee":

hive.executeQuery("select * from Employee").show()

For more details use this URL: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive-read-write-operations.html


Answered by Harshv

Sorry for writing late to the post, but I see no accepted answer.

df.write().saveAsTable will throw an AnalysisException and is not HIVE table compatible.


Storing the DF with df.write().format("hive") should do the trick!

However, if that doesn't work, then going by the previous comments and answers, this is the best solution in my opinion (open to suggestions, though).

The best approach is to explicitly create the HIVE table (including a PARTITIONED table):

def createHiveTable: Unit = {
  spark.sql("CREATE TABLE $hive_table_name($fields) " +
    "PARTITIONED BY ($partition_column String) STORED AS $StorageType")
}
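
The $-prefixed names above (and in the snippets below) are placeholders to fill in with your own values; for illustration only, they might look like:

val hive_table_name  = "sales_data"                          // hypothetical table name
val fields           = "id INT, name STRING, amount DOUBLE"  // hypothetical columns
val partition_column = "load_date"
val StorageType      = "PARQUET"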

Save the DF as a temp table:

df.createOrReplaceTempView("$tempTableName")


and insert it into the PARTITIONED HIVE table:

spark.sql("insert into table default.$hive_table_name PARTITION($partition_column) select * from $tempTableName")
spark.sql("select * from default.$hive_table_name").show(1000,false)
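
If the dynamic-partition insert complains about the partition mode, the usual Hive session settings (this depends on your cluster defaults, so treat it as an assumption) are:

spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")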

Of course, the LAST COLUMN in the DF will be the PARTITION COLUMN, so create the HIVE table accordingly!

Please comment whether it works or not!



--UPDATE--


df.write()
  .partitionBy("$partition_column")
  .format("hive")
  .mode(SaveMode.Append)
  .saveAsTable($new_table_name_to_be_created_in_hive)  // table should not exist, OR should be a PARTITIONED table in HIVE

Answered by mrsrinivas

If you want to create a Hive table (which does not exist) from a dataframe (sometimes creation fails with DataFrameWriter.saveAsTable), StructType.toDDL will help in listing the columns as a string.

val df = ...

val schemaStr = df.schema.toDDL  // This gives the columns
spark.sql(s"""create table hive_table ( ${schemaStr})""")

// Now write the dataframe to the table
df.write.saveAsTable("hive_table")

hive_table will be created in the default database since we did not provide any database in spark.sql(). stg.hive_table can be used to create hive_table in the stg database.

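For example, assuming a database named stg already exists:

spark.sql(s"""create table stg.hive_table ( ${schemaStr})""")
df.write.saveAsTable("stg.hive_table")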