From fa6855bb9a5ec60f55172967d35a1ea4e545bf1f Mon Sep 17 00:00:00 2001
From: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
Date: Tue, 27 Jun 2023 15:32:02 -0700
Subject: [PATCH 1/5] Cloud Fetch e2e tests

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
---
 tests/e2e/test_driver.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 26b7d186b..c9e187c3a 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -1,3 +1,4 @@
+import itertools
 from contextlib import contextmanager
 from collections import OrderedDict
 import datetime
@@ -52,6 +53,7 @@ def __init__(self, method_name):
         # If running in local mode, just use environment variables for params.
         self.arguments = os.environ if get_args_from_env else {}
         self.arraysize = 1000
+        self.buffer_size_bytes = 10485760

     def connection_params(self, arguments):
         params = {
@@ -84,7 +86,7 @@ def connection(self, extra_params=()):
     @contextmanager
     def cursor(self, extra_params=()):
         with self.connection(extra_params) as conn:
-            cursor = conn.cursor(arraysize=self.arraysize)
+            cursor = conn.cursor(arraysize=self.arraysize, buffer_size_bytes=self.buffer_size_bytes)
             try:
                 yield cursor
             finally:
@@ -633,6 +635,27 @@ def test_closing_a_closed_connection_doesnt_fail(self):
         self.assertTrue(expected_message_was_found, "Did not find expected log messages")

+    @skipUnless(pysql_supports_arrow(), 'needs arrow support')
+    def test_cloud_fetch(self):
+        # This test can take several minutes to run
+        limits = [100000, 600000]
+        threads = [10, 25]
+        self.buffer_size_bytes = 104857600
+        self.arraysize = 100000
+        base_query = "SELECT * FROM store_sales "
+        for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
+            with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
+                cf_result, noop_result = None, None
+                query = base_query + "LIMIT " + str(num_limit)
+                with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor:
+                    cursor.execute(query)
+                    cf_result = cursor.fetchall()
+                with self.cursor({}) as cursor:
+                    cursor.execute(query)
+                    noop_result = cursor.fetchall()
+                assert len(cf_result) == len(noop_result)
+                for i in range(len(cf_result)):
+                    assert cf_result[i] == noop_result[i]


 # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
 # the 429/503 subsuites separate since they execute under different circumstances.
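A note on scope: the loop in the new test expands limits x threads x lz4 flags into eight sub-tests, each running the same query once through a cloud-fetch cursor and once through a plain cursor. A minimal standard-library sketch of that expansion (values copied from the patch above; nothing connector-specific is needed to see the grid):

    import itertools

    # Values taken from test_cloud_fetch above (patch 1 uses limits of 100000 and 600000).
    limits = [100000, 600000]
    threads = [10, 25]
    lz4_options = [True, False]

    # itertools.product yields every (limit, thread count, lz4 flag) combination,
    # so the test body runs 2 * 2 * 2 = 8 times, one subTest per combination.
    for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, lz4_options):
        print(num_limit, num_threads, lz4_compression)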
From 2a642cbaafa78f47982b08d8a6e5101e28b7d468 Mon Sep 17 00:00:00 2001
From: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
Date: Mon, 3 Jul 2023 14:43:41 -0700
Subject: [PATCH 2/5] Test case works for e2-dogfood shared unity catalog

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
---
 tests/e2e/test_driver.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index c9e187c3a..c046484f9 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -638,11 +638,11 @@ def test_closing_a_closed_connection_doesnt_fail(self):
     @skipUnless(pysql_supports_arrow(), 'needs arrow support')
     def test_cloud_fetch(self):
         # This test can take several minutes to run
-        limits = [100000, 600000]
+        limits = [100000, 300000]
         threads = [10, 25]
         self.buffer_size_bytes = 104857600
         self.arraysize = 100000
-        base_query = "SELECT * FROM store_sales "
+        base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
         for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
             with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
                 cf_result, noop_result = None, None

From 5661bf18da9f4570a509c1c0ed41047456e331e8 Mon Sep 17 00:00:00 2001
From: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
Date: Wed, 5 Jul 2023 11:50:27 -0700
Subject: [PATCH 3/5] Moving test to LargeQueriesSuite and setting catalog to hive_metastore

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
---
 tests/e2e/test_driver.py | 49 +++++++++++++++++++++++-----------------
 1 file changed, 28 insertions(+), 21 deletions(-)

diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index c046484f9..49f6d80a5 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -106,6 +106,34 @@ def get_some_rows(self, cursor, fetchmany_size):
         else:
             return None

+    @skipUnless(pysql_supports_arrow(), 'needs arrow support')
+    def test_cloud_fetch(self):
+        # This test can take several minutes to run
+        limits = [100000, 300000]
+        threads = [10, 25]
+        self.buffer_size_bytes = 104857600
+        self.arraysize = 100000
+        base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
+        for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
+            with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
+                cf_result, noop_result = None, None
+                query = base_query + "LIMIT " + str(num_limit)
+                with self.cursor({
+                    "use_cloud_fetch": True,
+                    "max_download_threads": num_threads,
+                    "catalog": "hive_metastore"
+                }) as cursor:
+                    cursor.execute(query)
+                    cf_result = cursor.fetchall()
+                with self.cursor({
+                    "catalog": "hive_metastore"
+                }) as cursor:
+                    cursor.execute(query)
+                    noop_result = cursor.fetchall()
+                assert len(cf_result) == len(noop_result)
+                for i in range(len(cf_result)):
+                    assert cf_result[i] == noop_result[i]
+
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests

@@ -635,27 +663,6 @@ def test_closing_a_closed_connection_doesnt_fail(self):
         self.assertTrue(expected_message_was_found, "Did not find expected log messages")

-    @skipUnless(pysql_supports_arrow(), 'needs arrow support')
-    def test_cloud_fetch(self):
-        # This test can take several minutes to run
-        limits = [100000, 300000]
-        threads = [10, 25]
-        self.buffer_size_bytes = 104857600
-        self.arraysize = 100000
-        base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
-        for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
-            with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
-                cf_result, noop_result = None, None
-                query = base_query + "LIMIT " + str(num_limit)
-                with self.cursor({"use_cloud_fetch": True, "max_download_threads": num_threads}) as cursor:
-                    cursor.execute(query)
-                    cf_result = cursor.fetchall()
-                with self.cursor({}) as cursor:
-                    cursor.execute(query)
-                    noop_result = cursor.fetchall()
-                assert len(cf_result) == len(noop_result)
-                for i in range(len(cf_result)):
-                    assert cf_result[i] == noop_result[i]

 # use a RetrySuite to encapsulate these tests which we'll typically want to run together; however keep
 # the 429/503 subsuites separate since they execute under different circumstances.

From 72b950708f4ee6a82c7f3c33c9b39e26bb12eb0c Mon Sep 17 00:00:00 2001
From: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
Date: Wed, 5 Jul 2023 12:03:22 -0700
Subject: [PATCH 4/5] Align default value of buffer_size_bytes in driver tests

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
---
 tests/e2e/test_driver.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 49f6d80a5..1be741f4a 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -53,7 +53,7 @@ def __init__(self, method_name):
         # If running in local mode, just use environment variables for params.
         self.arguments = os.environ if get_args_from_env else {}
         self.arraysize = 1000
-        self.buffer_size_bytes = 10485760
+        self.buffer_size_bytes = 104857600

     def connection_params(self, arguments):
         params = {
@@ -111,7 +111,6 @@ def test_cloud_fetch(self):
         # This test can take several minutes to run
         limits = [100000, 300000]
         threads = [10, 25]
-        self.buffer_size_bytes = 104857600
         self.arraysize = 100000
         base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
         for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):

From 186bfcce008d8d7a4033c6628bff6b059edf1a2a Mon Sep 17 00:00:00 2001
From: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
Date: Fri, 7 Jul 2023 10:43:50 -0700
Subject: [PATCH 5/5] Adding comment to specify what's needed to run successfully

Signed-off-by: Matthew Kim <11141331+mattdeekay@users.noreply.github.com>
---
 tests/e2e/test_driver.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 1be741f4a..c8713bf08 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -112,6 +112,9 @@ def test_cloud_fetch(self):
         limits = [100000, 300000]
         threads = [10, 25]
         self.arraysize = 100000
+        # This test requires a large table with many rows to properly initiate cloud fetch.
+        # e2-dogfood host > hive_metastore catalog > main schema has such a table called store_sales.
+        # If this table is deleted or this test is run on a different host, a different table may need to be used.
         base_query = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 "
         for num_limit, num_threads, lz4_compression in itertools.product(limits, threads, [True, False]):
             with self.subTest(num_limit=num_limit, num_threads=num_threads, lz4_compression=lz4_compression):
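For a manual spot check outside the unittest harness, the comparison in test_cloud_fetch can be sketched against the connector's public API as below. This is only a sketch: the hostname, HTTP path, and token are placeholders, and it assumes use_cloud_fetch, max_download_threads, and catalog are accepted as connection keyword arguments, which is how the test passes them through its cursor() helper.

    from databricks import sql

    # Same table and filter the test relies on (see the comment added in patch 5).
    QUERY = "SELECT * FROM store_sales WHERE ss_sold_date_sk = 2452234 LIMIT 100000"

    def run(extra_params):
        # Placeholder credentials; substitute real workspace values before running.
        conn = sql.connect(server_hostname="<workspace-host>",
                           http_path="<warehouse-http-path>",
                           access_token="<token>",
                           catalog="hive_metastore",
                           **extra_params)
        try:
            cursor = conn.cursor()
            try:
                cursor.execute(QUERY)
                return cursor.fetchall()
            finally:
                cursor.close()
        finally:
            conn.close()

    # Cloud fetch on vs. off should return identical rows for the same query.
    cloud_fetch_rows = run({"use_cloud_fetch": True, "max_download_threads": 10})
    baseline_rows = run({})
    assert len(cloud_fetch_rows) == len(baseline_rows)
    assert all(a == b for a, b in zip(cloud_fetch_rows, baseline_rows))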