scala - Insert Spark dataframe into hbase
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, note the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/44111988/
Insert Spark dataframe into hbase
Asked by Zied Hermi
I have a dataframe and I want to insert it into hbase. I am following this documentation.
This is what my dataframe looks like:
| id | name  | address |
|----|-------|---------|
| 23 | marry | france  |
| 87 | zied  | italie  |
I create an hbase table using this code:
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin

val tableName = "two"
val conf = HBaseConfiguration.create()
// admin was not defined in the original snippet; one way to obtain it is the (older) HBase admin client built from the same conf
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(tableName)) {
  print("-----------------------------------------------------------")
  // create the table with a single column family "z1"
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("z1".getBytes()))
  admin.createTable(tableDesc)
} else {
  print("Table already exists!!")
}
And now, how can I insert this dataframe into hbase?
In another example I succeeded in inserting into hbase using this code:
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

val myTable = new HTable(conf, tableName)
for (i <- 0 to 1000) {
  // one Put per row, keyed by the loop index; values go to families "z1" and "z2"
  val p = new Put(Bytes.toBytes("" + i))
  p.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes("" + (i * 5)))
  p.add("z1".getBytes(), "age".getBytes(), Bytes.toBytes("2017-04-20"))
  p.add("z2".getBytes(), "job".getBytes(), Bytes.toBytes("" + i))
  p.add("z2".getBytes(), "salary".getBytes(), Bytes.toBytes("" + i))
  myTable.put(p)
}
myTable.flushCommits()
But now I am stuck on how to insert each record of my dataframe into my hbase table.
Thank you for your time and attention
Answered by Capacytron
(Using an answer for code formatting purposes.) The doc says:
sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
where sc.parallelize(data).toDF is your DataFrame. The doc example turns a Scala collection into a dataframe using sc.parallelize(data).toDF.
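For reference, a minimal sketch of that step with data shaped like the asker's dataframe (column names and values are taken from the question; the SparkSession variable spark is an assumption):

// Sketch only: build a DataFrame with the asker's schema from a Scala collection.
import spark.implicits._  // assumes an existing SparkSession named "spark"

val data = Seq((23, "marry", "france"), (87, "zied", "italie"))
val df = spark.sparkContext.parallelize(data).toDF("id", "name", "address")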
You already have your DataFrame; just try to call
yourDataFrame.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()
And it should work. The doc is pretty clear...
UPD
Given a DataFrame with specified schema, above will create an HBase table with 5 regions and save the DataFrame inside. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
It's about data partitioning. Each HBase table can have 1...X regions. You should pick the number of regions carefully: too few regions is bad, and too many regions is also bad.
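Note that the snippets above reference a catalog value that is never shown. As a rough sketch only: HBaseTableCatalog expects a JSON catalog mapping DataFrame columns to the row key and column families; the table name, column family, and types below are assumptions based on the asker's dataframe, not part of the original answer.

// Hypothetical catalog for a dataframe with columns id, name, address:
// "id" becomes the row key, the other columns go to column family "z1".
val catalog =
  """{
    |  "table": {"namespace": "default", "name": "two"},
    |  "rowkey": "key",
    |  "columns": {
    |    "id": {"cf": "rowkey", "col": "key", "type": "int"},
    |    "name": {"cf": "z1", "col": "name", "type": "string"},
    |    "address": {"cf": "z1", "col": "address", "type": "string"}
    |  }
    |}""".stripMargin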
Answered by varun r
An alternative is to look at rdd.saveAsNewAPIHadoopDataset to insert the data into the hbase table.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
  import spark.implicits._

  // HBase / ZooKeeper connection settings
  val config = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", "ip's")
  config.set("hbase.zookeeper.property.clientPort", "2181")
  config.set(TableInputFormat.INPUT_TABLE, "tableName")

  // configure the output table for the new Hadoop API
  val newAPIJobConfiguration1 = Job.getInstance(config)
  newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
  newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  // example dataframe: "key" becomes the row key, the other columns go to families "cf1" and "cf2"
  val df: DataFrame = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")
  // convert every Row into an HBase Put keyed by the first column
  val hbasePuts = df.rdd.map((row: Row) => {
    val put = new Put(Bytes.toBytes(row.getString(0)))
    put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
    put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
    (new ImmutableBytesWritable(), put)
  })

  hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
}
Reference: https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/

