diff --git a/daft-parquet/queries.sql b/daft-parquet/queries.sql index 89d7bb86e..f45e33fd5 100644 --- a/daft-parquet/queries.sql +++ b/daft-parquet/queries.sql @@ -36,8 +36,8 @@ SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10; SELECT ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3, COUNT(*) AS c FROM hits GROUP BY ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 ORDER BY c DESC LIMIT 10; SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND URL <> '' GROUP BY URL ORDER BY PageViews DESC LIMIT 10; SELECT Title, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND DontCountHits = 0 AND IsRefresh = 0 AND Title <> '' GROUP BY Title ORDER BY PageViews DESC LIMIT 10; -SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 1010; -SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 1010; -SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 110; -SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10010; -SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 1010; +SELECT URL, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND IsLink <> 0 AND IsDownload = 0 GROUP BY URL ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT TraficSourceID, SearchEngineID, AdvEngineID, CASE WHEN (SearchEngineID = 0 AND AdvEngineID = 0) THEN Referer ELSE '' END AS Src, URL AS Dst, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 GROUP BY TraficSourceID, SearchEngineID, AdvEngineID, Src, Dst ORDER BY PageViews DESC LIMIT 10 OFFSET 1000; +SELECT URLHash, EventDate, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND TraficSourceID IN (-1, 6) AND RefererHash = 3594120000172545465 GROUP BY URLHash, EventDate ORDER BY PageViews DESC LIMIT 10 OFFSET 100; +SELECT WindowClientWidth, WindowClientHeight, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-01' AND EventDate <= '2013-07-31' AND IsRefresh = 0 AND DontCountHits = 0 AND URLHash = 2868770270353813622 GROUP BY WindowClientWidth, WindowClientHeight ORDER BY PageViews DESC LIMIT 10 OFFSET 10000; +SELECT DATE_TRUNC('minute', EventTime) AS M, COUNT(*) AS PageViews FROM hits WHERE CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0 GROUP BY DATE_TRUNC('minute', EventTime) ORDER BY DATE_TRUNC('minute', EventTime) LIMIT 10 OFFSET 1000; diff --git a/daft-parquet/query.py b/daft-parquet/query.py index e52dd0ee0..b76176750 100755 --- a/daft-parquet/query.py +++ b/daft-parquet/query.py @@ -5,8 +5,7 @@ import sys import timeit import traceback -import pandas as pd -from daft import col, DataType, TimeUnit +from daft import col, DataType hits = None current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -20,103 +19,25 @@ with open("queries.sql") as f: sql_list = [q.strip() for q in f.read().split(';') if q.strip()] -def daft_offset(df, start ,end): - pandas_df = df.to_pandas() - sliced_df = pandas_df.iloc[start:end] - return sliced_df - -queries = [] -for idx, sql in enumerate(sql_list): - query_entry = {"sql": sql} - - # Current limitations and workarounds for Daft execution: - - # 1. Queries q18, q35, q42 require manual API workarounds: - # - q18: The function `extract(minute FROM EventTime)` causes an error: - # `expected input to minute to be temporal, got UInt32`. - # - q35: Error is `duplicate field name ClientIP in the schema`. - # Attempts to alias the column in SQL but still failed. - # - q42: The function `DATE_TRUNC('minute', EventTime)` causes an error: - # `Unsupported SQL: Function date_trunc not found`. - if idx in [18, 35, 42]: - if idx == 18: - query_entry["lambda"] = lambda: ( - hits.with_column("m", col("EventTime").dt.minute()) - .groupby("UserID", "m", "SearchPhrase") - .agg(daft.sql_expr("COUNT(1)").alias("COUNT(*)")) - .sort("COUNT(*)", desc=True) - .limit(10) - .select("UserID", "m", "SearchPhrase", "COUNT(*)") - ) - elif idx == 35: - query_entry["lambda"] = lambda: ( - hits.groupby( - "ClientIP", - daft.sql_expr("ClientIP - 1").alias("ClientIP - 1"), - daft.sql_expr("ClientIP - 2").alias("ClientIP - 2"), - daft.sql_expr("ClientIP - 3").alias("ClientIP - 3")) - .agg(daft.sql_expr("COUNT(1)").alias("c")) - .sort("c", desc=True) - .limit(10) - .select("ClientIP", "ClientIP - 1", "ClientIP - 2", "ClientIP - 3", "c") - ) - elif idx == 42: - query_entry["lambda"] = lambda: ( - hits.with_column("M", col("EventTime").dt.truncate("1 minute")) - .where("CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0") - .groupby("M") - .agg(daft.sql_expr("COUNT(1)").alias("PageViews")) - .sort("M", desc=False) - .limit(1010) - .select("M", "PageViews") - ) - - # 2. OFFSET operator not supported in Daft: - # For queries q38, q39, q40, q41, q42, after executing the query, - # manually implement the `OFFSET` truncation logic via the API - if 38 <= idx <= 42: - if idx == 38: - query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010) - elif idx == 39: - query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010) - elif idx == 40: - query_entry["extra_api"] = lambda df: daft_offset(df, 100, 110) - elif idx == 41: - query_entry["extra_api"] = lambda df: daft_offset(df, 10000, 10010) - elif idx == 42: - query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010) - - queries.append(query_entry) - -def run_single_query(query, i): +def run_single_query(sql, i): try: start = timeit.default_timer() global hits if hits is None: hits = daft.read_parquet(parquet_path) - hits = hits.with_column("EventTime", col("EventTime").cast(daft.DataType.timestamp("s"))) - hits = hits.with_column("EventDate", col("EventDate").cast(daft.DataType.date())) + hits = hits.with_column("EventTime", col("EventTime").cast(DataType.timestamp("s"))) + hits = hits.with_column("EventDate", col("EventDate").cast(DataType.date())) hits = hits.with_column("URL", col("URL").decode("utf-8")) hits = hits.with_column("Title", col("Title").decode("utf-8")) hits = hits.with_column("Referer", col("Referer").decode("utf-8")) hits = hits.with_column("MobilePhoneModel", col("MobilePhoneModel").decode("utf-8")) hits = hits.with_column("SearchPhrase", col("SearchPhrase").decode("utf-8")) - result = None - - if "lambda" in query: - result = query["lambda"]() - else: - result = daft.sql(query["sql"]) - + result = daft.sql(sql) result.collect() - if "extra_api" in query: - result = query["extra_api"](result) - run_time = round(timeit.default_timer() - start, 3) - return run_time except Exception as e: print(f"Error executing query {query_idx}: {str(e)[:100]}", file=sys.stderr) @@ -124,11 +45,9 @@ def run_single_query(query, i): return None if __name__ == "__main__": - query = queries[query_idx] - + sql = sql_list[query_idx] times = [] for i in range(3): - elapsed = run_single_query(query, i) + elapsed = run_single_query(sql, i) times.append(f"{elapsed}" if elapsed else "") - print(','.join(times)) diff --git a/daft-parquet/results/c6a.4xlarge.json b/daft-parquet/results/c6a.4xlarge.json index 367facbb1..fc5a14838 100644 --- a/daft-parquet/results/c6a.4xlarge.json +++ b/daft-parquet/results/c6a.4xlarge.json @@ -1,6 +1,6 @@ { "system": "Daft (Parquet, single)", - "date": "2025-08-31", + "date": "2026-03-03", "machine": "c6a.4xlarge", "cluster_size": 1, "proprietary": "no", @@ -10,49 +10,48 @@ "load_time": 0, "data_size": 14779976446, "result": [ - [0.572, 0.126, 0.126], - [0.696, 0.199, 0.19], - [1.341, 0.269, 0.263], - [1.723, 0.172, 0.168], - [2.123, 0.84, 0.849], - [2.4, 1.158, 1.141], - [0.389, 0.13, 0.128], - [0.788, 0.199, 0.198], - [3.248, 2.245, 2.309], - [3.763, 1.608, 1.599], - [2.27, 0.652, 0.651], - [2.376, 0.687, 0.716], - [2.852, 1.301, 1.312], - [4.842, 3.527, 3.508], - [2.874, 1.239, 1.251], - [2.988, 1.574, 1.605], - [5.081, 2.742, 2.763], - [4.936, 2.587, 2.564], - [null, null, null], - [1.562, 0.187, 0.176], - [10.029, 2.586, 2.484], - [11.764, 2.798, 2.812], - [22.464, 6.777, 6.728], - [55.834, 21.526, 21.871], - [3.509, 0.634, 0.62], - [2.035, 0.641, 0.624], - [3.965, 0.659, 0.641], - [10.219, 3.347, 3.279], - [16.493, 16.122, 16.131], - [1.742, 1.569, 1.572], - [4.885, 1.334, 1.336], - [8.257, 1.632, 1.66], - [11.397, 7.623, 7.636], - [12.799, 7.592, 7.552], - [12.176, 6.622, 6.64], - [null, null, null], - [0.591, 0.23, 0.197], - [0.441, 0.13, 0.122], - [0.562, 0.154, 0.141], - [0.82, 0.265, 0.266], - [0.419, 0.075, 0.064], - [0.397, 0.072, 0.064], - [null, null, null] -] + [0.635, 0.126, 0.125], + [0.784, 0.174, 0.19], + [1.51, 0.253, 0.255], + [1.937, 0.177, 0.162], + [2.411, 0.907, 0.914], + [2.818, 1.307, 1.309], + [0.519, 0.113, 0.12], + [0.926, 0.191, 0.179], + [3.649, 2.612, 2.629], + [4.159, 1.895, 1.853], + [2.444, 0.683, 0.668], + [2.58, 0.717, 0.723], + [3.426, 1.585, 1.583], + [6.515, 4.823, 4.916], + [3.399, 1.56, 1.561], + [3.826, 2.257, 2.304], + [6.66, 4.038, 3.979], + [5.296, 2.679, 2.685], + [11.503, 7.921, 8.112], + [1.59, 0.24, 0.232], + [10.04, 2.217, 2.177], + [11.802, 2.442, 2.397], + [22.515, 5.275, 5.3], + [55.974, 21.092, 21.056], + [3.758, 0.601, 0.59], + [2.167, 0.688, 0.681], + [4.171, 0.634, 0.614], + [10.044, 3.129, 3.128], + [15.21, 14.566, 14.615], + [1.528, 1.244, 1.207], + [5.321, 1.553, 1.567], + [8.974, 2.221, 2.207], + [17.334, 13.533, 13.65], + [15.133, 9.313, 9.593], + [14.477, 8.563, 8.703], + [3.342, 2.498, 2.516], + [0.707, 0.203, 0.203], + [0.534, 0.135, 0.123], + [0.624, 0.148, 0.139], + [0.901, 0.283, 0.285], + [0.477, 0.072, 0.066], + [0.467, 0.074, 0.067], + [0.445, 0.085, 0.072] + ] } -