Java: How to process a range of HBase rows using Spark?

Disclaimer: this page is a Chinese/English translation of a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse this content you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/25189527/


How to process a range of hbase rows using spark?

java hadoop bigdata apache-spark

Asked by amitkarmakar

I am trying to use HBase as a data source for Spark. The first step is to create an RDD from an HBase table. Since Spark works with Hadoop input formats, I could find a way to use all rows of a table by creating an RDD, as described at http://www.vidyasource.com/blog/Programming/Scala/Java/Data/Hadoop/Analytics/2014/01/25/lighting-a-spark-with-hbase. But how do we create an RDD for a range scan?


All suggestions are welcome.


Accepted answer by zsxwing

Here is an example of using Scan in Spark:


import java.io.{DataOutputStream, ByteArrayOutputStream}
import java.lang.String
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64

// Serialize the Scan so it can be passed to TableInputFormat through the Configuration.
// Note: Scan.write relies on the old Writable interface (HBase 0.94.x); newer HBase
// versions provide TableMapReduceUtil.convertScanToString for this instead.
def convertScanToString(scan: Scan): String = {
  val out: ByteArrayOutputStream = new ByteArrayOutputStream
  val dos: DataOutputStream = new DataOutputStream(out)
  scan.write(dos)
  Base64.encodeBytes(out.toByteArray)
}

val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)       // number of rows fetched per RPC
scan.setCacheBlocks(false) // recommended for full/MapReduce-style scans
// For a range scan, also set the start/stop rows on the Scan
// (the Java answer below does this with setStartRow/setStopRow).
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
rdd.count
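
The convertScanToString helper above serializes the Scan through the old Writable interface, which only exists in older HBase releases (0.94.x); on HBase 0.96 and later the same conversion is provided by TableMapReduceUtil.convertScanToString, which the Java answer below also uses. A minimal Scala sketch of the newer variant, assuming one of those HBase versions and the same hypothetical table name, would be:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil}

// Same setup as above, but the Scan is serialized by the HBase MapReduce utilities
// instead of Scan.write, which was removed together with the Writable interface.
val conf = HBaseConfiguration.create()
val scan = new Scan()
scan.setCaching(500)
scan.setCacheBlocks(false)
conf.set(TableInputFormat.INPUT_TABLE, "table_name")
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))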

You need to add the related HBase libraries to the Spark classpath and make sure they are compatible with your Spark version. Tip: you can use hbase classpath to find them.


Answer by Narendra Parmar

You can set the following configuration:


val conf = HBaseConfiguration.create() // also set the table name and any other HBase params here
conf.set(TableInputFormat.SCAN_ROW_START, "row2")
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey")

This will load the RDD only for the records in that row-key range.

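For completeness, here is a minimal sketch of how these two properties plug into the same newAPIHadoopRDD call used in the accepted answer (the table name and row keys are hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "table_name")   // hypothetical table name
conf.set(TableInputFormat.SCAN_ROW_START, "row2")      // inclusive start row key
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey") // exclusive stop row key

// TableInputFormat builds the Scan with this start/stop range itself.
val rdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
rdd.count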

Answer by Roman Kondakov

Here is a Java example with TableMapReduceUtil.convertScanToString(Scan scan):


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;

public class HbaseScan {

    public static void main(String ... args) throws IOException, InterruptedException {

        // Spark conf
        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("My App");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        // Hbase conf
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "big_table_name");

        // Create scan
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.setStartRow(Bytes.toBytes("a"));
        scan.setStopRow(Bytes.toBytes("d"));


        // Submit scan into hbase conf
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));

        // Get RDD
        JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
                .newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);

        // Process RDD
        System.out.println(source.count());
    }
}
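
Whichever variant you use, the resulting RDD contains (ImmutableBytesWritable, Result) pairs. As a closing note, here is a minimal Scala sketch (using the rdd from the accepted answer; the column family cf and qualifier q are hypothetical) of pulling the row key and one cell value out of each pair:

import org.apache.hadoop.hbase.util.Bytes

// Convert each (row key, Result) pair into plain strings on the executors,
// so that only small tuples are shipped back to the driver.
val rows = rdd.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("q")))
  (rowKey, value)
}
rows.take(10).foreach(println)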