 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
 import java.util.LinkedList;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.IntegrationTestBase;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.test.util.warc.WARCRecord;
 import org.apache.hadoop.hbase.test.util.warc.WARCWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CompressionTest;
 import org.apache.hadoop.hbase.util.RegionSplitter;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.Tool;
@@ -560,12 +552,27 @@ protected void map(LongWritable key, WARCWritable value, Context output)
       if (warcHeader.getRecordType().equals("response") && warcHeader.getTargetURI() != null) {
         String contentType = warcHeader.getField("WARC-Identified-Payload-Type");
         if (contentType != null) {
-          byte[] rowKey = Bytes.toBytes(warcHeader.getTargetURI());
+
+          // Make row key
+
+          byte[] rowKey;
+          try {
+            rowKey = rowKeyFromTargetURI(warcHeader.getTargetURI());
+          } catch (URISyntaxException e) {
+            LOG.warn("Could not parse URI \"" + warcHeader.getTargetURI() + "\" for record " +
+              warcHeader.getRecordID());
+            return;
+          }
+
+          // Get the content and calculate the CRC64
+
           byte[] content = value.getRecord().getContent();
           CRC64 crc = new CRC64();
           crc.update(content);
           long crc64 = crc.getValue();

+          // Store to HBase
+
           Put put = new Put(rowKey);

           put.addColumn(CONTENT_FAMILY_NAME, CONTENT_QUALIFIER, content);
@@ -585,6 +592,8 @@ protected void map(LongWritable key, WARCWritable value, Context output)

           table.put(put);

+          // If we succeeded in storing to HBase, write records for later verification
+
           output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CRC_QUALIFIER),
             new BytesWritable(Bytes.toBytes(crc64)));
           output.write(new HBaseKeyWritable(rowKey, INFO_FAMILY_NAME, CONTENT_LENGTH_QUALIFIER),
@@ -602,6 +611,43 @@ protected void map(LongWritable key, WARCWritable value, Context output)
         }
       }
     }
+
+    private byte[] rowKeyFromTargetURI(String targetURI) throws URISyntaxException {
+      URI uri = new URI(targetURI);
+      StringBuffer sb = new StringBuffer();
+      // Ignore the scheme
+      // Reverse the components of the hostname
+      String[] hostComponents = uri.getHost().split("\\.");
+      for (int i = hostComponents.length - 1; i >= 0; i--) {
+        sb.append(hostComponents[i]);
+        if (i != 0) {
+          sb.append('.');
+        }
+      }
+      // Port
+      if (uri.getPort() != -1) {
+        sb.append(":");
+        sb.append(uri.getPort());
+      }
+      // Ignore the rest of the authority
+      sb.append("/");
+      // Path, if present
+      if (uri.getRawPath() != null) {
+        sb.append(uri.getRawPath());
+      }
+      // Query, if present
+      if (uri.getRawQuery() != null) {
+        sb.append("?");
+        sb.append(uri.getRawQuery());
+      }
+      // Fragment, if present
+      if (uri.getRawFragment() != null) {
+        sb.append("#");
+        sb.append(uri.getRawFragment());
+      }
+      return Bytes.toBytes(sb.toString());
+    }
+
   }
 }

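For readers skimming the diff: the new row key scheme reverses the hostname components so that all pages from one site and its subdomains sort adjacently in the table, which gives better scan locality than keying by the raw URL. Below is a minimal, standalone sketch of the same transformation for experimentation; the class name, the main driver, and the example URL are illustrative only, and the committed rowKeyFromTargetURI above is the authoritative version.

import java.net.URI;
import java.net.URISyntaxException;

public class RowKeyExample {

  // Mirrors rowKeyFromTargetURI above: drop the scheme, reverse the hostname
  // components, keep an explicit port, then append the raw path, query, and
  // fragment unchanged.
  static String rowKey(String targetURI) throws URISyntaxException {
    URI uri = new URI(targetURI);
    StringBuilder sb = new StringBuilder();
    String[] hostComponents = uri.getHost().split("\\.");
    for (int i = hostComponents.length - 1; i >= 0; i--) {
      sb.append(hostComponents[i]);
      if (i != 0) {
        sb.append('.');
      }
    }
    if (uri.getPort() != -1) {
      sb.append(':').append(uri.getPort());
    }
    sb.append('/');
    if (uri.getRawPath() != null) {
      sb.append(uri.getRawPath());
    }
    if (uri.getRawQuery() != null) {
      sb.append('?').append(uri.getRawQuery());
    }
    if (uri.getRawFragment() != null) {
      sb.append('#').append(uri.getRawFragment());
    }
    return sb.toString();
  }

  public static void main(String[] args) throws URISyntaxException {
    // Prints "com.example.www:8080//archive/page.html?q=1". The double slash
    // appears because getRawPath() keeps its leading '/' in addition to the
    // one appended after the authority.
    System.out.println(rowKey("https://www.example.com:8080/archive/page.html?q=1"));
  }
}

One caveat worth keeping in mind: URI.getHost() returns null for URIs with no host component, which neither the sketch nor the committed method guards against.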