Skip to content

Commit

Permalink
Switch TSMB3 to AryInt
Browse files Browse the repository at this point in the history
Lowers GC a lot; SF10 runs nicely in 16G... but it's really slow.  All the time is still spent building the hash from Comments to tag-sets.  Wrong approach somehow.
  • Loading branch information
cliffclick committed Mar 25, 2021
1 parent ea657f1 commit aaf08f5
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 20 deletions.
2 changes: 1 addition & 1 deletion h2o-core/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ tpch: build/h2o.jar

.PHONY: tsmb
tsmb: build/h2o.jar
java -ea -cp "$?$(SEP)$(jars)" org.cliffc.sql.TSMB
java -ea -Xmx16G -Xms16G -XX:+UseParallelGC -verbose:gc -cp "$?$(SEP)$(jars)" org.cliffc.sql.TSMB

# Build emacs tags (part of a tasty emacs ide experience)
tags: $(main_h2o_javas) $(test_h2o_javas)
Expand Down
187 changes: 187 additions & 0 deletions h2o-core/src/main/java/org/cliffc/sql/AryInt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package org.cliffc.sql;

import water.util.SB;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.IntUnaryOperator;

/**
 * Growable list of primitive {@code int}s: an ArrayList with saner syntax and
 * no boxing.  The backing array and the live length are public so hot loops
 * can index {@code _es[0.._len)} directly.  Not thread-safe.
 */
public class AryInt {
  /** Backing store; only the first {@code _len} slots are live. */
  public int[] _es;
  /** Count of live elements. */
  public int _len;

  /** Wrap (not copy) an array; every slot is live. */
  public AryInt(int[] es) { this(es,es.length); }
  /** Wrap (not copy) an array; the first {@code len} slots are live. */
  public AryInt(int[] es, int len) { _es=es; _len=len; }
  /** Empty list. */
  public AryInt() { this(new int[1],0); }

  /** @return list is empty */
  public boolean isEmpty() { return _len==0; }
  /** @return active list length */
  public int len() { return _len; }
  /** @param i element index
   *  @return element being returned; throws if OOB */
  public int at( int i ) {
    range_check(i);
    return _es[i];
  }
  /** @param i element index
   *  @return element being returned, or 0 if OOB (including negative i) */
  public int atX( int i ) {
    return 0 <= i && i < _len ? _es[i] : 0;
  }
  /** @return last element; throws if the list is empty */
  public int last( ) {
    range_check(0);             // Fails exactly when _len==0
    return _es[_len-1];
  }

  /** @return remove and return last element; throws if the list is empty */
  public int pop( ) {
    range_check(0);             // Fails exactly when _len==0
    return _es[--_len];
  }

  /** Add element in amortized constant time
   *  @param e element to add at end of list
   *  @return 'this' for flow-coding */
  public AryInt push( int e ) {
    ensure_cap(_len+1);
    _es[_len++] = e;
    return this;
  }

  /** Slow, linear-time, element insert.  Preserves order.
   *  @param i index to insert at, between 0 and _len inclusive.
   *  @param e element to insert
   */
  public void insert( int i, int e ) {
    if( i < 0 || i>_len )
      throw new ArrayIndexOutOfBoundsException(""+i+" >= "+_len);
    ensure_cap(_len+1);
    System.arraycopy(_es,i,_es,i+1,(_len++)-i); // Slide the tail up one slot
    _es[i] = e;
  }

  /** Fast, constant-time, element removal.  Does not preserve order
   *  @param i element to be removed
   *  @return element removed */
  public int del( int i ) {
    range_check(i);
    int tmp = _es[i];
    _es[i]=_es[--_len];         // Last element fills the hole
    return tmp;
  }

  /** Slow, linear-time, element removal.  Preserves order.
   *  @param i element to be removed
   *  @return element removed */
  public int remove( int i ) {
    range_check(i);
    int e = _es[i];
    System.arraycopy(_es,i+1,_es,i,(--_len)-i); // Slide the tail down one slot
    return e;
  }

  /** Remove all elements */
  public void clear( ) { Arrays.fill(_es,0,_len,0); _len=0; }

  /** Extend and set: grows the list as needed so that index i is live.
   *  @param i index to set; may be at or past the current length
   *  @param e element to store
   *  @return the element stored */
  public int setX( int i, int e ) {
    ensure_cap(i+1);
    if( i >= _len ) _len = i+1;
    return (_es[i] = e);
  }

  /** Overwrite an existing element.
   *  @param i index to set; throws if OOB
   *  @param e element to store
   *  @return the element stored */
  public int set( int i, int e ) {
    range_check(i);
    return (_es[i] = e);
  }

  /** Reset the list to hold exactly the one element e.
   *  @return 'this' for flow-coding */
  public AryInt set_as( int e ) {
    ensure_cap(1);              // Backing array may legally be zero-length
    _es[0] = e;
    _len=1;
    return this;
  }

  /** Force the live length, growing the backing array if it is too small and
   *  halving it while it is more than twice too large.
   *  @param len new length, must not be negative
   *  @return 'this' for flow-coding */
  public AryInt set_len( int len ) {
    if( len < 0 ) throw new IllegalArgumentException("negative length "+len);
    ensure_cap(len);
    _len = len;
    // Shrink if hugely too large.  Never shrink to a zero-length array, and
    // compare in long math so 'len<<1' cannot wrap negative.
    while( _es.length > 1 && _es.length > ((long)len<<1) )
      _es = Arrays.copyOf(_es,_es.length>>1);
    return this;
  }

  /** @param c Collection to be added
   *  @return 'this' for flow-coding */
  public AryInt addAll( Collection<? extends Integer> c ) { for( int e : c ) push(e); return this; }

  /** @param es Array to be added
   *  @return 'this' for flow-coding */
  public AryInt addAll( int[] es ) {
    if( es.length==0 ) return this;
    ensure_cap(_len+es.length);
    System.arraycopy(es,0,_es,_len,es.length);
    _len += es.length;
    return this;
  }

  /** Replace every element with f(element), in place.
   *  @return 'this' for flow-coding */
  public AryInt map_update( IntUnaryOperator f ) { for( int i = 0; i<_len; i++ ) _es[i] = f.applyAsInt(_es[i]); return this; }

  /** @return compact array version, using the internal base array where possible. */
  public int[] asAry() { return _len==_es.length ? _es : Arrays.copyOf(_es,_len); }

  /** Sorts in-place */
  public void sort_update() { Arrays.sort(_es, 0, _len); }

  /** Find the first matching element using ==, or -1 if none.  Note that
   *  most del calls shuffle the list, so the first element might be random.
   *  @param e element to find
   *  @return index of first matching element, or -1 if none */
  public int find( int e ) {
    for( int i=0; i<_len; i++ )  if( _es[i]==e )  return i;
    return -1;
  }

  /** @return elements as "[a,b,c]"; an empty list prints as "[]" */
  @Override public String toString() {
    StringBuilder sb = new StringBuilder().append('[');
    for( int i=0; i<_len; i++ ) {
      if( i>0 ) sb.append(','); // Separator goes between elements only
      sb.append(_es[i]);
    }
    return sb.append(']').toString();
  }

  // Throw unless i is a live index.
  private void range_check( int i ) {
    if( i < 0 || i>=_len )
      throw new ArrayIndexOutOfBoundsException(""+i+" >= "+_len);
  }

  // Grow the backing array, by doubling, until it holds at least 'min' slots.
  // Starts from 1 when the array is zero-length, since doubling 0 never grows.
  private void ensure_cap( int min ) {
    if( min <= _es.length ) return;
    int cap = Math.max(1,_es.length);
    while( cap < min ) {
      cap <<= 1;
      if( cap <= 0 ) { cap = min; break; } // Doubling overflowed; take the exact size
    }
    _es = Arrays.copyOf(_es,cap);
  }

  /** Binary search of sorted _es.  Undefined results if _es is not sorted.
   *  @param e element to look for
   *  @return index of the first element equal to e, else the insertion point */
  public int binary_search( int e ) {
    int lo=0, hi=_len-1;
    while( lo <= hi ) {
      int mid = (hi + lo) >>> 1; // midpoint, rounded down
      int mval = _es[mid];
      if( e==mval ) {
        // If dups, get to the first.
        while( mid>0 && e==_es[mid-1] ) mid--;
        return mid;
      }
      if( e >mval ) lo = mid+1;
      else          hi = mid-1;
    }
    return lo;
  }

  // Note that the hashCode() and equals() are not invariant to changes in the
  // underlying array.  If the hashCode() is used (e.g., inserting into a
  // HashMap) and then the array changes, the hashCode() will change also.
  @Override public boolean equals( Object o ) {
    if( this==o ) return true;
    if( !(o instanceof AryInt) ) return false;
    AryInt ary = (AryInt)o;
    if( _len != ary._len ) return false;
    if( _es == ary._es ) return true;
    for( int i=0; i<_len; i++ )
      if( _es[i] != ary._es[i] )
        return false;
    return true;
  }
  @Override public int hashCode( ) {
    int sum=_len;
    for( int i=0; i<_len; i++ )
      sum += _es[i];
    return sum;
  }
}
22 changes: 20 additions & 2 deletions h2o-core/src/main/java/org/cliffc/sql/TSMB.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

public class TSMB {
// Scale-factor; also part of the data directory name.
public static final String SCALE_FACTOR = "sf1";
public static final String SCALE_FACTOR = "sf0.1";
public static final String DIRNAME = "c:/Users/cliffc/Desktop/TSMB_DATA/social-network-"+SCALE_FACTOR+"-merged-fk/";

// The TSMB Data
Expand Down Expand Up @@ -170,7 +170,7 @@ private static class BuildP1P2 extends MRTask<BuildP1P2> {
static void build_hash(NonBlockingHashMapLong<NonBlockingHashMapLong> nbhms, long c0, long c1) {
NonBlockingHashMapLong nbhm = nbhms.get(c0);
if( nbhm==null ) {
nbhms.putIfAbsent(c0,new NonBlockingHashMapLong(8));
nbhms.putIfAbsent(c0,new NonBlockingHashMapLong(4));
nbhm = nbhms.get(c0);
}
nbhm.put(c1,""); // Sparse-bit-set, just a hash with no value payload
Expand All @@ -194,4 +194,22 @@ static void print(String msg, NonBlockingHashMapLong<NonBlockingHashMapLong> p2x
System.out.println(msg+": "+size+", avg="+avg+", min="+min+", max="+max+", stddev="+std);
}


// Summary printer for a hash-of-AryInt: prints the number of entries, then
// the average, min, max and (population) standard deviation of the list
// lengths.  An empty map prints zeros for all four statistics.
static void printA(String msg, NonBlockingHashMapLong<AryInt> p2xs) {
  long sum=0,sum2=0;
  long min=Long.MAX_VALUE;
  long max=0;
  for( AryInt p2x : p2xs.values() ) {
    long len = p2x._len;        // Widen first: int*int overflows for len > 46340
    sum  += len;
    sum2 += len*len;
    if( len < min ) min = len;
    if( len > max ) max = len;
  }
  long size = p2xs.size();
  if( size==0 ) min = 0;        // No entries seen; do not report Long.MAX_VALUE
  double avg = size==0 ? 0 : (double)sum/size;
  // Variance is E[x^2] - E[x]^2; clamp at zero against floating-point round-off.
  double var = size==0 ? 0 : Math.max(0,(double)sum2/size - avg*avg);
  double std = Math.sqrt(var);
  System.out.println(msg+": "+size+", avg="+avg+", min="+min+", max="+max+", stddev="+std);
}
}
56 changes: 39 additions & 17 deletions h2o-core/src/main/java/org/cliffc/sql/TSMB3.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ and not(comment_has_tag(comment, tag1))
Answer Umbra 1 thrd Umbra 48thrd H2O 20thrd
SF0.1: 537142 0.0914 sec 0.0221 sec
SF1 : 6907213 1.7737 sec 0.1127 sec
SF10 : 70770955 15.429 sec 0.7805 sec
SF1 : 6907213 1.7737 sec 0.1127 sec 0.8sec
SF10 : 70770955 15.429 sec 0.7805 sec 20.0sec
*/

public class TSMB3 implements TSMB.TSMBI {
Expand All @@ -44,14 +44,14 @@ public class TSMB3 implements TSMB.TSMBI {
// Compute tags-per-comment/post. Lookup by comment/post id.
Vec cids0 = TSMB.COMMENT_HASTAG_TAG.vec("id");
Vec ctgs = TSMB.COMMENT_HASTAG_TAG.vec("hastag_tag");
NonBlockingHashMapLong<NonBlockingHashMapLong> ctags = new BuildTags(TSMB.COMMENT.numRows()).doAll(cids0,ctgs)._tags;
TSMB.print("Comment tags "+TSMB.COMMENT.numRows(),ctags);
NonBlockingHashMapLong<AryInt> ctags = new BuildTags(TSMB.COMMENT.numRows()).doAll(cids0,ctgs)._tags;
//TSMB.printA("Comments "+TSMB.COMMENT.numRows(),ctags);
if( PRINT_TIMING ) { t=System.currentTimeMillis(); System.out.println("Build comment tags hashes "+(t-t0)+" msec"); t0=t; }

Vec pids = TSMB.POST_HASTAG_TAG.vec("id");
Vec ptgs = TSMB.POST_HASTAG_TAG.vec("hastag_tag");
NonBlockingHashMapLong<NonBlockingHashMapLong> ptags = new BuildTags(TSMB.POST.numRows()).doAll(pids,ptgs)._tags;
TSMB.print("Post tags "+TSMB.POST.numRows(),ptags);
NonBlockingHashMapLong<AryInt> ptags = new BuildTags(TSMB.POST.numRows()).doAll(pids,ptgs)._tags;
//TSMB.printA("Posts "+TSMB.POST.numRows(),ptags);
if( PRINT_TIMING ) { t=System.currentTimeMillis(); System.out.println("Build post tags hashes "+(t-t0)+" msec"); t0=t; }

// Count tags
Expand All @@ -66,13 +66,35 @@ public class TSMB3 implements TSMB.TSMBI {


private static class BuildTags extends MRTask<BuildTags> {
transient NonBlockingHashMapLong<NonBlockingHashMapLong> _tags;
transient NonBlockingHashMapLong<AryInt> _tags;
final long _size; // Uniques on outer hash
BuildTags(long size) { _size=size; }
@Override protected void setupLocal() { _tags = new NonBlockingHashMapLong<NonBlockingHashMapLong>((int)(_size)); }
@Override protected void setupLocal() { _tags = new NonBlockingHashMapLong<AryInt>((int)(_size>>1)); }
@Override public void map(Chunk cids, Chunk ctgs) {
for( int i=0; i<cids._len; i++ )
TSMB.build_hash(_tags,cids.at8(i),ctgs.at8(i));
Vec.Reader vids = cids.vec().new Reader();
Vec.Reader vtgs = ctgs.vec().new Reader();
long row0 = cids.start(), rowN = cids.vec().length();
int row=0;
// Skip leading rows with equal comment tags from prior chunk
long prior = row0==0 ? -1 : vids.at8(row0-1);
while( prior == cids.at8(row) ) row++;

// Run to the chunk end and into the next chunk until the cid changes.
// This means a single thread handles all of a comment id.
AryInt tags=null;
while( (row0+row) < rowN ) {
long cid=vids.at8(row0+row);
if( prior!=cid ) {
if( row >= cids._len ) break;
prior=cid;
tags = new AryInt();
AryInt old = _tags.putIfAbsent(cid,tags);
assert old==null : "make id "+cid+", absrow "+(row0+row)+", start "+row0; // Single insertion
}
int tag = (int)vtgs.at8(row0+row); // tag
tags.push(tag);
row++;
}
}
@Override public void reduce( BuildTags bld ) {
if( _tags != bld._tags )
Expand All @@ -82,18 +104,18 @@ private static class BuildTags extends MRTask<BuildTags> {

private static class Count extends MRTask<Count> {
long _cnt;
final NonBlockingHashMapLong<NonBlockingHashMapLong> _ctags, _ptags;
Count( NonBlockingHashMapLong<NonBlockingHashMapLong> ctags, NonBlockingHashMapLong<NonBlockingHashMapLong> ptags ) { _ctags=ctags; _ptags=ptags; }
final NonBlockingHashMapLong<AryInt> _ctags, _ptags;
Count( NonBlockingHashMapLong<AryInt> ctags, NonBlockingHashMapLong<AryInt> ptags ) { _ctags=ctags; _ptags=ptags; }
@Override public void map( Chunk cids, Chunk crcs, Chunk crps ) {
long cnt=0;
for( int i=0; i<cids._len; i++ ) {
NonBlockingHashMapLong tag2s = _ctags.get(cids.at8(i));
AryInt tag2s = _ctags.get(cids.at8(i));
if( tag2s != null ) {
NonBlockingHashMapLong tag1s = crps.isNA(i) ? _ctags.get(crcs.at8(i)) : _ptags.get(crps.at8(i));
AryInt tag1s = crps.isNA(i) ? _ctags.get(crcs.at8(i)) : _ptags.get(crps.at8(i));
if( tag1s != null )
for( long tag1 : tag1s.keySetLong() )
if( !tag2s.containsKey(tag1) )
cnt += tag2s.size();
for( int j=0; j<tag1s._len; j++ )
if( tag2s.find(tag1s._es[j]) == -1 ) // no tag1 in tag2s
cnt += tag2s._len;
}
}
_cnt=cnt;
Expand Down
11 changes: 11 additions & 0 deletions h2o-core/src/main/java/water/util/SB.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,17 @@ public SB p(JCodeSB sb) {
_sb.append(sb.getContent());
return this;
}

// Drop the final char of the buffer.  Handy for string-joins and JSON
// printing, where the loop leaves one separator char too many:
//
//   sb.p('[');
//   for( Foo foo : foos )
//     sb.p(foo).p(',');
//   sb.unchar().p(']');    // remove extra trailing comma
//
// NOTE(review): _sb looks like a StringBuilder; if so, calling this on an
// empty buffer asks for length -1 and throws - confirm callers never do that.
public SB unchar() {
  final int shorter = _sb.length()-1;
  _sb.setLength(shorter);
  return this;
}

// The text accumulated so far.
@Override
public String toString() {
  final String text = _sb.toString();
  return text;
}

/** Java-string illegal characters which need to be escaped */
Expand Down

0 comments on commit aaf08f5

Please sign in to comment.