Multiple lines of text to a single map
You have to implement your own InputFormat, which then also gives you the possibility to define your own RecordReader.
Unfortunately you also have to define a getSplits() method. In my opinion this will be harder than implementing the record reader, since this method has to implement the logic that chunks the input data.
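To illustrate what that chunking logic has to do, here is a minimal plain-Java sketch with no Hadoop dependencies (the class and method names are my own, not Hadoop's): it divides a file of a given length into contiguous byte-range splits, the way a simple getSplits() implementation would. Note that the split count is only loosely tied to the hint, just as the book excerpt below describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hadoop's InputSplit: just a byte range.
class ByteRangeSplit {
    final long start;
    final long length;
    ByteRangeSplit(long start, long length) {
        this.start = start;
        this.length = length;
    }
}

public class SplitSketch {
    // Divide [0, fileLength) into contiguous ranges of roughly
    // fileLength / numSplits bytes; the last split takes the remainder.
    // numSplits is only a hint, so the result may contain more splits.
    static List<ByteRangeSplit> getSplits(long fileLength, int numSplits) {
        List<ByteRangeSplit> splits = new ArrayList<>();
        long chunk = Math.max(1, fileLength / numSplits);
        for (long offset = 0; offset < fileLength; offset += chunk) {
            long len = Math.min(chunk, fileLength - offset);
            splits.add(new ByteRangeSplit(offset, len));
        }
        return splits;
    }

    public static void main(String[] args) {
        List<ByteRangeSplit> splits = getSplits(100, 3);
        long covered = 0;
        for (ByteRangeSplit s : splits) {
            covered += s.length;
        }
        // Prints: 4 splits covering 100 bytes
        System.out.println(splits.size() + " splits covering " + covered + " bytes");
    }
}
```

A real implementation additionally has to respect record boundaries (or let the record reader compensate, as the code further down does by skipping the first partial line of a split).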
See the following excerpt from "Hadoop - The Definitive Guide" (a great book I would always recommend!):
Here’s the interface:
public interface InputFormat<K, V> {
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    RecordReader<K, V> getRecordReader(InputSplit split,
                                       JobConf job,
                                       Reporter reporter) throws IOException;
}
The JobClient calls the getSplits() method, passing the desired number of map tasks as the numSplits argument. This number is treated as a hint, as InputFormat implementations are free to return a different number of splits to the number specified in numSplits. Having calculated the splits, the client sends them to the jobtracker, which uses their storage locations to schedule map tasks to process them on the tasktrackers.
On a tasktracker, the map task passes the split to the getRecordReader() method on InputFormat to obtain a RecordReader for that split. A RecordReader is little more than an iterator over records, and the map task uses one to generate record key-value pairs, which it passes to the map function. A code snippet (based on the code in MapRunner) illustrates the idea:
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
    mapper.map(key, value, output, reporter);
}
I solved this problem recently by simply creating my own InputFormat that overrides NLineInputFormat and implements a custom MultiLineRecordReader instead of the default LineReader.
I chose to extend NLineInputFormat because I wanted to have the same guarantee of having exactly N line(s) per split.
This record reader is taken almost as is from http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/
The only things I modified are the property for maxLineLength, which now uses the new API, and the value for NLINESTOPROCESS, which gets read from NLineInputFormat's setNumLinesPerSplit() instead of being hardcoded (for more flexibility).
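Before the full Hadoop code, the core of what the record reader does, concatenating N consecutive lines into one record value, can be sketched without any Hadoop dependencies in plain Java (the helper name readNLines is mine):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class NLineGrouping {
    // Read up to n lines from the reader and join them with '\n'.
    // Returns null when the input is exhausted, which corresponds to
    // the record reader's nextKeyValue() returning false.
    static String readNLines(BufferedReader reader, int n) throws IOException {
        StringBuilder record = new StringBuilder();
        int read = 0;
        String line;
        while (read < n && (line = reader.readLine()) != null) {
            record.append(line).append('\n');
            read++;
        }
        return read == 0 ? null : record.toString();
    }

    public static void main(String[] args) throws IOException {
        BufferedReader r = new BufferedReader(new StringReader("a\nb\nc\nd\ne"));
        String record;
        while ((record = readNLines(r, 2)) != null) {
            System.out.print("[" + record + "]");
        }
        // Prints [a\nb\n][c\nd\n][e\n], shown here with \n for the
        // embedded newlines; a short final record is emitted as-is.
    }
}
```

The Hadoop version below does the same thing, with the extra complexity of byte offsets, split boundaries, and the maxLineLength limit.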
Here is the result:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.util.LineReader;

public class MultiLineInputFormat extends NLineInputFormat {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text> {
        private int NLINESTOPROCESS;
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start = 0;
        private long end = 0;
        private long pos = 0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
            NLINESTOPROCESS = getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength", Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end = start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());
            if (start != 0) {
                // The split starts mid-line: back up one byte and skip the
                // partial line, which belongs to the previous split's reader.
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein, conf);
            if (skipFirstLine) {
                start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            // Concatenate up to NLINESTOPROCESS consecutive lines into one value.
            for (int i = 0; i < NLINESTOPROCESS; i++) {
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,
                            Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
                    if (newSize == 0) {
                        // Nothing left to read: don't append a spurious newline.
                        break;
                    }
                    value.append(v.getBytes(), 0, v.getLength());
                    value.append(endline.getBytes(), 0, endline.getLength());
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        // A complete line was read; move on to the next one.
                        break;
                    }
                }
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            }
            return true;
        }
    }
}
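To use this input format, the job driver wires it in and sets the number of lines per split. A sketch of that driver-side configuration, assuming the new (mapreduce) API; the class name MultiLineDriver and the remaining job setup are placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

public class MultiLineDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "multiline example");
        job.setJarByClass(MultiLineDriver.class);
        job.setInputFormatClass(MultiLineInputFormat.class);
        // Each record handed to the mapper now contains 3 consecutive lines,
        // read by MultiLineRecordReader via getNumLinesPerSplit().
        NLineInputFormat.setNumLinesPerSplit(job, 3);
        NLineInputFormat.addInputPath(job, new Path(args[0]));
        // ... set mapper, reducer, output key/value types and output path as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Because the record values contain embedded newlines, the mapper should split its Text value on "\n" to recover the individual lines.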