Hadoop - Writing to HBase directly from the Mapper

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must likewise follow the CC BY-SA license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/11061854/

Tags: java, hadoop, mapreduce, hbase

Asked by yosi

I have a Hadoop job whose output should be written to HBase. I do not really need a reducer; the kind of row I would like to insert is determined in the Mapper.

How can I use TableOutputFormat to achieve this? In all the examples I have seen, the assumption is that the reducer is the one creating the Put, and that TableMapper is just for reading from an HBase table.

In my case the input is HDFS and the output is a Put to a specific table. I cannot find anything in TableMapReduceUtil that can help me with that either.

Is there any example out there that can help me with that?

BTW, I am using the new Hadoop API

Answered by wonsky

This is an example of reading from a file and putting every line into HBase. It comes from "HBase: The Definitive Guide", and you can find it in the book's repository. To get it, just clone the repo onto your computer:

git clone git://github.com/larsgeorge/hbase-book.git

The book also explains all of the code. If anything is unclear to you, feel free to ask.

import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ImportFromFile {
  public static final String NAME = "ImportFromFile";
  public enum Counters { LINES }

  // Map-only job: reads text lines from HDFS and emits one Put per line.
  static class ImportMapper
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
    private byte[] family = null;
    private byte[] qualifier = null;

    @Override
    protected void setup(Context context)
      throws IOException, InterruptedException {
      // Parse the target "family:qualifier" passed in via the configuration.
      String column = context.getConfiguration().get("conf.column");
      byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
      family = colkey[0];
      if (colkey.length > 1) {
        qualifier = colkey[1];
      }
    }

    @Override
    public void map(LongWritable offset, Text line, Context context)
    throws IOException {
      try {
        String lineString = line.toString();
        // Use the MD5 hash of the line as the row key.
        byte[] rowkey = DigestUtils.md5(lineString);
        Put put = new Put(rowkey);
        put.add(family, qualifier, Bytes.toBytes(lineString));
        context.write(new ImmutableBytesWritable(rowkey), put);
        context.getCounter(Counters.LINES).increment(1);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private static CommandLine parseArgs(String[] args) throws ParseException {
    Options options = new Options();
    Option o = new Option("t", "table", true,
      "table to import into (must exist)");
    o.setArgName("table-name");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("c", "column", true,
      "column to store row data into (must exist)");
    o.setArgName("family:qualifier");
    o.setRequired(true);
    options.addOption(o);
    o = new Option("i", "input", true,
      "the directory or file to read from");
    o.setArgName("path-in-HDFS");
    o.setRequired(true);
    options.addOption(o);
    options.addOption("d", "debug", false, "switch on DEBUG log level");
    CommandLineParser parser = new PosixParser();
    CommandLine cmd = null;
    try {
      cmd = parser.parse(options, args);
    } catch (Exception e) {
      System.err.println("ERROR: " + e.getMessage() + "\n");
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp(NAME + " ", options, true);
      System.exit(-1);
    }
    return cmd;
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs =
      new GenericOptionsParser(conf, args).getRemainingArgs();
    CommandLine cmd = parseArgs(otherArgs);
    String table = cmd.getOptionValue("t");
    String input = cmd.getOptionValue("i");
    String column = cmd.getOptionValue("c");
    conf.set("conf.column", column);
    Job job = new Job(conf, "Import from file " + input + " into table " + table);
    job.setJarByClass(ImportFromFile.class);
    job.setMapperClass(ImportMapper.class);
    // TableOutputFormat persists the emitted Puts straight into HBase.
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    // No reducers: the mapper writes directly to the table.
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(input));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
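
To run the job, package it into a jar and launch it with the table, column, and input path arguments. The jar name and the table/column values below are placeholders for illustration, not taken from the original answer:

hadoop jar hbase-book-examples.jar ImportFromFile -t testtable -c data:json -i /user/me/input.txt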

Answered by Hari Menon

You just need to make the mapper output the (key, value) pair. The OutputFormat only specifies how the output key-values are persisted; it does not necessarily mean that they come from the reducer. You would need to do something like this in the mapper:

... extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    ...
    ...
    context.write(<some key>, <some Put or Delete object>);
}
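
For completeness, since the question mentions TableMapReduceUtil: its initTableReducerJob method accepts null as the reducer class, which configures TableOutputFormat for the given table without adding a reduce phase. Below is a minimal driver sketch of that approach; DirectWriteDriver and "mytable" are illustrative names, and the mapper (here reusing ImportFromFile.ImportMapper from the first answer) just has to emit (ImmutableBytesWritable, Put) pairs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DirectWriteDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "Write to HBase directly from the mapper");
    job.setJarByClass(DirectWriteDriver.class);
    // Any mapper that emits (ImmutableBytesWritable, Put) works here;
    // ImportFromFile.ImportMapper from the first answer is one example.
    job.setMapperClass(ImportFromFile.ImportMapper.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Passing null as the reducer class configures TableOutputFormat
    // for "mytable" (placeholder name) without adding a reduce phase.
    TableMapReduceUtil.initTableReducerJob("mytable", null, job);
    job.setNumReduceTasks(0);  // map-only: Puts go straight to the table
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}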