Amazon Athena Execution Plans - Additional Operations

Neil Addison

Estimated Reading Time: 7 minutes
Last Updated: 2024-12-05

Disclaimer: The details described in this post are the results of my own research and investigations. While some effort has been expended in ensuring their accuracy - with ubiquitous references to source material - I cannot guarantee that accuracy. The views and opinions expressed on this blog are my own and do not necessarily reflect the views of any organization I am associated with, past or present.

Earlier, we shone a light on a distributed execution plan, defining the terminology as we worked through the fragments. In this post, we extend that investigation to other operations and terms that we might find in a plan.
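
Before diving in, a quick note on how these plans are obtained: each plan shown below comes from running the query through Athena's EXPLAIN statement. A minimal sketch, using the first query of this post, is shown here; the TYPE DISTRIBUTED option requests the fragment-based plan discussed throughout (and, as far as I can tell, is also the default).

-- Minimal sketch: request the distributed (fragment-based) execution plan for a query
EXPLAIN (TYPE DISTRIBUTED)
SELECT 1
UNION ALL
SELECT 2
UNION ALL
SELECT 3;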

Values

First up we have:

  • Values[] - indicates that one or more rows will be produced containing literal values.

Here is an example of such a query:

SELECT 1
UNION ALL 
SELECT 2 
UNION ALL 
SELECT 3;

And the execution plan:

Fragment 0 [SINGLE]
    Output layout: [field]
    Output partitioning: SINGLE []
    Output[columnNames = [_col0]]
    │   Layout: [field:integer]
    │   Estimates: {rows: 3 (15B), cpu: 0, memory: 0B, network: 0B}
    │   _col0 := field
    └─ LocalExchange[partitioning = ROUND_ROBIN]
       │   Layout: [field:integer]
       │   Estimates: {rows: 3 (15B), cpu: 15, memory: 0B, network: 0B}
       ├─ Values[]
       │      Layout: [expr:integer]
       │      Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
       │      (1)
       ├─ Values[]
       │      Layout: [expr_1:integer]
       │      Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
       │      (2)
       └─ Values[]
              Layout: [expr_2:integer]
              Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
              (3)

This simple query uses three Values operations to produce three separate rows containing 1, 2 and 3. The query is so simple that there is only one fragment, and it is of type SINGLE, meaning that the computation takes place on a single node - the coordinator node. Note also that, in contrast to the query in the previous post, the statistics are provided because Athena already knows the relative CPU, memory and network cost involved.
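
As an aside, a VALUES table constructor is another way to produce rows of literals. I have not captured its plan here, but one would expect it to surface as a Values operation too, most likely a single node carrying all three rows rather than three separate branches:

-- Hedged aside (plan not captured here): a VALUES table constructor also produces literal rows
SELECT n
FROM (VALUES 1, 2, 3) AS t(n);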

Watch what happens though if we change the UNION ALL clauses to UNION (which requires the result set to be deduplicated):

SELECT 1
UNION 
SELECT 2 
UNION
SELECT 3;
Query Plan
Fragment 0 [HASH]
    Output layout: [field]
    Output partitioning: SINGLE []
    Output[columnNames = [_col0]]
    │   Layout: [field:integer]
    │   Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
    │   _col0 := field
    └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [field:integer]
       │   Estimates: {rows: 1 (5B), cpu: 5, memory: 0B, network: 0B}
       └─ Aggregate[type = FINAL, keys = [field], hash = [$hashvalue]]
          │   Layout: [field:integer, $hashvalue:bigint]
          │   Estimates: {rows: 1 (14B), cpu: 42, memory: 14B, network: 0B}
          └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["field"]]
             │   Layout: [field:integer, $hashvalue:bigint]
             │   Estimates: {rows: 3 (42B), cpu: 42, memory: 0B, network: 0B}
             ├─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
             │  │   Layout: [field:integer, $hashvalue_3:bigint]
             │  │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
             │  │   field := "expr"
             │  └─ RemoteSource[sourceFragmentIds = [1]]
             │         Layout: [expr:integer, $hashvalue_3:bigint]
             ├─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
             │  │   Layout: [field:integer, $hashvalue_5:bigint]
             │  │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
             │  │   field := "expr_1"
             │  └─ RemoteSource[sourceFragmentIds = [2]]
             │         Layout: [expr_1:integer, $hashvalue_5:bigint]
             └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
                │   Layout: [field:integer, $hashvalue_7:bigint]
                │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
                │   field := "expr_2"
                └─ RemoteSource[sourceFragmentIds = [3]]
                       Layout: [expr_2:integer, $hashvalue_7:bigint]

Fragment 1 [SINGLE]
    Output layout: [expr, $hashvalue_4]
    Output partitioning: HASH [expr][$hashvalue_4]
    Aggregate[type = PARTIAL, keys = [expr], hash = [$hashvalue_4]]
    │   Layout: [expr:integer, $hashvalue_4:bigint]
    │   Estimates: {rows: 1 (14B), cpu: 14, memory: 14B, network: 0B}
    └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [expr:integer, $hashvalue_4:bigint]
       │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
       │   $hashvalue_4 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
       └─ Values[]
              Layout: [expr:integer]
              Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
              (1)

Fragment 2 [SINGLE]
    Output layout: [expr_1, $hashvalue_6]
    Output partitioning: HASH [expr_1][$hashvalue_6]
    Aggregate[type = PARTIAL, keys = [expr_1], hash = [$hashvalue_6]]
    │   Layout: [expr_1:integer, $hashvalue_6:bigint]
    │   Estimates: {rows: 1 (14B), cpu: 14, memory: 14B, network: 0B}
    └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [expr_1:integer, $hashvalue_6:bigint]
       │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
       │   $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_1"), 0))
       └─ Values[]
              Layout: [expr_1:integer]
              Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
              (2)

Fragment 3 [SINGLE]
    Output layout: [expr_2, $hashvalue_8]
    Output partitioning: HASH [expr_2][$hashvalue_8]
    Aggregate[type = PARTIAL, keys = [expr_2], hash = [$hashvalue_8]]
    │   Layout: [expr_2:integer, $hashvalue_8:bigint]
    │   Estimates: {rows: 1 (14B), cpu: 14, memory: 14B, network: 0B}
    └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [expr_2:integer, $hashvalue_8:bigint]
       │   Estimates: {rows: 1 (14B), cpu: 14, memory: 0B, network: 0B}
       │   $hashvalue_8 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_2"), 0))
       └─ Values[]
              Layout: [expr_2:integer]
              Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
              (3)

Suddenly the simple plan on the coordinator node has exploded into multiple fragments. The three Values fragments (1 to 3) are still of type SINGLE, while the final aggregation sits in a HASH fragment, and it remains unclear from the plan alone whether the SINGLE fragments run on the coordinator or on workers. If nothing else, it gives us a warning of the extra steps required to use UNION over UNION ALL.
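
One way to reason about the extra work: UNION is semantically equivalent to applying DISTINCT over a UNION ALL, and the FINAL/PARTIAL Aggregate pair in the plan above is effectively that deduplication. A rewrite that makes this explicit (I have not compared its plan side by side, but I would expect something broadly similar) is:

-- Hedged rewrite: UNION behaves like SELECT DISTINCT over a UNION ALL
SELECT DISTINCT field
FROM (
      SELECT 1 AS field
      UNION ALL
      SELECT 2
      UNION ALL
      SELECT 3
     ) t;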

AssignUniqueId, MarkDistinct, FilterProject

Next we have a query that uses a correlated scalar sub-query, resulting in a few new operations:

SELECT sp.product_id, 
       (SELECT sc.category_name FROM aept_db.supermarket_categories sc WHERE sc.category_name = sp.category) AS product_category_name
FROM aept_db.supermarket_products sp;
Query Plan
Fragment 0 [SOURCE]
    Output layout: [product_id, category_name]
    Output partitioning: SINGLE []
    Output[columnNames = [product_id, product_category_name]]
    │   Layout: [product_id:varchar, category_name:varchar]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   product_category_name := category_name
    └─ FilterProject[filterPredicate = (CASE "is_distinct" WHEN true THEN true ELSE CAST(fail(28, VARCHAR 'Scalar sub-query has returned multiple rows') AS boolean) END), projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [product_id:varchar, category_name:varchar]
       │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
       └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
          │   Layout: [product_id:varchar, category_name:varchar, is_distinct:boolean]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          └─ MarkDistinct[distinct = [unique:bigint], marker = is_distinct]
             │   Layout: [product_id:varchar, category:varchar, unique:bigint, category_name:varchar, is_distinct:boolean]
             │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
             └─ LeftJoin[criteria = ("category" = "category_name"), hash = [$hashvalue, $hashvalue_3], distribution = REPLICATED]
                │   Layout: [product_id:varchar, category:varchar, unique:bigint, category_name:varchar]
                │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
                │   Distribution: REPLICATED
                ├─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
                │  │   Layout: [product_id:varchar, category:varchar, unique:bigint, $hashvalue:bigint]
                │  │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                │  │   $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("category"), 0))
                │  └─ AssignUniqueId[]
                │     │   Layout: [product_id:varchar, category:varchar, unique:bigint]
                │     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                │     └─ TableScan[table = awsdatacatalog:aept_db:supermarket_products]
                │            Layout: [product_id:varchar, category:varchar]
                │            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                │            product_id := product_id:string:REGULAR
                │            category := category:string:REGULAR
                └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_3], arguments = ["category_name"]]
                   │   Layout: [category_name:varchar, $hashvalue_3:bigint]
                   │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
                   └─ RemoteSource[sourceFragmentIds = [1]]
                          Layout: [category_name:varchar, $hashvalue_4:bigint]

Fragment 1 [SOURCE]
    Output layout: [category_name, $hashvalue_5]
    Output partitioning: BROADCAST []
    ScanProject[table = awsdatacatalog:aept_db:supermarket_categories, projectLocality = LOCAL, protectedBarrier = NONE]
        Layout: [category_name:varchar, $hashvalue_5:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        $hashvalue_5 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("category_name"), 0))
        category_name := category_name:string:REGULAR

In Fragment 0, we have operations[1]:

  • AssignUniqueId[] - adds a unique identifier to each row of the dataset.
    In the query above, a unique id of type bigint is assigned to each row of the data scanned from aept_db.supermarket_products. Notice that the unique id is projected and used within a LeftJoin to data from aept_db.supermarket_categories (output from Fragment 1).
  • MarkDistinct[distinct = [unique:bigint], marker = is_distinct] - adds a marker column (here is_distinct) that is set to false when a row's unique id (from AssignUniqueId) is shared with another row, and true otherwise. MarkDistinct can also be used as a distinct aggregation strategy set via the Trino optimizer. See the Trino documentation for more information about the various strategies that can be employed.

Let’s briefly break this apart: the correlated sub-query

(SELECT sc.category_name FROM aept_db.supermarket_categories sc WHERE sc.category_name = sp.category)
FROM aept_db.supermarket_products sp;

gets converted into a join by the query engine (pseudo-code):

aept_db.supermarket_products LEFT JOIN aept_db.supermarket_categories

The correlated sub-query must be scalar; it can only produce one value per outer row. The AssignUniqueId operator assigns a unique id to every row of aept_db.supermarket_products before the join, so if aept_db.supermarket_categories had N matches for the condition sc.category_name = sp.category, that unique id would appear N times after the join. When the MarkDistinct operator then executes, any unique id that appears more than once causes it to 'mark' the is_distinct column as false for those rows.

This becomes important in the operation:

  • FilterProject[filterPredicate = (CASE "is_distinct" WHEN true THEN true ELSE CAST(fail(28, VARCHAR 'Scalar sub-query has returned multiple rows') AS boolean) END), projectLocality = LOCAL, protectedBarrier = NONE] - a combination of the Filter and Project operations.

    Here, if any is_distinct value is false, then the query fails with the error 'Scalar sub-query has returned multiple rows', as illustrated in the sketch below.
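
A hypothetical, self-contained example (using inline VALUES rather than the supermarket tables) shows this guard tripping: the inner table has two rows matching x = 1, so the scalar sub-query returns multiple rows and the fail() predicate fires at run time.

-- Hypothetical illustration (not from the original tables): two matching inner rows for x = 1
-- cause the MarkDistinct/FilterProject guard to raise the error at run time
WITH outer_t(x) AS (VALUES 1),
     inner_t(x, y) AS (VALUES (1, 'a'), (1, 'b'))
SELECT o.x,
       (SELECT i.y FROM inner_t i WHERE i.x = o.x) AS y
FROM outer_t o;
-- Expected failure: 'Scalar sub-query has returned multiple rows'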

TopNRanking, Window and TopN

In the previous post, we saw the Aggregate operator appear when we executed a SUM function. What about Window functions such as ROW_NUMBER, LEAD, LAG and MAX?

Here we have a simple query to get the maximum total_amount value for each transaction_date using a ROW_NUMBER function.

SELECT t_outer.transaction_date,
       t_outer.total_amount AS max_total_amount
FROM (
      SELECT t.transaction_date,
             t.total_amount,
             ROW_NUMBER() OVER (PARTITION BY t.transaction_date ORDER BY t.total_amount DESC) AS rn
      FROM aept_db.supermarket_transactions t
     ) t_outer 
WHERE t_outer.rn = 1
ORDER BY t_outer.transaction_date DESC;
Query Plan
Fragment 0 [SINGLE]
    Output layout: [transaction_date, total_amount]
    Output partitioning: SINGLE []
    Output[columnNames = [transaction_date, max_total_amount]]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2)]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   max_total_amount := total_amount
    └─ RemoteMerge[sourceFragmentIds = [1]]
           Layout: [transaction_date:date, total_amount:decimal(10,2)]

Fragment 1 [ROUND_ROBIN]
    Output layout: [transaction_date, total_amount]
    Output partitioning: SINGLE []
    LocalMerge[orderBy = [transaction_date DESC NULLS LAST]]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2)]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    └─ PartialSort[orderBy = [transaction_date DESC NULLS LAST]]
       │   Layout: [transaction_date:date, total_amount:decimal(10,2)]
       │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [transaction_date:date, total_amount:decimal(10,2)]

Fragment 2 [HASH]
    Output layout: [transaction_date, total_amount]
    Output partitioning: ROUND_ROBIN []
    Project[projectLocality = LOCAL, protectedBarrier = NONE]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2)]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    └─ TopNRanking[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], limit = 1, hash = [$hashvalue]]
       │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue:bigint, row_number:bigint]
       │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
       │   row_number := ROW_NUMBER
       └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["transaction_date"]]
          │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue:bigint]
          │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [3]]
                 Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue_0:bigint]

Fragment 3 [SOURCE]
    Output layout: [transaction_date, total_amount, $hashvalue_1]
    Output partitioning: HASH [transaction_date][$hashvalue_1]
    TopNRanking[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], limit = 1, hash = [$hashvalue_1]]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue_1:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   row_number := ROW_NUMBER
    └─ ScanProject[table = awsdatacatalog:aept_db:supermarket_transactions, projectLocality = LOCAL, protectedBarrier = NONE]
           Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue_1:bigint]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           $hashvalue_1 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("transaction_date"), 0))
           transaction_date := transaction_date:date:REGULAR
           total_amount := total_amount:decimal(10,2):REGULAR

We are introduced to the operator:

  • TopNRanking[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], limit = 1, hash = [$hashvalue_1]] - indicates that a ranking function is to be executed on the table and that at most N rows need to be returned. In this case, the table is partitioned by transaction_date and ordered by total_amount DESC NULLS LAST.

Note: I was unable to find a good reference to confirm the above definition, but it is unlikely to be significantly wrong.

The limit = 1 (the number N of rows to be returned) ties in with the predicate

WHERE t_outer.rn = 1

It seems that the TopNRanking operator appears as long as there is an upper bound on the number of rows required. For example, the following predicates will invoke the TopNRanking operation:

1) WHERE t_outer.rn = 2
2) WHERE t_outer.rn <= 10
3) WHERE t_outer.rn >= 25 AND t_outer.rn <= 50

However, change the predicate so it no longer has an upper bound on the returned data:

WHERE t_outer.rn > 10
Query Plan
Fragment 0 [SINGLE]
    Output layout: [transaction_date, total_amount, row_number]
    Output partitioning: SINGLE []
    Output[columnNames = [transaction_date, max_total_amount, rn]]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    │   max_total_amount := total_amount
    │   rn := row_number
    └─ RemoteMerge[sourceFragmentIds = [1]]
           Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]

Fragment 1 [ROUND_ROBIN]
    Output layout: [transaction_date, total_amount, row_number]
    Output partitioning: SINGLE []
    LocalMerge[orderBy = [transaction_date DESC NULLS LAST]]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    └─ PartialSort[orderBy = [transaction_date DESC NULLS LAST]]
       │   Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]
       │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
       └─ RemoteSource[sourceFragmentIds = [2]]
              Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]

Fragment 2 [HASH]
    Output layout: [transaction_date, total_amount, row_number]
    Output partitioning: ROUND_ROBIN []
    Filter[filterPredicate = ("row_number" > BIGINT '10')]
    │   Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
    └─ Project[projectLocality = LOCAL, protectedBarrier = NONE]
       │   Layout: [transaction_date:date, total_amount:decimal(10,2), row_number:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
       └─ Window[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], hash = [$hashvalue]]
          │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue:bigint, row_number:bigint]
          │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
          │   row_number := row_number() RANGE UNBOUNDED_PRECEDING CURRENT_ROW
          └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["transaction_date"]]
             │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue:bigint]
             │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
             └─ RemoteSource[sourceFragmentIds = [3]]
                    Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue_0:bigint]

Fragment 3 [SOURCE]
    Output layout: [transaction_date, total_amount, $hashvalue_1]
    Output partitioning: HASH [transaction_date][$hashvalue_1]
    ScanProject[table = awsdatacatalog:aept_db:supermarket_transactions, projectLocality = LOCAL, protectedBarrier = NONE]
        Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue_1:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        $hashvalue_1 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("transaction_date"), 0))
        transaction_date := transaction_date:date:REGULAR
        total_amount := total_amount:decimal(10,2):REGULAR

and suddenly our TopNRanking is replaced with:

  • Window[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], hash = [$hashvalue]] - indicates that a Window function is to be applied to the dataset. In this case, the window partitioning is on transaction_date and the data are ordered by total_amount DESC NULLS LAST.

The TopNRanking operator seems to appear whenever there is an upper bound on some sequential labelling of rows, through Window functions such as the following (a hedged RANK example appears after the list):

  • RANK
  • DENSE_RANK
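
For example, a RANK-based variant of the earlier query with a bounded predicate would, by the same reasoning, be expected to plan as TopNRanking rather than Window (a hedged sketch; I have not captured its plan here):

-- Hedged sketch (plan not captured here): a bounded RANK predicate should also qualify for TopNRanking
SELECT ranked.transaction_date,
       ranked.total_amount
FROM (
      SELECT t.transaction_date,
             t.total_amount,
             RANK() OVER (PARTITION BY t.transaction_date ORDER BY t.total_amount DESC) AS rnk
      FROM aept_db.supermarket_transactions t
     ) ranked
WHERE ranked.rnk <= 3;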

The Window operator is more common. Here is an example with the MAX function.

SELECT t.transaction_date,
       t.total_amount,
       MAX(t.total_amount) OVER (PARTITION BY t.transaction_date ORDER BY t.total_amount DESC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_amount_for_date
FROM aept_db.supermarket_transactions t
ORDER BY t.transaction_date DESC;
~Part of execution plan
└─ Window[partitionBy = [transaction_date], orderBy = [total_amount DESC NULLS LAST], hash = [$hashvalue]]
   │   Layout: [transaction_date:date, total_amount:decimal(10,2), $hashvalue:bigint, max:decimal(10,2)]
   │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
   │   max := max("total_amount") ROWS UNBOUNDED_PRECEDING UNBOUNDED_FOLLOWING
~Remaining execution plan

Note the final detail line of the Window operator block: the function max is referenced there, indicating that it will be applied over the window with a frame of ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
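
If the explicit frame clause is dropped, the default frame for a window with an ORDER BY is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, and I would expect that to surface in the Window node instead (much like the RANGE UNBOUNDED_PRECEDING CURRENT_ROW shown against row_number() in the earlier plan). A hedged sketch:

-- Hedged sketch (plan not captured here): with no explicit frame clause, the default
-- RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame applies
SELECT t.transaction_date,
       t.total_amount,
       MAX(t.total_amount) OVER (PARTITION BY t.transaction_date
                                 ORDER BY t.total_amount DESC) AS max_amount_default_frame
FROM aept_db.supermarket_transactions t
ORDER BY t.transaction_date DESC;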


Now, we have seen the TopNRanking node, but there is also a TopN node which appears in a more straightforward scenario:

SELECT td.* 
FROM aept_db.supermarket_transaction_details td
ORDER BY td.total_price DESC
LIMIT 10;
Query Plan
Fragment 0 [SINGLE]
    Output layout: [transaction_id, product_id, quantity_sold, price_per_unit, total_price]
    Output partitioning: SINGLE []
    Output[columnNames = [transaction_id, product_id, quantity_sold, price_per_unit, total_price]]
    │   Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]
    │   Estimates: {rows: 10 (1.34kB), cpu: 0, memory: 0B, network: 0B}
    └─ TopN[count = 10, orderBy = [total_price DESC NULLS LAST]]
       │   Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]
       │   Estimates: {rows: 10 (1.34kB), cpu: 0, memory: 0B, network: 0B}
       └─ LocalExchange[partitioning = SINGLE]
          │   Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]
          │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
          └─ RemoteSource[sourceFragmentIds = [1]]
                 Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]

Fragment 1 [SOURCE]
    Output layout: [transaction_id, product_id, quantity_sold, price_per_unit, total_price]
    Output partitioning: SINGLE []
    TopNPartial[count = 10, orderBy = [total_price DESC NULLS LAST]]
    │   Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]
    │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
    └─ TableScan[table = awsdatacatalog:aept_db:supermarket_transaction_details]
           Layout: [transaction_id:varchar, product_id:varchar, quantity_sold:bigint, price_per_unit:decimal(10,2), total_price:decimal(10,2)]
           Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           transaction_id := transaction_id:string:REGULAR
           quantity_sold := quantity_sold:bigint:REGULAR
           total_price := total_price:decimal(10,2):REGULAR
           product_id := product_id:string:REGULAR
           price_per_unit := price_per_unit:decimal(10,2):REGULAR

We have[2]:

  • TopN[count = 10, orderBy = [total_price DESC NULLS LAST]] - indicates that the first N rows will be retrieved based on the global ordering specified for the dataset. In this case, the data are sorted by total_price DESC NULLS LAST, and only the first 10 rows are required
  • TopNPartial[count = 10, orderBy = [total_price DESC NULLS LAST]] - indicates that the first N rows will be retrieved, based on the specified ordering, from a subset of the total dataset. In this case, each subset of the data is sorted by total_price DESC NULLS LAST, and only its first 10 rows are kept

The TopNPartial operation executes on one or more worker nodes to get the top N rows within each node's subset of the data, whereas the TopN operation executes on the final, combined dataset (a conceptual sketch of why this is equivalent follows).
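
Conceptually, the split works because taking the top 10 of each subset and then the top 10 of the surviving candidates gives the same answer as a single global sort-and-limit. A rough SQL analogy is shown below; the tables w1_subset and w2_subset are purely hypothetical stand-ins for per-worker splits of the data, not real tables.

-- Conceptual analogy only: each hypothetical 'worker' keeps its local top 10 (TopNPartial),
-- and the final step takes the top 10 of the surviving candidates (TopN)
SELECT *
FROM (
      SELECT * FROM (SELECT * FROM w1_subset ORDER BY total_price DESC LIMIT 10) local_top_1
      UNION ALL
      SELECT * FROM (SELECT * FROM w2_subset ORDER BY total_price DESC LIMIT 10) local_top_2
     ) candidates
ORDER BY total_price DESC
LIMIT 10;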

UNNEST

The last operation we will explore today is the UNNEST operator.

Let us create some fake array data and then unnest it:

WITH q_array1 AS (
                  SELECT 'id_101' AS id,
                         ARRAY[1,2,3] AS items
                 )
SELECT qa1.id,
       item
FROM q_array1 qa1, UNNEST(items) t(item)
Query Plan
Fragment 0 [SINGLE]
    Output layout: [expr, field]
    Output partitioning: SINGLE []
    Output[columnNames = [id, item]]
    │   Layout: [expr:varchar(6), field:integer]
    │   Estimates: {rows: 1 (60B), cpu: 0, memory: 0B, network: 0B}
    │   id := expr
    │   item := field
    └─ CrossJoin Unnest[replicate = [expr:varchar(6)], unnest = [expr_0:array(integer)]]
       │   Layout: [expr:varchar(6), field:integer]
       │   Estimates: {rows: 1 (60B), cpu: 0, memory: 0B, network: 0B}
       └─ LocalExchange[partitioning = ROUND_ROBIN]
          │   Layout: [expr:varchar(6), expr_0:array(integer)]
          │   Estimates: {rows: 1 (110B), cpu: 110, memory: 0B, network: 0B}
          └─ Values[]
                 Layout: [expr:varchar(6), expr_0:array(integer)]
                 Estimates: {rows: 1 (110B), cpu: 0, memory: 0B, network: 0B}
                 ('id_101', "$literal$"(from_base64('CQAAAElOVF9BUlJBWQMAAAAAAQAAAAIAAAADAAAA')))

In this example, the UNNEST clause translates in the plan into a CrossJoin Unnest:

  • CrossJoin Unnest[replicate = [expr:varchar(6)], unnest = [expr_0:array(integer)]] - indicates that a cross join will occur between a table and a dataset generated by unnesting an array.

Note: I was unable to find a good reference for this either, but this is clearly what it’s doing

The Values[] section labels the literal 'id_101' as expr alongside the array of integers. The data are then exchanged locally within the node before the unnest is applied: expr is marked for replication onto each output row, and the unnest is executed on each entry in the array.
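
The comma between q_array1 qa1 and UNNEST(items) in the query above is an implicit cross join, which is why the plan contains a CrossJoin Unnest node. Writing the join explicitly is an equivalent form that makes this clearer:

-- Equivalent form: the implicit comma join spelled out as an explicit CROSS JOIN
WITH q_array1 AS (
                  SELECT 'id_101' AS id,
                         ARRAY[1,2,3] AS items
                 )
SELECT qa1.id,
       item
FROM q_array1 qa1
CROSS JOIN UNNEST(qa1.items) AS t(item);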

Conclusion

There are many different types of operators/operations/nodes that can achieve a variety of outcomes within the Athena/Trino query engine. We have only reviewed a few of the most important, but if you are interested in finding out more, then the plan node listings in the Presto and Trino GitHub repositories may be of interest (see [2] for the Presto one). Just be wary that not all operators listed in these repositories will necessarily be available or used within Amazon Athena.

References

[1] - Trino Software Foundation. Trino Documentation - Supporting MERGE. https://trino.io/docs/current/develop/supporting-merge.html
[2] - PrestoDB/Presto Github - Plan Nodes. https://github.com/prestodb/presto/wiki/Plan-nodes