Solving a Large-Scale Data Filtering Problem with MapReduce
While doing web page analysis (anchor-text analysis) I ran into the following problem: the web page library holds nearly 60 billion pages, each page has roughly 100 outlinks on average, and there are about 600 billion outlinks in total. My task was to produce, for the pages that actually exist in the library, the anchor texts ranked by confidence. The catch is that if the anchor-text analysis is run over all 600 billion link relations, many of the pages in the final result are not in the library at all (we have not ingested them yet), which wastes an enormous amount of compute. Filtering out the links whose targets are not in the library ahead of time would therefore save a great deal of compute resources.
Out of the 600 billion links and the 60 billion library pages, the links that carry anchor text and whose target is actually in the library amount to only about 20% (roughly 12 billion), so most of the 600 billion links, those pointing outside the library, need to be filtered out.
Spark approach
The filtering can be done with Spark's join operator: given rdd1(urlid, other) for the 600 billion links and rdd2(urlid, other) for the 60 billion library pages, rdd2.join(rdd1) keeps only the links whose target urlid exists in the library.
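For reference, here is a minimal sketch of that join-based filter written against the Spark Java API. The class name, input paths, and the assumption that both files are tab-separated and keyed by urlid are illustrative only, not the original job:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class JoinFilterSketch {
    // Keeps only link records whose target urlid also appears in the page library.
    public static JavaPairRDD<Long, Tuple2<String, String>> filterLinks(
            JavaSparkContext sc, String linkPath, String pagePath) {
        // rdd1: the ~600 billion link records, keyed by target urlid.
        JavaPairRDD<Long, String> rdd1 = sc.textFile(linkPath)
                .mapToPair(line -> {
                    String[] f = line.split("\t", 2);
                    return new Tuple2<>(Long.parseLong(f[0]), f[1]);
                });
        // rdd2: the ~60 billion pages that exist in the library, keyed by urlid.
        JavaPairRDD<Long, String> rdd2 = sc.textFile(pagePath)
                .mapToPair(line -> {
                    String[] f = line.split("\t", 2);
                    return new Tuple2<>(Long.parseLong(f[0]), f[1]);
                });
        // Inner join: only keys present in both RDDs survive, which drops every
        // link whose target page is not in the library. This is also the step
        // that forces the huge shuffle described below.
        return rdd2.join(rdd1);
    }
}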
The problem
The join operator introduces a shuffle, and at this data volume the shuffle is so heavy that the job keeps crashing.
MapReduce approach
To get around the shuffle problem while still completing the filtering task, the Spark program was rewritten as a MapReduce program. The idea:
- First: write the 60 billion library pages to HDFS as CSV with fields (urlid, url); call this path file1. Flatten the outlinks of the 60 billion pages (the 600 billion records) and write them to HDFS as CSV with fields (urlid, anchor, url); call this path file2. In both files, urlid is the result of converting the url to a long.
- Next: in the map phase, tag every record coming from file1 with "1" and every record coming from file2 with "2".
- Finally: in the reduce phase, examine all records sharing the same key together; if the group contains both tag "1" and tag "2", keep the records, and if it contains only tag "2", discard them (see the worked example below).
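As a concrete illustration (the urlids and anchor texts below are made up, "#" stands for the outer separator byte 0x02, and "..." for the remaining fields), the reducer sees all tagged values for one key grouped together and decides whether to keep or drop them:

urlid 1001 -> { "http://a.example/ #1", "download ... #2", "homepage ... #2" }
    tags "1" and "2" both present: the page is in the library, so its anchors are kept and aggregated
urlid 2002 -> { "login ... #2" }
    only tag "2" present: the page is not in the library, so all of its records are discarded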
Pseudocode
Only the main skeleton of the code is given; the concrete details (work-related) have been removed.
package xxx;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
* AnchorTextAnalyzer class for anchor text analysis
*
* @author wangyizhong
* @since 2019-08-21
*/
public class AnchorTextAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(AnchorTextAnalyzer.class);
private static final byte OUTTER_SEP = 0x02;
private static final int TOPN = 5;
private static final int GINI_IM_NUMBER = 100;
private static final double GINI_INIT_VALUE = 1.00000000001;
private static final int ZH_MIN_LENGTH = 6;
private static final int OTHER_MIN_LENGTH = 3;
private static final int MIN_ANCHOR = 4;
private static final String TAG_SEP = "\t";
private static final int INDEX_SECOND = 2;
private static final int INDEX_THREE = 3;
private static final int INDEX_FORTH = 4;
private static final int INDEX_FIFTH = 5;
private static final String BLANK = " ";
private static final int MAP_INIT_SIZE = 100;
private static String innerSeparator;
private static String outterSeparator;
static {
try {
innerSeparator = new String(new byte[]{0x01}, "UTF-8");
outterSeparator = new String(new byte[]{OUTTER_SEP}, "UTF-8");
} catch (UnsupportedEncodingException ex) {
logger.error("init innerSeparator and outterSeparator fail!!!");
}
}
private static int topN = TOPN;
private static int giniImNum = GINI_IM_NUMBER;
private static int minAnchor = MIN_ANCHOR;
private static String secondFilter = "off";
private static String baseOutputPath = "";
/**
* Mapper
*
* @since 2019-08-21
*/
public static class AnchorTextAnalyzerMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] lists = line.split(TAG_SEP);
long mapKey;
try {
mapKey = Long.parseLong(lists[0]);
} catch (NumberFormatException e) {
// Skip lines whose first field is not a valid urlid.
return;
}
String mapValue;
if (lists.length == INDEX_FORTH) {
// Record from file2 (a link): urlid \t anchor \t sourceUrlId \t domainId -> tag "2".
String anchor = lists[1];
String sourceUrlId = lists[INDEX_SECOND];
String domainId = lists[INDEX_THREE];
mapValue = anchor + innerSeparator + sourceUrlId + innerSeparator + domainId + outterSeparator + "2";
} else if (lists.length == INDEX_SECOND) {
// Record from file1 (a library page): urlid \t url -> tag "1".
mapValue = line + outterSeparator + "1";
} else {
// Unexpected field count: drop the record instead of emitting an empty value.
return;
}
context.write(new LongWritable(mapKey), new Text(mapValue));
}
}
/**
* Reducer
*
* @since 2019-08-21
*/
public static class AnchorTextAnalyzerReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
private MultipleOutputs<LongWritable, Text> mos;
private String prefix;
/**
* self define tuple2
*
* @param <TYPEA> type TYPEA
* @param <TYPEB> type TYPEB
* @since 2019-08-21
*/
private static class Tuple2<TYPEA, TYPEB> {
TYPEA first;
TYPEB second;
/**
* Tuple2
*
* @param paramA paramA
* @param paramB paramB
*/
Tuple2(TYPEA paramA, TYPEB paramB) {
first = paramA;
second = paramB;
}
public TYPEA getFirst() {
return first;
}
public TYPEB getSecond() {
return second;
}
}
/**
* setup
*
* @param context context
*/
@Override
public void setup(Context context) {
mos = new MultipleOutputs<>(context);
prefix = getTimeStamp();
// Read the job parameters once per task instead of on every reduce() call.
minAnchor = Integer.parseInt(context.getConfiguration().get("minAnchor"));
baseOutputPath = context.getConfiguration().get("baseOutputPath");
secondFilter = context.getConfiguration().get("secondFilter");
}
/**
* cleanup
*
* @param context context
* @throws IOException IOException
* @throws InterruptedException InterruptedException
*/
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
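// Hypothetical stand-in for the timestamp helper that was removed along with
// the implementation details; any per-run output prefix will do here.
private String getTimeStamp() {
return String.valueOf(System.currentTimeMillis());
}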
/**
* reduce
*
* @param key key
* @param iterator iterator
* @param context context
* @throws IOException io-exception
* @throws InterruptedException interrupted-exception
*/
@Override
public void reduce(LongWritable key, Iterable<Text> iterator, Context context)
throws IOException, InterruptedException {
// anchor -> (domainId -> count)
HashMap<String, HashMap<Long, Long>> maps = new HashMap<>(MAP_INIT_SIZE);
// anchor -> total occurrence count
HashMap<String, Long> anchorCounts = new HashMap<>(MAP_INIT_SIZE);
boolean isBoth = false;
for (Text line : iterator) {
String[] tmpLists = line.toString().split(outterSeparator);
if (tmpLists.length != INDEX_SECOND) {
continue;
}
String tag = tmpLists[1];
if ("1".equals(tag)) {
// A tag "1" record means this urlid exists in the page library.
isBoth = true;
continue;
}
try {
String[] innerLists = tmpLists[0].split(innerSeparator);
String anchor = innerLists[0];
long domainId = Long.parseLong(innerLists[INDEX_SECOND]);
// Count how often each domain uses this anchor for the current urlid.
HashMap<Long, Long> siteCounts = maps.computeIfAbsent(anchor, k -> new HashMap<>(MAP_INIT_SIZE));
siteCounts.put(domainId, siteCounts.getOrDefault(domainId, 0L) + 1L);
// Total occurrences of this anchor for the current urlid.
anchorCounts.put(anchor, anchorCounts.getOrDefault(anchor, 0L) + 1L);
} catch (NumberFormatException e) {
// Skip records whose domainId is not a valid long.
}
}
// Only keys that also carried tag "1" (the page exists in the library) are kept;
// the detailed scoring and output logic has been removed from this skeleton.
if (isBoth) {
if (anchorCounts.size() > 0) {
// saveSiteCountAndRatio(key, maps, anchorCounts);
// mos.write(key, new Text(sb.toString()), baseOutputPath + "/scoreResult/" + prefix);
}
// writeAnchorResult(key, context, maps, anchorCounts);
// mos.write(key, new Text(text), baseOutputPath + "/siteSave/" + prefix);
}
}
}
}
// Driver (entry point) class
package xxx;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* AnchorTextAnalyzerMrMain
*
* @author wangyizhong1
* @since 2019-08-23
*/
public class AnchorTextAnalyzerMrMain extends Configured implements Tool {
private static final Logger logger = LoggerFactory.getLogger(AnchorTextAnalyzerMrMain.class);
private static final int ARGS_LENGTH = 4;
private static final int MIN_ANCHOR_INDEX = 2;
private static final int SITE_COUNT_INDEX = 3;
private static final int SECOND_FILTER_INDEX = 4;
/**
* main entry
*
* @param args args
* @throws Exception exception
*/
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(null, new AnchorTextAnalyzerMrMain(), args));
}
@Override
public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = new JobConf(getConf());
conf.setJobName("AnchorTextAnalyzerMrMain");
if (args.length > ARGS_LENGTH) {
conf.set("minAnchor", args[MIN_ANCHOR_INDEX]);
conf.set("baseOutputPath", args[SITE_COUNT_INDEX]);
conf.set("secondFilter", args[SECOND_FILTER_INDEX]);
} else {
conf.set("minAnchor", "4");
conf.set("siteCountPath", "/tmp/xxx");
conf.set("secondFilter", "off");
}
Job job = Job.getInstance(conf);
job.setJarByClass(AnchorTextAnalyzerMrMain.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(AnchorTextAnalyzer.AnchorTextAnalyzerMapper.class);
job.setReducerClass(AnchorTextAnalyzer.AnchorTextAnalyzerReducer.class);
job.setInputFormatClass(CombineTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
FileInputFormat.setInputPaths(job, args[0]);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
}
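A sketch of how the job might be submitted (the jar name and paths are placeholders; run() expects input path(s), output path, minAnchor, baseOutputPath, and secondFilter, and file1 and file2 can be passed together as a comma-separated input list):

hadoop jar anchor-text-analyzer.jar xxx.AnchorTextAnalyzerMrMain \
/path/to/file1,/path/to/file2 \
/path/to/output \
4 \
/path/to/side_output \
off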