Skip to content

Commit

Permalink
add prototype MulticastEvaluate UDF.
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya-nakamura committed Mar 23, 2013
1 parent 697c578 commit be74d16
Show file tree
Hide file tree
Showing 15 changed files with 1,056 additions and 8 deletions.
8 changes: 4 additions & 4 deletions charsiu-dpc/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
Copyright 2012 Hiromasa Horiguchi ( The University of Tokyo )
Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,13 +21,13 @@
<parent>
<artifactId>charsiu</artifactId>
<groupId>jp.ac.u.tokyo.m</groupId>
<version>1.0</version>
<version>1.1</version>
<relativePath>..</relativePath>
</parent>

<artifactId>charsiu-dpc</artifactId>
<name>charsiu-dpc</name>
<version>2011.0</version>
<version>2011.1</version>

<build>
<resources>
Expand All @@ -44,7 +44,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>charsiu-udf</artifactId>
<version>1.0</version>
<version>1.1</version>
</dependency>
</dependencies>

Expand Down
4 changes: 2 additions & 2 deletions charsiu-udf/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<!--
Copyright 2012 Hiromasa Horiguchi ( The University of Tokyo )
Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -21,7 +21,7 @@
<parent>
<artifactId>charsiu</artifactId>
<groupId>jp.ac.u.tokyo.m</groupId>
<version>1.0</version>
<version>1.1</version>
<relativePath>..</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jp.ac.u.tokyo.m.pig.udf.eval.util;

import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public interface ColumnAccessor {
Tuple generate(Object aColumnValue);
Schema getInputSchema();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jp.ac.u.tokyo.m.pig.udf.eval.util;

import java.util.ArrayList;
import java.util.List;

/**
* 1 カラムに対する編集情報
*/
public class ColumnEvaluationSetting {

// -----------------------------------------------------------------------------------------------------------------

private final List<ColumnEvaluator> mColumnEvaluators = new ArrayList<ColumnEvaluator>();

// -----------------------------------------------------------------------------------------------------------------

public List<ColumnEvaluator> getColumnEvaluators() {
return mColumnEvaluators;
}

// -----------------------------------------------------------------------------------------------------------------

public void addColumnEvaluator(ColumnEvaluator aTarget) {
mColumnEvaluators.add(aTarget);
}

// -----------------------------------------------------------------------------------------------------------------

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jp.ac.u.tokyo.m.pig.udf.eval.util;

import java.io.IOException;

public interface ColumnEvaluator {
Object evaluate(Object aInput) throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jp.ac.u.tokyo.m.pig.udf.eval.util;

import java.util.ArrayList;
import java.util.Iterator;

import jp.ac.u.tokyo.m.pig.udf.eval.util.ReflectionUDFParameters.AccessType;
import jp.ac.u.tokyo.m.pig.udf.eval.util.ReflectionUDFParameters.ColumnName;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class DefaultColumnAccessor implements ColumnAccessor {

private ReflectionUDFParameters mReflectionUDFParameters;

public DefaultColumnAccessor(String aReflectionUDFParametersString, FieldSchema aColumnValueFieldSchema) throws FrontendException {
mReflectionUDFParameters = new ReflectionUDFParameters(aReflectionUDFParametersString, aColumnValueFieldSchema);
}

@Override
public Tuple generate(Object aColumnValue) {
ArrayList<Object> tTuple = new ArrayList<Object>();
for (ColumnName tColumnName : mReflectionUDFParameters.getColumnNames()) {
if (tColumnName.hasChild()) {
// TODO 例外処理
// TODO 実装( hasChild 時の多階層解析)
try {
generateChildValue(tTuple, tColumnName, aColumnValue);
} catch (ExecException e) {
throw new RuntimeException(e);
}
} else {
// use aColumnValue
tTuple.add(aColumnValue);
}
}
return TupleFactory.getInstance().newTupleNoCopy(tTuple);
}

private void generateChildValue(ArrayList<Object> aResultTuple, ColumnName aColumnName, Object aColumnValue) throws ExecException {
DataBag tDataSourceBag = null;

Object tDataSource = aColumnValue;
ColumnName tCurrentColumnName = aColumnName;
while (tCurrentColumnName.hasChild()) {
ColumnName tNextColumnName = tCurrentColumnName.getChild();
if (tNextColumnName.getAccessType() == AccessType.SUB_BAG) {
// subbag 対象の bag を特定
tDataSourceBag = DataType.toBag(tDataSource);
break;
} else {
// TODO 実装(プロトタイプ版では1層目が Bag の物しか扱わない、ということにする)
switch (tCurrentColumnName.getFieldSchema().type) {
case DataType.BAG:
// tDataSource = DataType.toBag(aColumnValue);
break;
case DataType.TUPLE:
break;
default:
// aResultTuple.add(e);
return;
}
}
}

// 次層が Tuple なら無視してその次の index
ColumnName tNextColumnName = tCurrentColumnName.getChild();
int tChildIndex = tNextColumnName.getFieldSchema().type == DataType.TUPLE ? tNextColumnName.getChild().getIndex() : tNextColumnName.getIndex();
Iterator<Tuple> tDataSourceBagIterator = tDataSourceBag.iterator();
ArrayList<Tuple> tProtoBag = new ArrayList<Tuple>();

while (tDataSourceBagIterator.hasNext()) {
Tuple tCurrentTuple = tDataSourceBagIterator.next();
tProtoBag.add(createTuple(tCurrentTuple.get(tChildIndex)));
}

aResultTuple.add(DefaultBagFactory.getInstance().newDefaultBag(tProtoBag));

}

private Tuple createTuple(Object aValue) {
ArrayList<Object> tTuple = new ArrayList<Object>();
tTuple.add(aValue);
return TupleFactory.getInstance().newTupleNoCopy(tTuple);
}

@Override
public Schema getInputSchema() {
return mReflectionUDFParameters.getInputSchema();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2012-2013 Hiromasa Horiguchi ( The University of Tokyo )
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package jp.ac.u.tokyo.m.pig.udf.eval.util;

import java.io.IOException;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class DefaultColumnEvaluator implements ColumnEvaluator {

private final ReflectionUDFSetting mReflectUDFSetting;
private final ColumnAccessor mColumnAccessor;
private final EvalFunc<?> mUDF;

public DefaultColumnEvaluator(ColumnAccessor aColumnAccessor, ReflectionUDFSetting aReflectUDFSetting) throws InstantiationException, IllegalAccessException, FrontendException, CloneNotSupportedException {
mColumnAccessor = aColumnAccessor;
mReflectUDFSetting = aReflectUDFSetting;

Class<EvalFunc<?>> tMasterUDFClass = ReflectionUtil.getClassForName(aReflectUDFSetting.getmClassName());
EvalFunc<?> tMasterUDF = tMasterUDFClass.newInstance();
EvalFunc<?> tUDF = null;
// func-spec = null なら無視していい
// 拒否もしたい
// この方法だとPigクエリ的な構文補助機能が使えない(動かして初めてミスに気づく。これは地味にネック。スキーマ検証段階で処理されるためかろうじで大丈夫か。)
Schema tInputSchema = aColumnAccessor.getInputSchema().clone();
ReflectionUtil.removeAlias(tInputSchema);
List<FuncSpec> tArgToFuncMapping = tMasterUDF.getArgToFuncMapping();
if (tArgToFuncMapping == null) {
tUDF = tMasterUDF;
} else {
for (FuncSpec tFuncSpec : tArgToFuncMapping) {
// XXX Pig はどうやって FuncMapping を利用しているんだろう?
// if (Schema.equals(tFuncSpec.getInputArgsSchema(), tInputSchema, false, false)) {
if (tFuncSpec.getInputArgsSchema().equals(tInputSchema)) {
Class<EvalFunc<?>> tUDFClass = ReflectionUtil.getClassForName(tFuncSpec.getClassName());
tUDF = tUDFClass.newInstance();
break;
}
}
// 1周でダメなら {()} が問題の可能性が有るので、スキーマを調整してもう一度
if (tUDF == null) {
ReflectionUtil.removeTupleInBag(tInputSchema);
for (FuncSpec tFuncSpec : tArgToFuncMapping) {
if (tFuncSpec.getInputArgsSchema().equals(tInputSchema)) {
Class<EvalFunc<?>> tUDFClass = ReflectionUtil.getClassForName(tFuncSpec.getClassName());
tUDF = tUDFClass.newInstance();
break;
}
}
}

// TODO 例外メッセージ(UDF の FuncMapping に InputSchema が登録されていない)
if (tUDF == null)
throw new IllegalArgumentException();
}
mUDF = tUDF;
}

public ReflectionUDFSetting getReflectUDFSetting() {
return mReflectUDFSetting;
}

@Override
public Object evaluate(Object aInput) throws IOException {
return mUDF.exec(mColumnAccessor.generate(aInput));
}

// TODO 出力スキーマ操作(UDF名とか付けて返せるはず)
public Schema getOutputSchema() {
return mUDF.outputSchema(mColumnAccessor.getInputSchema());
}

}
Loading

0 comments on commit be74d16

Please sign in to comment.