COLLECT_SET() in Hive, keep duplicates?
Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/6445339/
Asked by batman
Is there a way to keep the duplicates in a collected set in Hive, or simulate the sort of aggregate collection that Hive provides using some other method? I want to aggregate all of the items in a column that have the same key into an array, with duplicates.
For example:
hash_id | num_of_cats
=====================
ad3jkfk 4
ad3jkfk 4
ad3jkfk 2
fkjh43f 1
fkjh43f 8
fkjh43f 8
rjkhd93 7
rjkhd93 4
rjkhd93 7
should return:
hash_agg | cats_aggregate
===========================
ad3jkfk Array<int>(4,4,2)
fkjh43f Array<int>(1,8,8)
rjkhd93 Array<int>(7,4,7)
Accepted answer by Marvin W
Use COLLECT_LIST(col), available since Hive 0.13.0:
SELECT
hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
tablename
WHERE
blablabla
GROUP BY
hash_id
;
Answered by Jeff Mc
There is nothing built in, but creating user-defined functions, including aggregates, isn't that bad. The only rough part is trying to make them type-generic, but here is a collect example.
package com.example;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
public class CollectAll extends AbstractGenericUDAFResolver
{
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
throws SemanticException
{
if (tis.length != 1)
{
throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
}
if (tis[0].getCategory() != ObjectInspector.Category.PRIMITIVE)
{
throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + tis[0].getTypeName() + " was passed as parameter 1.");
}
return new CollectAllEvaluator();
}
public static class CollectAllEvaluator extends GenericUDAFEvaluator
{
private PrimitiveObjectInspector inputOI;
private StandardListObjectInspector loi;
private StandardListObjectInspector internalMergeOI;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException
{
super.init(m, parameters);
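            // Mode tells us which stage we are in: PARTIAL1 consumes raw rows and
            // emits a partial list; COMPLETE consumes raw rows end-to-end;
            // PARTIAL2/FINAL merge partial lists produced by other tasks.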
if (m == Mode.PARTIAL1)
{
inputOI = (PrimitiveObjectInspector) parameters[0];
return ObjectInspectorFactory
.getStandardListObjectInspector((PrimitiveObjectInspector) ObjectInspectorUtils
.getStandardObjectInspector(inputOI));
}
else
{
if (!(parameters[0] instanceof StandardListObjectInspector))
{
inputOI = (PrimitiveObjectInspector) ObjectInspectorUtils
.getStandardObjectInspector(parameters[0]);
return (StandardListObjectInspector) ObjectInspectorFactory
.getStandardListObjectInspector(inputOI);
}
else
{
internalMergeOI = (StandardListObjectInspector) parameters[0];
inputOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
return loi;
}
}
}
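        // The aggregation buffer simply wraps the list of values collected so far for one group.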
static class ArrayAggregationBuffer implements AggregationBuffer
{
ArrayList<Object> container;
}
@Override
public void reset(AggregationBuffer ab)
throws HiveException
{
((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
}
@Override
public AggregationBuffer getNewAggregationBuffer()
throws HiveException
{
ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
reset(ret);
return ret;
}
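        // Called once per raw input row; nulls are skipped, duplicates are kept.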
@Override
public void iterate(AggregationBuffer ab, Object[] parameters)
throws HiveException
{
assert (parameters.length == 1);
Object p = parameters[0];
if (p != null)
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
}
}
@Override
public Object terminatePartial(AggregationBuffer ab)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
ret.addAll(agg.container);
return ret;
}
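        // Folds a partial list produced by terminatePartial on another task into this buffer.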
@Override
public void merge(AggregationBuffer ab, Object o)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
for(Object i : partial)
{
agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
}
}
@Override
public Object terminate(AggregationBuffer ab)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
ret.addAll(agg.container);
return ret;
}
}
}
Then in Hive, just issue add jar Whatever.jar;
and CREATE TEMPORARY FUNCTION collect_all AS 'com.example.CollectAll';
You should then be able to use it as expected.
hive> SELECT hash_id, collect_all(num_of_cats) FROM test GROUP BY hash_id;
OK
ad3jkfk [4,4,2]
fkjh43f [1,8,8]
rjkhd93 [7,4,7]
It's worth noting that the order of the elements should be considered undefined, so if you intend to use this to feed information into n_grams you might need to expand it a bit to sort the data as needed.
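If a deterministic order is needed, one option (an illustrative sketch, not part of the original answer) is to sort the collected array after the fact with Hive's built-in sort_array(), available since Hive 0.9:
SELECT hash_id, sort_array(collect_all(num_of_cats)) AS sorted_cats
FROM test
GROUP BY hash_id;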
Answered by nephtes
Modified Jeff Mc's code to remove the restriction (presumably inherited from collect_set) that input must be primitive types. This version can collect structs, maps and arrays as well as primitives.
package com.example;
import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
public class CollectAll extends AbstractGenericUDAFResolver
{
@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] tis)
throws SemanticException
{
if (tis.length != 1)
{
throw new UDFArgumentTypeException(tis.length - 1, "Exactly one argument is expected.");
}
return new CollectAllEvaluator();
}
public static class CollectAllEvaluator extends GenericUDAFEvaluator
{
private ObjectInspector inputOI;
private StandardListObjectInspector loi;
private StandardListObjectInspector internalMergeOI;
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
throws HiveException
{
super.init(m, parameters);
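            // Unlike the primitive-only version above, inputOI stays a generic
            // ObjectInspector, so structs, maps, and arrays are accepted as-is.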
if (m == Mode.PARTIAL1)
{
inputOI = parameters[0];
return ObjectInspectorFactory
.getStandardListObjectInspector(ObjectInspectorUtils
.getStandardObjectInspector(inputOI));
}
else
{
if (!(parameters[0] instanceof StandardListObjectInspector))
{
inputOI = ObjectInspectorUtils
.getStandardObjectInspector(parameters[0]);
return (StandardListObjectInspector) ObjectInspectorFactory
.getStandardListObjectInspector(inputOI);
}
else
{
internalMergeOI = (StandardListObjectInspector) parameters[0];
inputOI = internalMergeOI.getListElementObjectInspector();
loi = (StandardListObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
return loi;
}
}
}
static class ArrayAggregationBuffer implements AggregationBuffer
{
ArrayList<Object> container;
}
@Override
public void reset(AggregationBuffer ab)
throws HiveException
{
((ArrayAggregationBuffer) ab).container = new ArrayList<Object>();
}
@Override
public AggregationBuffer getNewAggregationBuffer()
throws HiveException
{
ArrayAggregationBuffer ret = new ArrayAggregationBuffer();
reset(ret);
return ret;
}
@Override
public void iterate(AggregationBuffer ab, Object[] parameters)
throws HiveException
{
assert (parameters.length == 1);
Object p = parameters[0];
if (p != null)
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
agg.container.add(ObjectInspectorUtils.copyToStandardObject(p, this.inputOI));
}
}
@Override
public Object terminatePartial(AggregationBuffer ab)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
ret.addAll(agg.container);
return ret;
}
@Override
public void merge(AggregationBuffer ab, Object o)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> partial = (ArrayList<Object>)internalMergeOI.getList(o);
for(Object i : partial)
{
agg.container.add(ObjectInspectorUtils.copyToStandardObject(i, this.inputOI));
}
}
@Override
public Object terminate(AggregationBuffer ab)
throws HiveException
{
ArrayAggregationBuffer agg = (ArrayAggregationBuffer) ab;
ArrayList<Object> ret = new ArrayList<Object>(agg.container.size());
ret.addAll(agg.container);
return ret;
}
}
}
Answered by Jai Prakash
Here is the exact Hive query that does this job (works only in Hive 0.13 or later):
SELECT hash_id, collect_list(num_of_cats) FROM tablename GROUP BY hash_id;
Answered by mgokayla
For what it's worth (though I know this is an older post), Hive 0.13.0 features a new collect_list() function that does not deduplicate.
Answered by Jerome Banks
Check out the Brickhouse collect UDAF (http://github.com/klout/brickhouse/blob/master/src/main/java/brickhouse/udf/collect/CollectUDAF.java)
It also supports collecting into a map. Brickhouse also contains many useful UDFs not in the standard Hive distribution.
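A minimal usage sketch (the jar path here is hypothetical; the class name is the one linked above):
ADD JAR brickhouse.jar;
CREATE TEMPORARY FUNCTION collect AS 'brickhouse.udf.collect.CollectUDAF';
-- with one argument, collect builds an array, keeping duplicates
SELECT hash_id, collect(num_of_cats) FROM test GROUP BY hash_id;
-- with two arguments (key, value), it collects into a map instead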
Answered by Nikhil
Workaround to collect structs
Suppose you have a table
tableWithStruct(
id string,
obj struct <a:string,b:string>)
Now create another table:
CREATE EXTERNAL TABLE tablename (
id string,
temp array<string>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'
Insert query:
insert into table tablename select id, collect(concat_ws('|', cast(obj.a as string), cast(obj.b as string))) from tableWithStruct group by id;
Now create another table at the same location as tablename:
CREATE EXTERNAL TABLE tablename_final (
id string,
array_list array<struct<a:string,b:string>>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY '|'
When you select from tablename_final you will get the desired output.
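For reference, reading it back is then just (following the hypothetical tables defined above):
SELECT id, array_list FROM tablename_final;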
Answered by Nikhil vyas
Just wondering - in the statement -
SELECT
hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM
tablename
WHERE
blablabla
GROUP BY
hash_id
;
we want to sort and limit the elements collected for num_of_cats - how do we go about it? Because in big data we deal with PBs of data, we often don't need all of it in such cases, just the top 10 or some other limit.
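One way (a sketch, not from the original thread; assumes Hive 0.11+ for windowing) is to rank the rows per key first and only collect the top N:
SELECT hash_id, COLLECT_LIST(num_of_cats) AS aggr_set
FROM (
    SELECT hash_id, num_of_cats,
           row_number() OVER (PARTITION BY hash_id ORDER BY num_of_cats DESC) AS rn
    FROM tablename
) ranked
WHERE rn <= 10
GROUP BY hash_id;
Note that the order inside the collected list is still undefined; the subquery only bounds which elements get collected.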