Skip to content

Commit

Permalink
modified MulticastEvaluate UDF
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya-nakamura committed Mar 26, 2013
1 parent 5a4d9ef commit b43b7fa
Show file tree
Hide file tree
Showing 11 changed files with 565 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class AliasConstants {

public static final String TUPLE_MIN_OUT_ALIAS_TOP;

public static final String MULTICAST_EVALUATE_ALIAS_TOP;

// -----------------------------------------------------------------------------------------------------------------

static {
Expand All @@ -71,6 +73,8 @@ public class AliasConstants {
TUPLE_MAX_OUT_ALIAS_TOP = tProperties.getProperty("TupleMax.OutAlias.Top", "max_tuples");

TUPLE_MIN_OUT_ALIAS_TOP = tProperties.getProperty("TupleMin.OutAlias.Top", "min_tuples");

MULTICAST_EVALUATE_ALIAS_TOP = tProperties.getProperty("MulticastEvaluate.OutAlias.Top", "multicasted");
}

// -----------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions charsiu-udf/src/main/java/jp/ac/u/tokyo/m/pig/udf/alias.ini
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ First.OutAlias.Top = first_tuple
TupleMax.OutAlias.Top = max_tuples

TupleMin.OutAlias.Top = min_tuples

MulticastEvaluate.OutAlias.Top = multicasted
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
package jp.ac.u.tokyo.m.pig.udf.eval.util;

import java.io.IOException;
import java.util.List;

import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;

Expand All @@ -33,47 +31,9 @@ public class DefaultColumnEvaluator implements ColumnEvaluator {
public DefaultColumnEvaluator(ColumnAccessor aColumnAccessor, ReflectionUDFSetting aReflectUDFSetting) throws InstantiationException, IllegalAccessException, FrontendException, CloneNotSupportedException {
mColumnAccessor = aColumnAccessor;
mReflectUDFSetting = aReflectUDFSetting;

Class<EvalFunc<?>> tMasterUDFClass = ReflectionUtil.getClassForName(aReflectUDFSetting.getmClassName());
EvalFunc<?> tMasterUDF = tMasterUDFClass.newInstance();
EvalFunc<?> tUDF = null;
// func-spec = null なら無視していい
// 拒否もしたい
// この方法だとPigクエリ的な構文補助機能が使えない(動かして初めてミスに気づく。これは地味にネック。スキーマ検証段階で処理されるためかろうじで大丈夫か。)
Schema tInputSchema = aColumnAccessor.getInputSchema().clone();
ReflectionUtil.removeAlias(tInputSchema);
List<FuncSpec> tArgToFuncMapping = tMasterUDF.getArgToFuncMapping();
if (tArgToFuncMapping == null) {
tUDF = tMasterUDF;
} else {
for (FuncSpec tFuncSpec : tArgToFuncMapping) {
// XXX Pig はどうやって FuncMapping を利用しているんだろう?
// if (Schema.equals(tFuncSpec.getInputArgsSchema(), tInputSchema, false, false)) {
if (tFuncSpec.getInputArgsSchema().equals(tInputSchema)) {
Class<EvalFunc<?>> tUDFClass = ReflectionUtil.getClassForName(tFuncSpec.getClassName());
tUDF = tUDFClass.newInstance();
break;
}
}
// 1周でダメなら {()} が問題の可能性が有るので、スキーマを調整してもう一度
if (tUDF == null) {
ReflectionUtil.removeTupleInBag(tInputSchema);
for (FuncSpec tFuncSpec : tArgToFuncMapping) {
if (tFuncSpec.getInputArgsSchema().equals(tInputSchema)) {
Class<EvalFunc<?>> tUDFClass = ReflectionUtil.getClassForName(tFuncSpec.getClassName());
tUDF = tUDFClass.newInstance();
break;
}
}
}

// TODO 例外メッセージ(UDF の FuncMapping に InputSchema が登録されていない)
if (tUDF == null)
throw new IllegalArgumentException();
}
mUDF = tUDF;
mUDF = ReflectionUtil.getUDFInstance(aReflectUDFSetting.getClassName(), aColumnAccessor.getInputSchema());
}

/**
 * Returns the UDF setting this evaluator was constructed with.
 *
 * @return the {@link ReflectionUDFSetting} passed to the constructor
 */
public ReflectionUDFSetting getReflectUDFSetting() {
return mReflectUDFSetting;
}
Expand All @@ -83,7 +43,6 @@ public Object evaluate(Object aInput) throws IOException {
return mUDF.exec(mColumnAccessor.generate(aInput));
}

// TODO 出力スキーマ操作(UDF名とか付けて返せるはず)
/**
 * Returns the output schema of the wrapped UDF, obtained by passing it the
 * column accessor's input schema.
 *
 * @return the result of {@code mUDF.outputSchema(...)} for the accessor's input schema
 */
public Schema getOutputSchema() {
return mUDF.outputSchema(mColumnAccessor.getInputSchema());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,14 @@
import java.util.Iterator;
import java.util.List;

import jp.ac.u.tokyo.m.pig.udf.AliasConstants;

import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

Expand All @@ -36,38 +41,49 @@ public class MulticastEvaluate extends EvalFunc<Tuple> {

// -----------------------------------------------------------------------------------------------------------------

/**
* 「どんな処理」を「どのカラム」にするかという情報
*/
private final List<ReflectionUDFSetting> mReflectUDFSettings;

/**
* 「カラム毎」に「どんな処理」をするかという情報
*/
private static List<ColumnEvaluationSetting> mColumnEvaluationSettings;

// -----------------------------------------------------------------------------------------------------------------

// TODO args : ('<UDF>', '<bag column regex>', '<column control>', '<alias suffix>')
/**
* @param aArgs
* aArgs are ('\<UDF\>', '\<bag column regex\>', '\<column control\>', '\<alias suffix\>'[, ... ])
*/
public MulticastEvaluate(String... aArgs) {
super();
int tArgLength = aArgs.length;
// TODO 例外メッセージ
if (tArgLength % 3 != 0)
throw new IllegalArgumentException();
if (tArgLength % 4 != 0)
throw new IllegalArgumentException("引数の数が不正です : " + tArgLength);
List<ReflectionUDFSetting> tReflectUDFSettings = mReflectUDFSettings = new ArrayList<ReflectionUDFSetting>();
for (int i = 0; i < tArgLength; i++) {
String tShortClassName = aArgs[i];
String tClassName = MulticastEvaluationConstants.UDF_REFLECT_MAPPING.get(tShortClassName);
// TODO 例外メッセージ(2行目に対応しているクラス一覧を表示)
// TODO 例外メッセージ(2行目に対応しているクラス一覧を表示。自動化。
if (tClassName == null)
throw new IllegalArgumentException("unknown UDF : " + tShortClassName);
tReflectUDFSettings.add(new ReflectionUDFSetting(tClassName, aArgs[++i], aArgs[++i]));
throw new IllegalArgumentException("unknown UDF : " + tShortClassName + "\n" +
"MulticastEvaluate supported : MIN, MAX, SUM, AVG, SIZE, COUNT");
tReflectUDFSettings.add(new ReflectionUDFSetting(tClassName, aArgs[++i], aArgs[++i], aArgs[++i]));
}
}

// -----------------------------------------------------------------------------------------------------------------

// TODO 構造 (*) と ((*)) の両対応。外部でタプルを渡したい場合も有る。InnerGroup後などが該当する。
@Override
public Tuple exec(Tuple aInput) throws IOException {
Tuple tTargetTuple = getTurgetTuple(aInput);

Iterator<ColumnEvaluationSetting> ColumnEvaluationSettingsIterator = mColumnEvaluationSettings.iterator();
ArrayList<Object> tProtoTuple = new ArrayList<Object>();
for (Object tColumnValue : aInput.getAll()) {
if(ColumnEvaluationSettingsIterator.hasNext()){
for (Object tColumnValue : tTargetTuple.getAll()) {
if (ColumnEvaluationSettingsIterator.hasNext()) {
ColumnEvaluationSetting tSetting = ColumnEvaluationSettingsIterator.next();
for (ColumnEvaluator tColumnEvaluator : tSetting.getColumnEvaluators()) {
tProtoTuple.add(tColumnEvaluator.evaluate(tColumnValue));
Expand All @@ -77,6 +93,22 @@ public Tuple exec(Tuple aInput) throws IOException {
return TupleFactory.getInstance().newTupleNoCopy(tProtoTuple);
}

/**
 * Selects the tuple whose fields are the columns to evaluate, so that input
 * shaped either as Tuple( ... ) or as Tuple(Tuple( ... )) is handled alike.
 *
 * @param aInput
 *            the tuple handed to the UDF
 * @return the nested tuple when aInput holds exactly one field and that field
 *         is a non-null tuple; otherwise aInput itself (never null for a
 *         non-null aInput)
 */
private Tuple getTurgetTuple(Tuple aInput) {
    if (aInput.size() != 1) {
        return aInput;
    }
    try {
        Tuple tNestedTuple = DataType.toTuple(aInput.get(0));
        // A null field converts to a null tuple; keep aInput in that case so
        // the caller never receives null and fails on getAll().
        if (tNestedTuple != null) {
            return tNestedTuple;
        }
    } catch (ExecException ignored) {
        // Deliberately ignored: the only field is not a tuple (or could not be
        // read as one), so aInput is already in the Tuple( ... ) form.
    }
    return aInput;
}

// -----------------------------------------------------------------------------------------------------------------

// TODO 可能なら実装する
Expand All @@ -91,21 +123,15 @@ public Tuple exec(Tuple aInput) throws IOException {

// -----------------------------------------------------------------------------------------------------------------

// TODO ここで「対象のカラム」「対象のカラムの型」を判断し、「対応するUDF」を特定する
// TODO alias 付与
// Tuple( ... ) => Tuple( ... )
@Override
public Schema outputSchema(Schema aInput) {
// このカラムをどう変換するか、という情報にまとめるのが良い
// MAX でも int, double とかあるかもだし
// このカラムをどう変換するか、という情報にまとめる
// インスタンスの使いまわしとかはリファクタ時に対応

// カラムへのアクセス情報 FLAT, SUB_BAG
// ColumnAccesser : Object -> Tuple

Schema tOutputSchema = new Schema();
Schema tTargetSchema = getTargetSchema(aInput);
Schema tInnerTupleSchema = new Schema();

List<FieldSchema> tInputFields = aInput.getFields();
List<FieldSchema> tInputFields = tTargetSchema.getFields();
List<ReflectionUDFSetting> tReflectUDFSettings = mReflectUDFSettings;
// レコードの最上位カラム数と一致します。
List<ColumnEvaluationSetting> tColumnEvaluationSettings = new ArrayList<ColumnEvaluationSetting>();
Expand All @@ -115,14 +141,17 @@ public Schema outputSchema(Schema aInput) {
ColumnEvaluationSetting tSetting = new ColumnEvaluationSetting();
// 評価方法毎のイテレーション
for (ReflectionUDFSetting tReflectUDFSetting : tReflectUDFSettings) {
if (tReflectUDFSetting.matchesColumnRegex(tFieldSchema.alias)) {
String tCurrentFieldAlias = tFieldSchema.alias;
if (tReflectUDFSetting.matchesColumnRegex(tCurrentFieldAlias)) {
tEvaluation = true;
try {
DefaultColumnEvaluator tDefaultColumnEvaluator = new DefaultColumnEvaluator(
new DefaultColumnAccessor(tReflectUDFSetting.getmUDFArgumentFormat(), tFieldSchema),
new DefaultColumnAccessor(tReflectUDFSetting.getUDFArgumentFormat(), tFieldSchema),
tReflectUDFSetting);
tSetting.addColumnEvaluator(tDefaultColumnEvaluator);
tOutputSchema.add(tDefaultColumnEvaluator.getOutputSchema().getFields().get(0));
FieldSchema tCurrentOutputFieldSchema = tDefaultColumnEvaluator.getOutputSchema().getFields().get(0);
tCurrentOutputFieldSchema.alias = tCurrentFieldAlias + "_" + tReflectUDFSetting.getAliasSuffix();
tInnerTupleSchema.add(tCurrentOutputFieldSchema);
} catch (Throwable e) {
// TODO 例外処理
throw new RuntimeException(e);
Expand All @@ -135,15 +164,43 @@ public Schema outputSchema(Schema aInput) {
// スルー評価器を設定
// かつ。OutputSchema情報を確定できる
tSetting.addColumnEvaluator(ThroughColumnEvaluator.INSTANCE);
tOutputSchema.add(tFieldSchema);
tInnerTupleSchema.add(tFieldSchema);
}
tColumnEvaluationSettings.add(tSetting);
}
mColumnEvaluationSettings = tColumnEvaluationSettings;

Schema tOutputSchema = new Schema();
try {
tOutputSchema.add(
new FieldSchema(AliasConstants.MULTICAST_EVALUATE_ALIAS_TOP,
tInnerTupleSchema,
DataType.TUPLE));
} catch (FrontendException e) {
// TODO 例外処理
throw new RuntimeException(e);
}
return tOutputSchema;
}

/**
 * Selects the schema that describes the columns to evaluate, so that input
 * shaped either as Tuple( ... ) or as Tuple(Tuple( ... )) is handled alike.
 *
 * @param aInput
 *            the input schema handed to outputSchema
 * @return the inner schema of the only field when aInput has exactly one field
 *         of type TUPLE; otherwise aInput itself
 */
private Schema getTargetSchema(Schema aInput) {
    // More (or fewer) than one field: the schema is already the flat form.
    if (aInput.size() != 1) {
        return aInput;
    }
    final FieldSchema tOnlyField;
    try {
        tOnlyField = aInput.getField(0);
    } catch (FrontendException e) {
        throw new RuntimeException(e);
    }
    return tOnlyField.type == DataType.TUPLE ? tOnlyField.schema : aInput;
}

// -----------------------------------------------------------------------------------------------------------------

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public class MulticastEvaluationConstants {
UDF_REFLECT_MAPPING.put("COUNT", "org.apache.pig.builtin.COUNT");

// charsiu functions
UDF_REFLECT_MAPPING.put("TupleFirst", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleFirst");
UDF_REFLECT_MAPPING.put("TupleMax", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleMax");
UDF_REFLECT_MAPPING.put("TupleMin", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleMin");
// UDF_REFLECT_MAPPING.put("TupleFirst", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleFirst");
// UDF_REFLECT_MAPPING.put("TupleMax", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleMax");
// UDF_REFLECT_MAPPING.put("TupleMin", "jp.ac.u.tokyo.m.pig.udf.eval.math.TupleMin");
}

public final static String REFLECTION_UDF_PARAMETERS_ROOT_COLUMN = "_";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ReflectionUDFParameters(String aReflectionUDFParametersString, FieldSchem
mInputSchema = generateInputSchema(mColumnNames);
}

// TODO 実装
// TODO $x access to column
static List<ColumnName> parseReflectionUDFParameters(String aReflectionUDFParametersString, FieldSchema aColumnValueFieldSchema) throws FrontendException {
List<ColumnName> tColumnNames = new ArrayList<ReflectionUDFParameters.ColumnName>();

Expand All @@ -53,9 +53,33 @@ static List<ColumnName> parseReflectionUDFParameters(String aReflectionUDFParame
for (int tIndex = 1; tIndex < tAddressesLength; tIndex++) {
String tAddressAlias = tAddresses[tIndex];
FieldSchema tField = tCurrentField.schema.getField(tAddressAlias);
if (tField == null)
throw new IllegalArgumentException(tAddressAlias + " is not found in the schema : " + tCurrentField);
else {
if (tField == null) {
// Bag{Tuple( ... )} ではなく Bag{ ... } で指定された可能性が有るので1階層無視して tAddressAlias を探す
if (tCurrentField.type == DataType.BAG) {
FieldSchema tBagTupleFieldSchema = tCurrentField.schema.getField(0);
try {
tField = tBagTupleFieldSchema.schema.getField(tAddressAlias);
} catch (Throwable e) {
throw new IllegalArgumentException(tAddressAlias + " is not found in the schema : " + tCurrentField, e);
}
if (tField != null) {
// BagTuple Scheam
String tChildColumnAlias = tBagTupleFieldSchema.alias == null ? "" : tBagTupleFieldSchema.alias;
ColumnName tChildColumnName = new ColumnName(tChildColumnAlias, 0, tBagTupleFieldSchema, AccessType.SUB_BAG);
tCurrentColumnName.setChild(tChildColumnName);
tCurrentColumnName = tChildColumnName;
// Column Schema
ColumnName tChildChildColumnName = new ColumnName(tAddressAlias, tBagTupleFieldSchema.schema.getPosition(tAddressAlias), tField, AccessType.SUB_BAG);
tChildColumnName.setChild(tChildChildColumnName);
tChildColumnName = tChildChildColumnName;

tCurrentField = tField;
continue;
} else
throw new IllegalArgumentException(tAddressAlias + " is not found in the schema : " + tCurrentField);
} else
throw new IllegalArgumentException(tAddressAlias + " is not found in the schema : " + tCurrentField);
} else {
AccessType tAccessType = null;
if (tCurrentColumnName.getFieldSchema().type == DataType.BAG
|| (tCurrentColumnName.getFieldSchema().type == DataType.TUPLE && tCurrentColumnName.getAccessType() == AccessType.SUB_BAG))
Expand All @@ -71,8 +95,6 @@ static List<ColumnName> parseReflectionUDFParameters(String aReflectionUDFParame
}
}

// mock
// tColumnNames.add(new ColumnName("", 0, aColumnValueFieldSchema, AccessType.FLAT));
return tColumnNames;
}

Expand Down
Loading

0 comments on commit b43b7fa

Please sign in to comment.