
Commit 78c0f25

remove useless code and consideration, neaten the code style
1 parent 7d22941 commit 78c0f25

File tree

6 files changed (+164, -169 lines)


mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesRecordReader.java

Lines changed: 0 additions & 109 deletions
This file was deleted.

mllib/src/main/java/org/apache/spark/mllib/input/BatchFilesInputFormat.java renamed to mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileInputFormat.java

Lines changed: 16 additions & 16 deletions
@@ -31,22 +31,22 @@
 
 /**
  * The specific InputFormat reads files in HDFS or local disk. It will be called by
- * HadoopRDD to generate new BatchFilesRecordReader.
+ * HadoopRDD to generate new WholeTextFileRecordReader.
  */
-public class BatchFilesInputFormat
-    extends CombineFileInputFormat<String, Text> {
+public class WholeTextFileInputFormat
+  extends CombineFileInputFormat<String, Text> {
 
-    @Override
-    protected boolean isSplitable(JobContext context, Path file) {
-        return false;
-    }
-    @Override
-    public RecordReader<String, Text> createRecordReader(
-        InputSplit split,
-        TaskAttemptContext context) throws IOException {
-        return new CombineFileRecordReader<String, Text>(
-            (CombineFileSplit)split,
-            context,
-            (Class) BatchFilesRecordReader.class);
-    }
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    return false;
+  }
+  @Override
+  public RecordReader<String, Text> createRecordReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return new CombineFileRecordReader<String, Text>(
+      (CombineFileSplit)split,
+      context,
+      (Class) WholeTextFileRecordReader.class);
+  }
 }
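
For context on the (Class) cast above: Hadoop's CombineFileRecordReader is a wrapper that, for each file packed into a CombineFileSplit, reflectively invokes a (CombineFileSplit, TaskAttemptContext, Integer) constructor on the class it is handed, which is why WholeTextFileRecordReader must expose exactly that signature. A minimal Scala sketch of the delegation, not part of this commit; readersFor is a hypothetical helper written only to illustrate the mechanism:

import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.spark.mllib.input.WholeTextFileRecordReader

// Hypothetical helper mirroring what CombineFileRecordReader does internally:
// construct one WholeTextFileRecordReader per file index in the combined split.
def readersFor(split: CombineFileSplit, context: TaskAttemptContext): Seq[WholeTextFileRecordReader] =
  (0 until split.getNumPaths).map(i => new WholeTextFileRecordReader(split, context, i))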
mllib/src/main/java/org/apache/spark/mllib/input/WholeTextFileRecordReader.java

Lines changed: 103 additions & 0 deletions

@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.input;
+
+import java.io.IOException;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Reads an entire file out in <filename, content> format.
+ */
+
+public class WholeTextFileRecordReader extends RecordReader<String, Text> {
+  private Path path;
+
+  private String key = null;
+  private Text value = null;
+
+  private boolean processed = false;
+
+  private FileSystem fs;
+
+  public WholeTextFileRecordReader(
+      CombineFileSplit split,
+      TaskAttemptContext context,
+      Integer index)
+    throws IOException {
+    path = split.getPath(index);
+    fs = path.getFileSystem(context.getConfiguration());
+  }
+
+  @Override
+  public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+    throws IOException, InterruptedException {
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return processed ? 1.0f : 0.0f;
+  }
+
+  @Override
+  public String getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  @Override
+  public Text getCurrentValue() throws IOException, InterruptedException{
+    return value;
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (!processed) {
+      if (key == null) {
+        key = path.getName();
+      }
+      if (value == null) {
+        value = new Text();
+      }
+
+      FSDataInputStream fileIn = null;
+      try {
+        fileIn = fs.open(path);
+        byte[] innerBuffer = IOUtils.toByteArray(fileIn);
+        value.set(innerBuffer, 0, innerBuffer.length);
+      } finally {
+        Closeables.close(fileIn, false);
+      }
+      processed = true;
+      return true;
+    }
+    return false;
+  }
+}
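
The core of nextKeyValue() above is a slurp-the-whole-file pattern: open the file, pull every byte into memory with commons-io, and hand the bytes to a reusable Text. A standalone Scala sketch of the same pattern, assuming a hypothetical local path; it is not part of this commit:

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text

object ReadWholeFile {
  def main(args: Array[String]) {
    val path = new Path("/tmp/small-files/part-00000")  // hypothetical path
    val fs = path.getFileSystem(new Configuration())
    val in = fs.open(path)
    val value = new Text()
    try {
      val bytes = IOUtils.toByteArray(in)  // read the entire file at once
      value.set(bytes, 0, bytes.length)    // reuse a single Text buffer
    } finally {
      in.close()
    }
    println(path.getName + " -> " + value.getLength + " bytes")
  }
}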

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala

Lines changed: 6 additions & 6 deletions
@@ -20,11 +20,11 @@ package org.apache.spark.mllib.util
 import org.apache.hadoop.io.Text
 import org.jblas.DoubleMatrix
 
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.input.WholeTextFileInputFormat
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.input.BatchFilesInputFormat
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -124,7 +124,7 @@ object MLUtils {
   }
 
   /**
-   * Reads a bunch of small files from HDFS, or a local file system (available on all nodes), or any
+   * Reads a bunch of whole files from HDFS, or a local file system (available on all nodes), or any
    * Hadoop-supported file system URI, and return an RDD[(String, String)].
    *
    * @param path The directory you should specified, such as
@@ -133,10 +133,10 @@ object MLUtils {
    * @return RDD[(fileName: String, content: String)]
    *         i.e. the first is the file name of a file, the second one is its content.
    */
-  def smallTextFiles(sc: SparkContext, path: String): RDD[(String, String)] = {
+  def wholeTextFile(sc: SparkContext, path: String): RDD[(String, String)] = {
    sc.newAPIHadoopFile(
      path,
-      classOf[BatchFilesInputFormat],
+      classOf[WholeTextFileInputFormat],
      classOf[String],
      classOf[Text]).mapValues(_.toString)
  }
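
With the rename in place, the user-facing entry point is MLUtils.wholeTextFile. A minimal usage sketch, assuming a local master and a hypothetical directory of small text files; the object name and path are illustrative only:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.MLUtils

object WholeTextFileDemo {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "WholeTextFileDemo")
    // "/tmp/small-files" is a hypothetical input directory.
    val files = MLUtils.wholeTextFile(sc, "/tmp/small-files")
    files.collect().foreach { case (name, content) =>
      println(name + " -> " + content.length + " chars")
    }
    sc.stop()
  }
}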
