Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,47 @@
/**
* Aggregate function which collects incoming VarChar column values into the list.
*/
@FunctionTemplate(name = "collect_to_list_varchar",
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
isInternal = true)
public class CollectToListVarcharAggFunction implements DrillAggFunc {
public class CollectToListVarcharFunction {
public static final String NAME = "collect_to_list_varchar";

@Param NullableVarCharHolder input;
@Output BaseWriter.ComplexWriter writer;
@Workspace ObjectHolder writerHolder;
@FunctionTemplate(name = NAME,
scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE,
isInternal = true)
public static class CollectToListVarcharAggFunction implements DrillAggFunc {

@Override
public void setup() {
writerHolder = new ObjectHolder();
}
@Param NullableVarCharHolder input;
@Output BaseWriter.ComplexWriter writer;
@Workspace ObjectHolder writerHolder;

private CollectToListVarcharAggFunction() {
}

@Override
public void add() {
org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter;
if (writerHolder.obj == null) {
writerHolder.obj = writer.rootAsList();
@Override
public void setup() {
writerHolder = new ObjectHolder();
}

listWriter = (org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj;
@Override
public void add() {
org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter listWriter;
if (writerHolder.obj == null) {
writerHolder.obj = writer.rootAsList();
}

listWriter = (org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter) writerHolder.obj;

if (input.isSet > 0) {
listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
if (input.isSet > 0) {
listWriter.varChar().writeVarChar(input.start, input.end, input.buffer);
}
}
}

@Override
public void output() {
}
@Override
public void output() {
}

@Override
public void reset() {
writerHolder.obj = null;
@Override
public void reset() {
writerHolder.obj = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.google.common.collect.Lists;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelTrait;
Expand Down Expand Up @@ -58,6 +60,10 @@ public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = call.rel(0);
final RelNode input = call.rel(1);

if (hasIncompatibleAggCalls(aggregate)) {
return;
}

if (aggregate.containsDistinctCall() || aggregate.getGroupCount() == 0) {
// currently, don't use HashAggregate if any of the logical aggrs contains DISTINCT or
// if there are no grouping keys
Expand Down Expand Up @@ -168,4 +174,35 @@ private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggre
call.transformTo(newAgg);
}

/**
 * Checks whether the given logical aggregate contains any aggregate call that
 * must not be planned with the Hash Aggregate physical operator.
 * <p>
 * At present the only such call is {@code collect_to_list_varchar}
 * ({@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}),
 * which is matched case-insensitively by its function name. It is kept out of
 * HashAgg because it requires ordered input and cannot be processed efficiently
 * in an unordered fashion; {@code SortAggPrule} is intended to handle it
 * instead, ensuring deterministic results.
 * </p>
 *
 * @param aggregate the logical aggregate relational node whose
 *                  {@link AggregateCall}s are inspected
 * @return {@code true} if at least one aggregate call is incompatible with
 *         HashAgg; {@code false} otherwise
 */
private boolean hasIncompatibleAggCalls(DrillAggregateRel aggregate) {
  // anyMatch short-circuits on the first incompatible call, mirroring an early return.
  return aggregate.getAggCallList().stream()
      .map(call -> call.getAggregation().getName())
      .anyMatch(fnName -> CollectToListVarcharFunction.NAME.equalsIgnoreCase(fnName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1101,29 +1101,37 @@ public void testCollectListHashAgg() throws Exception {
public void testCollectToListVarcharStreamAgg() throws Exception {
try {
client.alterSession(PlannerSettings.HASHAGG.getOptionName(), false);
client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), true);
testBuilder()
.sqlQuery("select collect_to_list_varchar(`date`) as l from " +
"(select * from cp.`store/json/clicks.json` limit 2)")
.sqlQuery("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from " +
"cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`")
.unOrdered()
.baselineColumns("l")
.baselineValues(listOf("2014-04-26", "2014-04-20"))
.baselineColumns("ids")
.baselineValues(listOf("31920", "32383", "32359"))
.baselineValues(listOf("31026"))
.baselineValues(listOf("33848"))
.go();
} finally {
client.resetSession(PlannerSettings.HASHAGG.getOptionName());
client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
}
}

/**
* The current implementation of {@link org.apache.drill.exec.expr.fn.impl.CollectToListVarcharFunction}
* requires ordered input data. Because the Hash Aggregator does not maintain input order,
* the function has no efficient way to process these values correctly within that operator.
* {@code SortAggPrule} is intended to handle this case and ensure deterministic
* results.
*/
@Test
public void testCollectToListVarcharHashAgg() throws Exception {
try {
thrown.expect(UserRemoteException.class);
thrown.expectMessage(containsString("SYSTEM ERROR: CannotPlanException"));
client.alterSession(PlannerSettings.STREAMAGG.getOptionName(), false);
testBuilder()
.sqlQuery("select collect_to_list_varchar(`date`) as l from " +
"(select * from cp.`store/json/clicks.json` limit 2) group by 'a'")
.unOrdered()
.baselineColumns("l")
.baselineValues(listOf("2014-04-26", "2014-04-20"))
.go();
run("select collect_to_list_varchar(`clicks`.`trans_id`) as ids from" +
" cp.`store/json/clicks.json` clicks group by `clicks`.`user_info`.`device`");
} finally {
client.resetSession(PlannerSettings.STREAMAGG.getOptionName());
}
Expand Down
Loading