Solving a Large-Scale Data Filtering Problem with MapReduce

技术文档网 2021-04-29
Problem Background

While working on webpage analysis (anchor-text analysis) I ran into the following problem. Our web library holds nearly 60 billion pages, each page has on average about 100 outlinks, and the outlinks total roughly 600 billion records. My task was to produce, for each page that exists in the web library, its highest-confidence anchor texts. The catch: if the anchor-text analysis runs over all 600 billion link records, many of the computed results point at pages that are not in the web library at all (we have not crawled them yet), which wastes a great deal of compute. Filtering out the links whose targets are not in the library ahead of time would therefore save substantial compute resources.

Problem Scale

600 billion links and 60 billion pages in the library. The links that carry anchor text (and whose targets are in the library) amount to about 12 billion, roughly 20% of the library's page count. The job therefore has to filter out the large majority of the 600 billion links, whose targets fall outside the library.

Solution

Spark Approach

Use Spark's join operator to do the filtering. Given the 600-billion-record RDD rdd1 of (urlid, other) pairs and the 60-billion-page library RDD rdd2 of (urlid, other) pairs, rdd2.join(rdd1) keeps only the records whose urlid appears in both RDDs, which filters out the links whose targets are not in the library.
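The filtering semantics of this join can be shown with plain Java collections (a hedged sketch only, with no Spark dependency; the class name, record shapes, and data below are illustrative, not taken from the actual job):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: what rdd2.join(rdd1) achieves here is "keep only the
// link records whose urlid also appears in the page library".
public class JoinFilterSketch {
    // libraryIds plays the role of rdd2's keys; links plays the role of rdd1.
    static List<long[]> joinFilter(Set<Long> libraryIds, List<long[]> links) {
        List<long[]> kept = new ArrayList<>();
        for (long[] link : links) {            // link[0] is the urlid key
            if (libraryIds.contains(link[0])) {
                kept.add(link);                // target is in the library
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<Long> library = new HashSet<>(Arrays.asList(10L, 20L));
        List<long[]> links = Arrays.asList(
            new long[]{10L, 1L},    // urlid 10 is in the library: kept
            new long[]{30L, 2L});   // urlid 30 is not: dropped
        System.out.println(joinFilter(library, links).size()); // prints 1
    }
}
```

At Spark scale the same semantics come from the join operator; the sketch only shows the filtering logic, not the distribution.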

Problems

The join operator introduces a shuffle. At our data scale the shuffle is so heavy that the job crashes.

MapReduce Approach

To work around the Spark shuffle failure while still performing the filtering, the Spark program was rewritten as a MapReduce program (MapReduce also shuffles, but its disk-backed shuffle proved more robust at this scale). The approach:

  • First: write the 60 billion library pages to HDFS as CSV with fields (urlid, url); call this path file1. Flatten the library pages' outlinks (the 600 billion records) to HDFS as CSV with fields (urlid, anchor, url); call this path file2. In both files, urlid is the result of converting the url to a long.
  • Second: in the map phase, tag every record coming from file1 with "1" and every record coming from file2 with "2".
  • Third: in the reduce phase, process all records sharing the same key: if the group for a key contains both tag "1" and tag "2", keep the results; if it contains only tag "2", discard them.
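The three steps above describe a reduce-side join. Here is a minimal in-memory simulation in plain Java (a hedged sketch: the grouping that MapReduce performs via its shuffle is done with a HashMap here, and the class name and record layout are illustrative assumptions):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the tag-based filter: records tagged "1" come from
// file1 (the page library), records tagged "2" come from file2 (outlinks).
// A key's tag-"2" payloads survive only if tag "1" is also present.
public class TagJoinSketch {
    // Each record is {urlid (Long), tag (String), payload (String)}.
    static Map<Long, List<String>> reduceFilter(List<Object[]> records) {
        // Stand-in for the shuffle: group tagged values by urlid.
        Map<Long, List<String[]>> groups = new HashMap<>();
        for (Object[] rec : records) {
            groups.computeIfAbsent((Long) rec[0], k -> new ArrayList<>())
                  .add(new String[]{(String) rec[1], (String) rec[2]});
        }
        // Stand-in for the reduce phase: apply the keep/discard rule per key.
        Map<Long, List<String>> kept = new HashMap<>();
        for (Map.Entry<Long, List<String[]>> entry : groups.entrySet()) {
            boolean inLibrary = false;
            List<String> anchors = new ArrayList<>();
            for (String[] tagged : entry.getValue()) {
                if ("1".equals(tagged[0])) {
                    inLibrary = true;          // the url exists in the library
                } else if ("2".equals(tagged[0])) {
                    anchors.add(tagged[1]);    // candidate anchor text
                }
            }
            if (inLibrary) {                   // only-tag-"2" keys are dropped
                kept.put(entry.getKey(), anchors);
            }
        }
        return kept;
    }
}
```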

Pseudocode

Only the main skeleton is given; implementation details (work-specific) have been removed.

package xxx;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

/**
 * AnchorTextAnalyzer class for anchor text analysis
 *
 * @author wangyizhong
 * @since 2019-08-21
 */
public class AnchorTextAnalyzer {
    private static final Logger logger = LoggerFactory.getLogger(AnchorTextAnalyzer.class);
    private static final byte OUTTER_SEP = 0x02;
    private static final int TOPN = 5;
    private static final int GINI_IM_NUMBER = 100;
    private static final double GINI_INIT_VALUE = 1.00000000001;
    private static final int ZH_MIN_LENGTH = 6;
    private static final int OTHER_MIN_LENGTH = 3;
    private static final int MIN_ANCHOR = 4;
    private static final String TAG_SEP = "\t";
    private static final int INDEX_SECOND = 2;
    private static final int INDEX_THREE = 3;
    private static final int INDEX_FORTH = 4;
    private static final int INDEX_FIFTH = 5;
    private static final String BLANK = " ";
    private static final int MAP_INIT_SIZE = 100;
    private static String innerSeparator;
    private static String outterSeparator;

    static {
        try {
            innerSeparator = new String(new byte[]{0x01}, "UTF-8");
            outterSeparator = new String(new byte[]{OUTTER_SEP}, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            logger.error("failed to initialize innerSeparator and outterSeparator");
        }
    }

    private static int topN = TOPN;
    private static int giniImNum = GINI_IM_NUMBER;
    private static int minAnchor = MIN_ANCHOR;
    private static String secondFilter = "off";

    private static String baseOutputPath = "";

    /**
     * Mapper
     *
     * @since 2019-08-21
     */
    public static class AnchorTextAnalyzerMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] lists = line.split(TAG_SEP);
            long mapKey = Long.parseLong(lists[0]);
            String mapValue = "";
            if (lists.length == INDEX_FORTH) {
                String anchor = lists[1];
                String sourceUrlId = lists[INDEX_SECOND];
                String domainId = lists[INDEX_THREE];
                mapValue = anchor + innerSeparator + sourceUrlId + innerSeparator + domainId + outterSeparator + "2";
            } else if (lists.length == INDEX_SECOND) {
                mapValue = line + outterSeparator + "1";
            } else {
                mapValue = "";
            }
            context.write(new LongWritable(mapKey), new Text(mapValue));
        }
    }

    /**
     * Reducer
     *
     * @since 2019-08-21
     */
    public static class AnchorTextAnalyzerReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        private MultipleOutputs<LongWritable, Text> mos;
        private String prefix;

        /**
         * self define tuple2
         *
         * @param <TYPEA> type TYPEA
         * @param <TYPEB> type TYPEB
         * @since 2019-08-21
         */
        private static class Tuple2<TYPEA, TYPEB> {
            TYPEA first;
            TYPEB second;

            /**
             * Tuple2
             *
             * @param paramA paramA
             * @param paramB paramB
             */
            Tuple2(TYPEA paramA, TYPEB paramB) {
                first = paramA;
                second = paramB;
            }

            public TYPEA getFirst() {
                return first;
            }

            public TYPEB getSecond() {
                return second;
            }
        }

        /**
         * setup
         *
         * @param context context
         */
        @Override
        public void setup(Context context) {
            mos = new MultipleOutputs<>(context);
            prefix = getTimeStamp();
        }

        /**
         * cleanup
         *
         * @param context context
         * @throws IOException IOException
         * @throws InterruptedException InterruptedException
         */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        /**
         * reduce
         *
         * @param key key
         * @param iterator iterator
         * @param context context
         * @throws IOException io-exception
         * @throws InterruptedException interrupted-exception
         */
        @Override
        public void reduce(LongWritable key, Iterable<Text> iterator, Context context)
            throws IOException, InterruptedException {
            // anchor, <domainId, count>
            minAnchor = Integer.parseInt(context.getConfiguration().get("minAnchor"));
            baseOutputPath = context.getConfiguration().get("baseOutputPath");
            secondFilter = context.getConfiguration().get("secondFilter");
            HashMap<String, HashMap<Long, Long>> maps = new HashMap<>(MAP_INIT_SIZE);
            // anchor, count
            HashMap<String, Long> anchorCounts = new HashMap<>(MAP_INIT_SIZE);
            boolean isBoth = false;
            for (Text line : iterator) {
                String[] tmpLists = line.toString().split(outterSeparator);
                if (tmpLists.length == INDEX_SECOND) {
                    String tag = tmpLists[1];
                    if ("1".equals(tag)) {
                        isBoth = true;
                    } else {
                        try {
                            String[] innerLists = tmpLists[0].split(innerSeparator);
                            String anchor = innerLists[0];
                            long domainId = Long.parseLong(innerLists[INDEX_SECOND]);
                            if (maps.containsKey(anchor)) {
                                HashMap<Long, Long> siteCounts = maps.get(anchor);
                                long sitecnt = siteCounts.containsKey(domainId) ? siteCounts.get(domainId) + 1 : 1L;
                                siteCounts.put(domainId, sitecnt);
                                maps.put(anchor, siteCounts);
                            } else {
                                HashMap<Long, Long> siteCounts = new HashMap<>(MAP_INIT_SIZE);
                                siteCounts.put(domainId, 1L);
                                maps.put(anchor, siteCounts);
                            }
                            long cnt = anchorCounts.containsKey(anchor) ? anchorCounts.get(anchor) + 1L : 1L;
                            anchorCounts.put(anchor, cnt);
                        } catch (NumberFormatException e) {
                            continue;
                        }
                    }
                }
            }
            if (isBoth) {
                if (anchorCounts.size() > 0) {
                    // saveSiteCountAndRatio(key, maps, anchorCounts);
                    // mos.write(key, new Text(sb.toString()), baseOutputPath + "/scoreResult/" + prefix);
                }
                // writeAnchorResult(key, context, maps, anchorCounts);
                // mos.write(key, new Text(text), baseOutputPath + "/siteSave/" + prefix);
            }
        }

    }
}
// Driver (entry-point) class, a separate source file
package xxx;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * AnchorTextAnalyzerMrMain
 *
 * @author wangyizhong1
 * @since 2019-08-23
 */
public class AnchorTextAnalyzerMrMain extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(AnchorTextAnalyzerMrMain.class);
    private static final int ARGS_LENGTH = 4;
    private static final int MIN_ANCHOR_INDEX = 2;
    private static final int SITE_COUNT_INDEX = 3;
    private static final int SECOND_FILTER_INDEX = 4;

    /**
     * main entry
     *
     * @param args args
     * @throws Exception exception
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(null, new AnchorTextAnalyzerMrMain(), args));
    }

    @Override
    public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = new JobConf(getConf());
        conf.setJobName("AnchorTextAnalyzerMrMain");
        if (args.length > ARGS_LENGTH) {
            conf.set("minAnchor", args[MIN_ANCHOR_INDEX]);
            conf.set("baseOutputPath", args[SITE_COUNT_INDEX]);
            conf.set("secondFilter", args[SECOND_FILTER_INDEX]);
        } else {
            conf.set("minAnchor", "4");
            conf.set("baseOutputPath", "/tmp/xxx");
            conf.set("secondFilter", "off");
        }

        Job job = Job.getInstance(conf);
        job.setJarByClass(AnchorTextAnalyzerMrMain.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(AnchorTextAnalyzer.AnchorTextAnalyzerMapper.class);
        job.setReducerClass(AnchorTextAnalyzer.AnchorTextAnalyzerReducer.class);

        job.setInputFormatClass(CombineTextInputFormat.class);
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
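The urlid fields are described above as the url converted to a long, but the conversion itself is not shown in the listing. One common way to obtain a deterministic 64-bit id is a fingerprint hash such as FNV-1a; the sketch below is an assumption for illustration, not necessarily the scheme the real pipeline uses.

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch: derive a deterministic 64-bit urlid from a url string
// with the FNV-1a hash. This stands in for the unspecified url-to-long
// conversion used by the job.
public class UrlIdSketch {
    static long urlId(String url) {
        long hash = 0xcbf29ce484222325L;           // FNV-1a 64-bit offset basis
        for (byte b : url.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xffL);                   // mix in one byte
            hash *= 0x100000001b3L;                // FNV-1a 64-bit prime
        }
        return hash;
    }

    public static void main(String[] args) {
        // The same url always yields the same id, so file1 and file2 agree.
        System.out.println(urlId("http://example.com") == urlId("http://example.com")); // prints true
    }
}
```

Whatever conversion is used, it must be applied identically when producing file1 and file2, since the reduce-side join matches on urlid. Note that at tens of billions of urls a 64-bit fingerprint is expected to see a handful of collisions (birthday bound), which is tolerable only if a few mismatched groups are acceptable.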
