Skip to content

Commit f664552

Browse files
authored
HBASE-27904: A random data generator tool leveraging hbase bulk load (#5294)
1 parent cfa3f13 commit f664552

File tree

6 files changed

+724
-0
lines changed

6 files changed

+724
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.util.bulkdatagenerator;
19+
20+
import java.io.IOException;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import org.apache.hadoop.io.NullWritable;
24+
import org.apache.hadoop.io.Text;
25+
import org.apache.hadoop.mapreduce.InputFormat;
26+
import org.apache.hadoop.mapreduce.InputSplit;
27+
import org.apache.hadoop.mapreduce.JobContext;
28+
import org.apache.hadoop.mapreduce.RecordReader;
29+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
30+
31+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
32+
33+
public class BulkDataGeneratorInputFormat extends InputFormat<Text, NullWritable> {
34+
35+
public static final String MAPPER_TASK_COUNT_KEY =
36+
BulkDataGeneratorInputFormat.class.getName() + "mapper.task.count";
37+
38+
@Override
39+
public List<InputSplit> getSplits(JobContext job) throws IOException {
40+
// Get the number of mapper tasks configured
41+
int mapperCount = job.getConfiguration().getInt(MAPPER_TASK_COUNT_KEY, -1);
42+
Preconditions.checkArgument(mapperCount > 1, MAPPER_TASK_COUNT_KEY + " is not set.");
43+
44+
// Create a number of input splits equal to the number of mapper tasks
45+
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
46+
for (int i = 0; i < mapperCount; ++i) {
47+
splits.add(new BulkDataGeneratorInputSplit());
48+
}
49+
return splits;
50+
}
51+
52+
@Override
53+
public RecordReader<Text, NullWritable> createRecordReader(InputSplit split,
54+
TaskAttemptContext context) throws IOException, InterruptedException {
55+
BulkDataGeneratorRecordReader bulkDataGeneratorRecordReader =
56+
new BulkDataGeneratorRecordReader();
57+
bulkDataGeneratorRecordReader.initialize(split, context);
58+
return bulkDataGeneratorRecordReader;
59+
}
60+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.util.bulkdatagenerator;
19+
20+
import java.io.DataInput;
21+
import java.io.DataOutput;
22+
import java.io.IOException;
23+
import org.apache.hadoop.io.Writable;
24+
import org.apache.hadoop.mapreduce.InputSplit;
25+
26+
/**
27+
* Dummy input split to be used by {@link BulkDataGeneratorRecordReader}
28+
*/
29+
public class BulkDataGeneratorInputSplit extends InputSplit implements Writable {
30+
31+
@Override
32+
public void readFields(DataInput arg0) throws IOException {
33+
}
34+
35+
@Override
36+
public void write(DataOutput arg0) throws IOException {
37+
}
38+
39+
@Override
40+
public long getLength() throws IOException, InterruptedException {
41+
return 0;
42+
}
43+
44+
@Override
45+
public String[] getLocations() throws IOException, InterruptedException {
46+
return new String[0];
47+
}
48+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.util.bulkdatagenerator;
19+
20+
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
38+
public class BulkDataGeneratorMapper
39+
extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {
40+
41+
/** Counter enumeration to count number of rows generated. */
42+
public static enum Counters {
43+
ROWS_GENERATED
44+
}
45+
46+
public static final String SPLIT_COUNT_KEY =
47+
BulkDataGeneratorMapper.class.getName() + "split.count";
48+
49+
private static final String ORG_ID = "00D000000000062";
50+
private static final int MAX_EVENT_ID = Integer.MAX_VALUE;
51+
private static final int MAX_VEHICLE_ID = 100;
52+
private static final int MAX_SPEED_KPH = 140;
53+
private static final int NUM_LOCATIONS = 10;
54+
private static int splitCount = 1;
55+
private static final Random random = new Random(System.currentTimeMillis());
56+
private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
57+
Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
58+
private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);
59+
static {
60+
LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));
61+
LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92)));
62+
LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
63+
LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
64+
LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
65+
LOCATIONS.put("Porto Velho",
66+
new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
67+
LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
68+
LOCATIONS.put("Rio de Janeiro",
69+
new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
70+
LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
71+
LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
72+
LOCATION_KEYS.addAll(LOCATIONS.keySet());
73+
}
74+
75+
final static byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes();
76+
77+
/** {@inheritDoc} */
78+
@Override
79+
protected void setup(Context context) throws IOException, InterruptedException {
80+
Configuration c = context.getConfiguration();
81+
splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
82+
}
83+
84+
/**
85+
* Generates a single record based on value set to the key by
86+
* {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
87+
* {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
88+
* {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
89+
* that records are equally distributed across all regions of the table since region boundaries
90+
* are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
91+
* method for region split info.
92+
* @param key - The key having index of next record to be generated
93+
* @param value - Value associated with the key (not used)
94+
* @param context - Context of the mapper container
95+
*/
96+
@Override
97+
protected void map(Text key, NullWritable value, Context context)
98+
throws IOException, InterruptedException {
99+
100+
int recordIndex = Integer.parseInt(key.toString());
101+
102+
// <6-characters-region-boundary-prefix>_<15-random-chars>_<record-index-for-this-mapper-task>
103+
final String toolEventId =
104+
String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
105+
+ EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
106+
+ recordIndex;
107+
final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID)));
108+
final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID)));
109+
final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH)));
110+
final String location = LOCATION_KEYS.get(random.nextInt(NUM_LOCATIONS));
111+
final Pair<BigDecimal, BigDecimal> coordinates = LOCATIONS.get(location);
112+
final BigDecimal latitude = coordinates.getFirst();
113+
final BigDecimal longitude = coordinates.getSecond();
114+
115+
final ImmutableBytesWritable hKey =
116+
new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
117+
addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
118+
addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
119+
addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);
120+
addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vechileId);
121+
addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed);
122+
addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
123+
addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
124+
addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
125+
addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
126+
String.valueOf(EnvironmentEdgeManager.currentTime()));
127+
128+
context.getCounter(Counters.ROWS_GENERATED).increment(1);
129+
}
130+
131+
private void addKeyValue(final Context context, ImmutableBytesWritable key,
132+
final Utility.TableColumnNames columnName, final String value)
133+
throws IOException, InterruptedException {
134+
KeyValue kv =
135+
new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
136+
context.write(key, kv);
137+
}
138+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.util.bulkdatagenerator;
19+
20+
import java.io.IOException;
21+
import org.apache.hadoop.io.NullWritable;
22+
import org.apache.hadoop.io.Text;
23+
import org.apache.hadoop.mapreduce.InputSplit;
24+
import org.apache.hadoop.mapreduce.RecordReader;
25+
import org.apache.hadoop.mapreduce.TaskAttemptContext;
26+
27+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
28+
29+
public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritable> {
30+
31+
private int numRecordsToCreate = 0;
32+
private int createdRecords = 0;
33+
private Text key = new Text();
34+
private NullWritable value = NullWritable.get();
35+
36+
public static final String RECORDS_PER_MAPPER_TASK_KEY =
37+
BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";
38+
39+
@Override
40+
public void initialize(InputSplit split, TaskAttemptContext context)
41+
throws IOException, InterruptedException {
42+
// Get the number of records to create from the configuration
43+
this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1);
44+
Preconditions.checkArgument(numRecordsToCreate > 0,
45+
"Number of records to be created by per mapper should be greater than 0.");
46+
}
47+
48+
@Override
49+
public boolean nextKeyValue() {
50+
createdRecords++;
51+
return createdRecords <= numRecordsToCreate;
52+
}
53+
54+
@Override
55+
public Text getCurrentKey() {
56+
// Set the index of record to be created
57+
key.set(String.valueOf(createdRecords));
58+
return key;
59+
}
60+
61+
@Override
62+
public NullWritable getCurrentValue() {
63+
return value;
64+
}
65+
66+
@Override
67+
public float getProgress() throws IOException, InterruptedException {
68+
return (float) createdRecords / (float) numRecordsToCreate;
69+
}
70+
71+
@Override
72+
public void close() throws IOException {
73+
74+
}
75+
}

0 commit comments

Comments
 (0)