Skip to content

Commit

Permalink
[vectorized](udaf) fix java-udaf case of P0 is unstable (apache#18054)
Browse files Browse the repository at this point in the history
the udaf case is unstable reason:
when enable_pipeline_engine=true, the case of agg function only 1 instance,
so not merge the default value, but if instance>1, will merge the default value
  • Loading branch information
zhangstar333 authored Mar 24, 2023
1 parent 321bb3e commit 2a35adb
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@
1 2022-01-01 2022-01-01T11:11:11 2022-01-01 2022-01-01T11:11:11

-- !select1 --
2022-01-11
2022-01-09

-- !select2 --
1 2022-01-10
1 2022-01-09

-- !select3 --
2022-01-11
2022-01-09

-- !select4 --
1 2022-01-10
1 2022-01-09

-- !select5 --
2022-01-01T03:00
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,52 @@
public class MyDayDate {
private static final Logger LOG = Logger.getLogger(MyDayDate.class);
public static class State {
public LocalDate counter = LocalDate.of(2022,01,01);
public LocalDate counter;
public boolean inited = false;
}

public State create() {
LOG.info("call create func");
return new State();
}

public void destroy(State state) {
}

public void add(State state, LocalDate val1) {
if (val1 == null) {
LOG.info("add val is null");
return;
if (val1 == null) return;
if (!state.inited) {
state.inited = true;
state.counter = LocalDate.of(val1.getYear(),val1.getMonthValue(),val1.getDayOfMonth());
} else {
state.counter = state.counter.plusDays(val1.getDayOfMonth());
}
LOG.info("val1: " + val1.toString());
LOG.info("add before state is: " + state.counter.toString());
state.counter = state.counter.plusDays(val1.getDayOfMonth());
LOG.info("add after state is: " + state.counter.toString());
}

public void serialize(State state, DataOutputStream out) throws IOException {
LOG.info("serialize state is: " + state.counter.toString());
out.writeInt(state.counter.getYear());
out.writeInt(state.counter.getMonthValue());
out.writeInt(state.counter.getDayOfMonth());
out.writeBoolean(state.inited);
}

public void deserialize(State state, DataInputStream in) throws IOException {
LOG.info("deserialize before state is: " + state.counter.toString());
state.counter = LocalDate.of(in.readInt(),in.readInt(),in.readInt());
LOG.info("deserialize after state is: " + state.counter.toString());
state.inited = in.readBoolean();
}

public void merge(State state, State rhs) {
LOG.info("merge rhs state is: " + rhs.counter.toString());
LOG.info("merge before state is: " + state.counter.toString());
state.counter = state.counter.plusDays(rhs.counter.getDayOfMonth());
LOG.info("merge after state is: " + state.counter.toString());
if (!rhs.inited) {
return;
}
if (!state.inited) {
state.inited = true;
state.counter = LocalDate.of(rhs.counter.getYear(),rhs.counter.getMonthValue(),rhs.counter.getDayOfMonth());
} else {
state.counter = state.counter.plusDays(rhs.counter.getDayOfMonth());
}
}

public LocalDate getValue(State state) {
LOG.info("getValue state is: " + state.counter.toString());
LOG.info("------------------------------end----------------------");
return state.counter;
}
}

0 comments on commit 2a35adb

Please sign in to comment.