The reduce fails due to Task attempt failed to report status for 600 seconds. Killing! Solution?

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/15281307/

Tags: java, eclipse, hadoop, mapreduce, elastic-map-reduce

Asked by Mahalakshmi Lakshminarayanan

The reduce phase of the job fails with:

# of failed Reduce Tasks exceeded allowed limit.

The reason why each task fails is:

Task attempt_201301251556_1637_r_000005_0 failed to report status for 600 seconds. Killing!

Problem in detail:

The Map phase takes in each record which is of the format: time, rid, data.

The data is a sequence of (data element, count) pairs.

e.g. a,1 b,4 c,7 corresponds to the data of one record.

For each data element, the mapper outputs the whole record's data, e.g.:

key:(time, a), val: (rid, data)
key:(time, b), val: (rid, data)
key:(time, c), val: (rid, data)

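To make the job's shape concrete, here is a minimal sketch of a mapper that would produce this output. It is hypothetical: the question does not show the map code, and the input delimiters are assumptions (the data field is taken to be pipe-delimited, matching what the reducer parses):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class VCLMapSketch extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable offset, Text line, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // assumed input line: "time rid data", with data like "a|1|b|4|c|7|"
        StringTokenizer st = new StringTokenizer(line.toString());
        String time = st.nextToken();
        String rid = st.nextToken();
        String data = st.nextToken();

        // tokens alternate: element, count, element, count, ...
        String[] parts = data.split("\\|");
        for (int i = 0; i + 1 < parts.length; i += 2) {
            // key: (time, element); value: (rid, full data string)
            output.collect(new Text(time + " " + parts[i]), new Text(rid + " " + data));
        }
    }
}
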
Every reduce receives all the data corresponding to the same key from all the records, e.g. key:(time, a), val:(rid1, data) and key:(time, a), val:(rid2, data) reach the same reduce instance.

It does some processing there and outputs pairs of similar rids along with their similarity.

My program runs without trouble on a small dataset such as 10 MB, but fails with the above error when the data grows to about 1 GB. I don't know why this happens. Please help!

Reduce code:

There are two classes below:

  • VCLReduce0Split
  • CoreSplit

a. VCLReduce0Split

package com.a; // assumed: same package as CoreSplit, which the reducer instantiates unqualified

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
    //  @SuppressWarnings("unchecked")
        public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

            String key_str = key.toString();
            StringTokenizer stk = new StringTokenizer(key_str);
            String t = stk.nextToken();

            HashMap<String, String> hmap = new HashMap<String, String>();

            while(values.hasNext())
            {
                StringBuffer sbuf1 = new StringBuffer(); 
                String val = values.next().toString();
                StringTokenizer st = new StringTokenizer(val);

                String uid = st.nextToken();

                String data = st.nextToken();

                     int total_size = 0; // note: never updated, because the accumulation loop below is commented out

                     StringTokenizer stx = new StringTokenizer(data,"|");

                     StringBuffer sbuf = new StringBuffer();

                     while(stx.hasMoreTokens())
                     {
                         String data_part = stx.nextToken();
                         String data_freq = stx.nextToken();

                    //   System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq);
                         sbuf.append(data_part);
                         sbuf.append("|");
                         sbuf.append(data_freq);
                         sbuf.append("|");
                     }
                /*     
                     for(int i = 0; i<parts.length-1; i++)
                     {
                         System.out.println("data:--------------->"+data);
                         int part_size = Integer.parseInt(parts[i+1]);
                         sbuf.append(parts[i]);
                         sbuf.append("|");
                         sbuf.append(part_size);
                         sbuf.append("|");
                         total_size = part_size+total_size;
                         i++;
                     }*/

                sbuf1.append(String.valueOf(total_size));
                sbuf1.append(",");
                sbuf1.append(sbuf);
                if(uid.equals("203664471")){
                //  System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf);
                }
                hmap.put(uid, sbuf1.toString());

            }

            float threshold = (float)0.8;

            CoreSplit obj = new CoreSplit();


            ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);

            for(int i = 0; i<al.size(); i++)
            {
                CustomMapSimilarity cmaps = al.get(i);
                String xy_pair = cmaps.getRIDPair();
                String similarity = cmaps.getSimilarity();
                output.collect(new Text(xy_pair), new Text(similarity));
            }


         }
    }

b. CoreSplit

package com.a;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.commons.collections.map.MultiValueMap;

public class CoreSplit{ // renamed from PPJoinPlusCoreOptNewSplit: the reducer instantiates it as CoreSplit


     public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t)
     {

         ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>();
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap.keySet().iterator();

        MultiValueMap index = new MultiValueMap();

        String RID;
        TreeMap<String, Integer> hmap2;
        Iterator<String> iter1;

        int size;
        float prefix_size;
        HashMap<String, Float> alpha;
        HashMap<String, CustomMapOverlap> hmap_overlap;

        String data;

        while(iter.hasNext())
            {
                RID = (String)iter.next();

                String data_val = hmap.get(RID);

                StringTokenizer st = new StringTokenizer(data_val,",");
            //    System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time);
                String RIDsize = st.nextToken();
                size = Integer.parseInt(RIDsize);
                data = st.nextToken();


                StringTokenizer st1 = new StringTokenizer(data, "|"); // StringTokenizer takes literal delimiter characters, not a regex

                String[] parts = data.split("\\|"); // split() takes a regex, so the pipe must be escaped

            //  hmap2 = (TreeMap<String, Integer>)hmap.get(RID);
        //      iter1 = hmap2.keySet().iterator();

            //  size = hmap_size.get(RID);

                prefix_size = (float)(size-(0.8*size)+1); 

                if(size==1)
                {
                    prefix_size = 1;
                }

                alpha = new HashMap<String, Float>();

                hmap_overlap = new HashMap<String, CustomMapOverlap>();

        //      Iterator<String> iter2 = hmap2.keySet().iterator();

                int prefix_index = 0;

                int pi=0;

                for(float j = 0; j<=prefix_size; j++)
                {

                    boolean prefix_chk = false;
                    prefix_index++;
                    String ptoken = parts[pi];
            //      System.out.println("data:---->"+data+" ptoken:---->"+ptoken);
                    float val = Float.parseFloat(parts[pi+1]);
                    float temp_j = j;
                     j = j+val;
                     boolean j_l = false ;
                     float prefix_contri = 0;
                     pi= pi+2;

                     if(j>prefix_size)
                        {

                            // prefix_contri = j-temp_j;
                             prefix_contri = prefix_size-temp_j;

                            if(prefix_contri>0)
                            {
                                 j_l = true;
                                 prefix_chk = false;

                            }
                            else
                            {
                                prefix_chk = true;                              
                            }
                        }                   


                    if(prefix_chk == false){


                        filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);


                    CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j);
                    index.put(ptoken, cmapt);

                }

            }


                als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);

                for(int i = 0; i<als.size(); i++)
                {
                    if(als.get(i).getRIDPair()!=null)
                    {
                        alsim.add(als.get(i));

                    }
                }

            }

         return alsim;

     }


     public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri)
     {
            @SuppressWarnings("unchecked")

            ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);

            if((positions_list!=null) &&(positions_list.size()!=0))
            {

                CustomMapPrefixTokens cmapt ;
                String y;
                Iterator<String> iter3;
                int y_size = 0;
                float check_size = 0;
            //  TreeMap<String, Integer> hmapy;
                float RID_val=0;
                float y_overlap = 0;
                float ubound = 0;
                ArrayList<Float> fl = new ArrayList<Float>();

              StringTokenizer st;

            for(int k = 0; k<positions_list.size(); k++)
            {
                cmapt = positions_list.get(k);

                if(!cmapt.getRID().equals(RID))
                {

                 y = hmap.get(cmapt.getRID());

                // iter3 = y.keySet().iterator();

                 String yRID = cmapt.getRID();

                 st = new StringTokenizer(y,",");

                 y_size = Integer.parseInt(st.nextToken());

                 check_size = (float)0.8*(size);

                if(y_size>=check_size)
                {

                    //hmapy = hmap.get(yRID);

                    String y_data = st.nextToken();

                    StringTokenizer st1 = new StringTokenizer(y_data, "|"); // literal delimiters; "\|" is not a valid Java escape


                    while(st1.hasMoreTokens())
                    {
                        String token = st1.nextToken();
                        if(token.equals(ptoken))
                        {

                            String nxt_token = st1.nextToken();
                    //      System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token);
                            RID_val = (float)Integer.parseInt(nxt_token);
                            break;
                        }
                    }

                 //    RID_val = (float) hmapy.get(ptoken); 
                     float alpha1 = (float)(0.8/1.8)*(size+y_size);

                     fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri);

                     ubound = fl.get(0);
                     y_overlap = fl.get(1);


                    positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);

                  }

                }   
            }
        }



     }


   public void positionFilter( float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap)
   {

     float y_overlap_total = 0;

            if(null!=hmap_overlap.get(cmapt.getRID()))
            {

            y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();

            if((y_overlap_total+ubound)>=alpha1)
            {

                CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());

                float y_o_t = y_overlap+y_overlap_total;

                cmap_tmp.setOverlap(y_o_t);
                hmap_overlap.put(cmapt.getRID(),cmap_tmp);

            }
            else
            {
                float n = 0;
                hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n));
            }

            }
            else
            {
                CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap);
                hmap_overlap.put(cmapt.getRID(), cmap_tmp);

            }

   }

   public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri )
   {

            alpha.put(cmapt.getRID(), alpha1);
            float min1 = y_size-cmapt.getPosition();
            float min2 = size-j;
            float min = 0;

            float y_overlap = 0;

            if(min1<min2)
            {
                min = min1;
            }
            else
            {
                min = min2;
            }
            if(j_l==true)
            {
                val = prefix_contri;    
            }                                       
            if(RID_val<val)
            {
                y_overlap = RID_val;
            }
            else
            {
                y_overlap = val;
            }

            float ubound = y_overlap+min;

            ArrayList<Float> fl = new ArrayList<Float>();
            fl.add(ubound);
            fl.add(y_overlap);

            return fl;

   }


     public ArrayList<CustomMapSimilarity> calcSimilarity( String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap)
     {

         float jaccard = 0;

         CustomMapSimilarity cms = new CustomMapSimilarity(null, null);   
         ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();

        Iterator<String> iter = hmap_overlap.keySet().iterator();

        while(iter.hasNext())
        {
            String key = (String)iter.next();

            CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key);

            float overlap = (float)val.getOverlap();

            if(overlap>0)
            {

               String yRID = val.getRID();

              String RIDpair = RID+" "+yRID;

             jaccard = unionIntersection(hmap, RIDpair);

             if(jaccard>0.8)
                {
                    cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard));
                    alsim.add(cms);
                }

            }

        }

         return alsim;

     }


     public float unionIntersection( HashMap<String,String> hmap, String RIDpair)
     {


            StringTokenizer st = new StringTokenizer(RIDpair);

            String xRID = st.nextToken();

            String yRID = st.nextToken();

            String xdata = hmap.get(xRID);

            String ydata = hmap.get(yRID);


            int total_union = 0;

            int xval = 0;
            int yval = 0;
            int part_union = 0;

            int total_intersect = 0;

        //  System.out.println("xdata:------*************>"+xdata);

            StringTokenizer xtokenizer = new StringTokenizer(xdata,",");
            StringTokenizer ytokenizer = new StringTokenizer(ydata,",");
        //  String[] xpart = xdata.split(",");
        //  String[] ypart = ydata.split(",");

            xtokenizer.nextToken();
            ytokenizer.nextToken();

            String datax = xtokenizer.nextToken();
            String datay = ytokenizer.nextToken();


            HashMap<String,Integer> x = new HashMap<String, Integer>();
            HashMap<String,Integer> y = new HashMap<String, Integer>();


            String [] xparts;

                 xparts = datax.split("\\|"); // split() takes a regex; the redundant toString() is dropped


              String [] yparts;

                 yparts = datay.split("\\|");


                 for(int i = 0; i<xparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(xparts[i+1]);
                     x.put(xparts[i], part_size);

                     i++;
                 }

                 for(int i = 0; i<yparts.length-1; i++)
                 {
                     int part_size = Integer.parseInt(yparts[i+1]);
                     y.put(yparts[i], part_size); // bug fix: the original inserted xparts[i] into the y map

                     i++;
                 }


             Set<String> xset = x.keySet();
             Set<String> yset = y.keySet();

            for(String elm:xset )
            {

                yval = 0;

                xval = (Integer)x.get(elm);

                part_union = 0;
                int part_intersect = 0;
                if(yset.contains(elm)){

                    yval = (Integer) y.get(elm);

                if(xval>yval)
                {
                    part_union = xval;
                    part_intersect = yval;
                }
                else
                {
                    part_union = yval;
                    part_intersect = xval;
                }
                total_intersect = total_intersect+part_intersect;
                }
                else
                {
                    part_union = xval;
                }

                total_union = total_union+part_union;


            }


            for(String elm: yset)
            {
                part_union = 0;

                if(!xset.contains(elm))
                {
                    part_union = (Integer) y.get(elm);
                    total_union = total_union+part_union;
                }

            }

            float jaccard = (float)total_intersect/total_union;

         return jaccard;

     }

}
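
Since CoreSplit is plain Java, it can be exercised outside Hadoop. A minimal test sketch (an assumption, not from the question: it relies on the question's unshown helper classes such as CustomMapSimilarity being on the classpath, and uses the "size,elem|count|..." value format that the reducer builds):

HashMap<String, String> hmap = new HashMap<String, String>();
hmap.put("rid1", "3,a|1|b|2|"); // size 3 = total count; elements a (count 1) and b (count 2)
hmap.put("rid2", "3,a|1|b|2|"); // identical record, so the Jaccard similarity is 1.0

ArrayList<CustomMapSimilarity> sims =
        new CoreSplit().similarityCalculation("t0", hmap, 0.8f);

for (int i = 0; i < sims.size(); i++) {
    CustomMapSimilarity s = sims.get(i);
    System.out.println(s.getRIDPair() + " -> " + s.getSimilarity());
}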

Answered by harpun

The reason for the timeouts might be a long-running computation in your reducer without reporting the progress back to the Hadoop framework. This can be resolved using different approaches:

I. Increasing the timeout in mapred-site.xml:

<property>
  <name>mapred.task.timeout</name>
  <value>1200000</value>
</property>

The default is 600000 ms = 600 seconds.

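The same timeout can also be set per job rather than cluster-wide. A minimal sketch using the old-API JobConf (MyJob is a placeholder for your job class):

JobConf conf = new JobConf(MyJob.class);
conf.set("mapred.task.timeout", "1200000"); // 20 minutes, in milliseconds
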
II. Reporting progress every x records, as in the Reducer example in the javadoc:

public void reduce(K key, Iterator<V> values,
                   OutputCollector<K, V> output,
                   Reporter reporter) throws IOException {
    int noValues = 0; // number of values processed so far

    while (values.hasNext()) {
        V value = values.next();
        noValues++;

        // report progress every 10 records so the framework does not kill the task
        if ((noValues % 10) == 0) {
            reporter.progress();
        }

        // ... actual processing of value ...
    }
}

Optionally, you can increment a custom counter, as in the example:

reporter.incrCounter(NUM_RECORDS, 1);
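
Here NUM_RECORDS must be an enum constant. A minimal sketch (the enum name is an assumption; counter updates also count as progress toward the framework):

enum MyCounters { NUM_RECORDS }

// inside reduce():
reporter.incrCounter(MyCounters.NUM_RECORDS, 1);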

Answered by Amar

It's possible that you have consumed all of Java's heap space, or that GC is happening so frequently that the reducer gets no chance to report status to the master, and is hence killed.

Another possibility is that one of the reducers is receiving heavily skewed data, i.e. a very large number of records for a particular rid.

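One way to confirm such skew (a diagnostic sketch, not part of the original answer) is to count how many values each key receives while still reporting progress:

public void reduce(Text key, Iterator<Text> values,
                   OutputCollector<Text, Text> output,
                   Reporter reporter) throws IOException {
    long n = 0;
    while (values.hasNext()) {
        values.next(); // the iterator is single-pass, so this replaces normal processing
        n++;
        if ((n % 100000) == 0) {
            reporter.progress(); // keep the task alive while counting
        }
    }
    // String-based counter; the group and counter names are arbitrary labels
    if (n > 1000000) {
        reporter.incrCounter("diagnostics", "keys-over-1M-values", 1);
    }
}
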
Try to increase your Java heap by setting the following config: mapred.child.java.opts

to

-Xmx2048m

Also, try to reduce the number of parallel reducers by setting the following config to a lower value than it currently has (the default is 2):

mapred.tasktracker.reduce.tasks.maximum

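Both settings go in mapred-site.xml in the same form as the timeout property above (the values here are only examples; mapred.tasktracker.reduce.tasks.maximum is read by the TaskTracker, so changing it requires a tasktracker restart):

<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx2048m</value>
</property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
</property>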