diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java similarity index 55% rename from exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java rename to exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java index 7a42aeeae1a..4e2791924cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharAggFunction.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/CollectToListVarcharFunction.java @@ -29,40 +29,47 @@ /** * Aggregate function which collects incoming VarChar column values into the list. */ -@FunctionTemplate(name = "collect_to_list_varchar", - scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE, - isInternal = true) -public class CollectToListVarcharAggFunction implements DrillAggFunc { +public class CollectToListVarcharFunction { + public static final String NAME = "collect_to_list_varchar"; - @Param NullableVarCharHolder input; - @Output BaseWriter.ComplexWriter writer; - @Workspace ObjectHolder writerHolder; + @FunctionTemplate(name = NAME, + scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE, + isInternal = true) + public static class CollectToListVarcharAggFunction implements DrillAggFunc { - @Override - public void setup() { - writerHolder = new ObjectHolder(); - } + @Param NullableVarCharHolder input; + @Output BaseWriter.ComplexWriter writer; + @Workspace ObjectHolder writerHolder; + + private CollectToListVarcharAggFunction() { + } - @Override - public void add() { - org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter; - if (writerHolder.obj == null) { - writerHolder.obj = writer.rootAsList(); + @Override + public void setup() { + writerHolder = new ObjectHolder(); } - listWriter = 
(org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj; + @Override + public void add() { + org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter; + if (writerHolder.obj == null) { + writerHolder.obj = writer.rootAsList(); + } + + listWriter = (org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj; - if (input.isSet > 0) { - listWriter.varChar().writeVarChar(input.start, input.end, input.buffer); + if (input.isSet > 0) { + listWriter.varChar().writeVarChar(input.start, input.end, input.buffer); + } } - } - @Override - public void output() { - } + @Override + public void output() { + } - @Override - public void reset() { - writerHolder.obj = null; + @Override + public void reset() { + writerHolder.obj = null; + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java index 1b3805e5bd1..da26e1a27c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java @@ -19,11 +19,13 @@ import com.google.common.collect.Lists; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelTrait; @@ -58,6 +60,10 @@ public void onMatch(RelOptRuleCall call) { final DrillAggregateRel aggregate = call.rel(0); final RelNode input = 
call.rel(1); + if (hasIncompatibleAggCalls(aggregate)) { + return; + } + if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) { // currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or // if there are no grouping keys @@ -168,4 +174,35 @@ private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggre call.transformTo(newAgg); } + /** + * Evaluates the logical aggregate expressions to determine if any are + * incompatible with the Hash Aggregate physical operator. + *

+ * While Hash Aggregate is generally performant for many aggregation types, + * certain functions (such as {@code collect_to_list_varchar}) may require specific + * data ordering or memory management patterns that the Hash Aggregate + * implementation does not provide. + *

+ *

+ * Current Incompatibilities: + *

+ * {@code collect_to_list_varchar}, identified by {@link CollectToListVarcharFunction#NAME}.

+ * @param aggregate The logical aggregate relational node containing the + * list of {@link AggregateCall}s to inspect. + * @return {@code true} if at least one aggregation call is incompatible + * with HashAgg; {@code false} otherwise. + */ + private boolean hasIncompatibleAggCalls(DrillAggregateRel aggregate) { + for (AggregateCall aggCall : aggregate.getAggCallList()) { + if (CollectToListVarcharFunction.NAME.equalsIgnoreCase(aggCall.getAggregation().getName())) { + return true; + } + } + return false; + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java index f8fa2221ea0..f052ed2d8ff 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java @@ -1101,29 +1101,37 @@ public void testCollectListHashAgg() throws Exception { public void testCollectToListVarcharStreamAgg() throws Exception { try { client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false); + client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), true); testBuilder() - .sqlQuery("select collect_to_list_varchar(`date`) as l from " + - "(select * from cp.`store/json/clicks.json` limit 2)") + .sqlQuery("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from " + + "cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`") .unOrdered() - .baselineColumns("l") - .baselineValues(listOf("2014-04-26", "2014-04-20")) + .baselineColumns("ids") + .baselineValues(listOf("31920", "32383", "32359")) + .baselineValues(listOf("31026")) + .baselineValues(listOf("33848")) .go(); } finally { client.resetSession(PlannerSettings.HASHAGG.getOptionName()); + client.resetSession(PlannerSettings.STREAMAGG.getOptionName()); } } + /** + * The current implementation of {@link 
org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction} + * requires ordered input data. Because the Hash Aggregator does not maintain input order, + * there is no known efficient way for the function to process these values correctly within + * that operator, so planning is expected to fail when stream aggregation is disabled. + * {@code StreamAggPrule} is intended to handle this function to ensure deterministic results. + */ @Test public void testCollectToListVarcharHashAgg() throws Exception { try { + thrown.expect(UserRemoteException.class); + thrown.expectMessage(containsString("SYSTEM ERROR: CannotPlanException")); client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false); - testBuilder() - .sqlQuery("select collect_to_list_varchar(`date`) as l from " + - "(select * from cp.`store/json/clicks.json` limit 2) group by 'a'") - .unOrdered() - .baselineColumns("l") - .baselineValues(listOf("2014-04-26", "2014-04-20")) - .go(); + run("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from" + + " cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`"); } finally { client.resetSession(PlannerSettings.STREAMAGG.getOptionName()); }