diff --git a/Data-Gen/create_join_tables.py b/Data-Gen/create_join_tables.py new file mode 100644 index 0000000..b4bb0ce --- /dev/null +++ b/Data-Gen/create_join_tables.py @@ -0,0 +1,476 @@ +#!/usr/bin/env python3 +import pandas as pd +from pyspark.sql import SparkSession, Row +import pyspark.sql.functions as F +from pyspark.sql.types import ( + StructType, StructField, StringType, IntegerType, LongType, + FloatType, DoubleType, BooleanType, DateType, TimestampType, ArrayType +) + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("CreateJoinTables") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .config("spark.local.dir", "/scratch/prestouser/spark-tmp") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .getOrCreate() +) + +target_size = 1 * 1024**4 # 1 TB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1TB" + +# ------------------------------------- +# Step 1. Read all sheets from the Excel file. +# ------------------------------------- +excel_path = "../datagen_schema.xlsx" # update this path as necessary + +# Read every sheet into a dictionary: keys are sheet names, values are DataFrames. +sheets = pd.read_excel(excel_path, sheet_name=None) +# sheets = spark.read. +sheet_names = list(sheets.keys()) +print("Found sheets:", sheet_names) + +table_a = spark.read.parquet(PATH_PREFIX) +# ------------------------------------- +# Step 2. Process the tables overview (first sheet) +# ------------------------------------- +# Assumption: The first sheet (e.g. "Tables") lists the table names and approximate row counts. +tables_overview_df = sheets[sheet_names[0]] +# Adjust these column names if your Excel file uses different names. +table_names = tables_overview_df["masked_table_id"].tolist() +approx_row_counts = tables_overview_df["num_rows_approx"].tolist() + +print("Tables and approximate row counts:") +for tbl, cnt in zip(table_names, approx_row_counts): + print(f" {tbl}: ~{cnt} rows") + +# ------------------------------------- +# Step 3. Read each table's metadata (columns, types, etc.) +# ------------------------------------- +# Here we assume that the sheet name for each table is the same as the table name. +table_metadata = {} +for tbl in table_names: + if tbl in sheets: + meta_df = sheets[tbl] + table_metadata[tbl] = meta_df + print(f"Loaded metadata for table '{tbl}'.") + else: + print(f"Warning: No metadata sheet found for table '{tbl}'.") + +# ------------------------------------- +# Step 4. Define a mapping from your Excel type names to Spark types. +# ------------------------------------- +spark_type_mapping = { + "StringType()": StringType(), + "StringType": StringType(), + "IntegerType()": IntegerType(), + "IntegerType()": IntegerType(), + "LongType()": LongType(), + "FloatType()": FloatType(), + "DoubleType()": DoubleType(), + "BooleanType()": BooleanType(), + "BooleanType()": BooleanType(), + "DateType()": DateType(), + "TimestampType()": TimestampType(), + "ArrayType(IntegerType(), True)": ArrayType(IntegerType(), True), + "ArrayType(StringType(), True)": ArrayType(StringType(), True) +} + +def create_schema(meta_df): + """ + Create a Spark schema (StructType) from the metadata DataFrame. + For numerical types, if "min" and "max" are provided, they are stored in the field metadata. + This version ensures that the type from the spreadsheet is used (if it matches). + """ + fields = [] + # Ensure that the range columns exist in the DataFrame. + has_range = ("min" in meta_df.columns) and ("max" in meta_df.columns) + + for idx, row in meta_df.iterrows(): + col_name = row["masked_column_name"] + # Convert the Type from the spreadsheet to a lower-case string. + type_str = str(row["spark_data_type"]).strip() if pd.notna(row["spark_data_type"]) else "string" + spark_type = spark_type_mapping.get(type_str) + + if spark_type is None: + # If the type is not recognized, warn and default to StringType. + print(f"Warning: Unrecognized type '{row['spark_data_type']}' for column '{col_name}'. Using StringType.") + spark_type = StringType() + + md = {} + # For numerical types, if min and max values are provided, store them in metadata. + if isinstance(spark_type, (IntegerType, LongType, FloatType, DoubleType)) and has_range: + if pd.notna(row["min"]) and pd.notna(row["max"]): + md["min"] = row["min"] + md["max"] = row["max"] + + fields.append(StructField(col_name, spark_type, True, metadata=md)) + + return StructType(fields) + +# Create a dictionary of schemas for each table. +schemas = {} +for tbl, meta_df in table_metadata.items(): + schema = create_schema(meta_df) + schemas[tbl] = schema + print(f"Schema for table '{tbl}': {schema}") + + +# ------------------------------------- +# Step 5. Process join information. +# ------------------------------------- +# Assumption: The final sheet (last sheet) is named "Joins" and holds the join definitions. +join_info_df = sheets[sheet_names[1]] +joins = [] +# Here we assume join_info_df has columns: "LeftTable", "LeftColumn", "RightTable", "RightColumn", and optionally "JoinType" +for idx, row in join_info_df.iterrows(): + join_detail = { + "left_table": row["table1"], + "right_table": row["table2"], + "join_method": row["join_method"], + "left_column": row["column1"], + "right_column": row["column2"] + } + joins.append(join_detail) + +# ======================================== +# PART 2: Generate random data for each table and register as temp views +# ======================================== + +def generate_random_dataframe(schema, num_rows): + """ + Simpler version with basic array generation + """ + df = spark.range(num_rows) + + for field in schema.fields: + col_name = field.name + dt = field.dataType + md = field.metadata or {} + + if isinstance(dt, (IntegerType, LongType)): + min_val = md.get("min", 1) + max_val = md.get("max", 1000) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, IntegerType): + df = df.withColumn(col_name, expr.cast("int")) + else: + df = df.withColumn(col_name, expr.cast("long")) + + elif isinstance(dt, (FloatType, DoubleType)): + min_val = md.get("min", 0.0) + max_val = md.get("max", 1000.0) + expr = (F.rand() * (float(max_val) - float(min_val)) + float(min_val)) + if isinstance(dt, FloatType): + df = df.withColumn(col_name, expr.cast("float")) + else: + df = df.withColumn(col_name, expr.cast("double")) + + elif isinstance(dt, BooleanType): + df = df.withColumn(col_name, F.rand() > 0.5) + + elif isinstance(dt, DateType): + df = df.withColumn(col_name, F.expr("date_add('2000-01-01', cast(rand() * 9000 as int))")) + + elif isinstance(dt, TimestampType): + df = df.withColumn(col_name, F.expr("to_timestamp(date_add('2000-01-01', cast(rand() * 9000 as int)))")) + + elif isinstance(dt, StringType): + df = df.withColumn(col_name, + F.concat(F.lit("str_"), + F.abs(F.hash(F.col("id"), F.rand())).cast("string"))) + + elif isinstance(dt, ArrayType): + # Simpler array generation - fixed size arrays + element_type = dt.elementType + + if isinstance(element_type, IntegerType): + # Create array of 3 random integers + df = df.withColumn(col_name, + F.array( + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int"), + (F.rand() * 100).cast("int") + )) + + elif isinstance(element_type, LongType): + # Create array of 3 random longs + df = df.withColumn(col_name, + F.array( + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long"), + (F.rand() * 1000).cast("long") + )) + + elif isinstance(element_type, StringType): + # Create array of 3 random strings + df = df.withColumn(col_name, + F.array( + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")), + F.concat(F.lit("item_"), (F.rand() * 100).cast("int").cast("string")) + )) + + else: + # Default to empty array for unsupported types + df = df.withColumn(col_name, F.array()) + + else: + df = df.withColumn(col_name, F.lit(None)) + + return df.drop("id") + +# Create and register a DataFrame for each table using the distributed random data generation. +# NOTE: THIS WAS SCALED DOWN FOR TESTING PURPOSES. UNCOMMENT LINE 74 AND COMMENT OUT LINES 68-73 FOR REAL TESTING +dfs = {} +for tbl, count in zip(table_names, approx_row_counts): + if tbl != 'table_a': + schema = schemas[tbl] + if tbl == 'table_c': + num_rows = 21000000 + else: + num_rows = int(count) + # num_rows = int(count) + df = generate_random_dataframe(schema, num_rows) + dfs[tbl] = df + print(f"Created DataFrame for table '{tbl}' with {num_rows} random rows.") + +table_b = dfs['table_b'] +table_c = dfs['table_c'] +table_d = dfs['table_d'] +table_e = dfs['table_e'] + + +# ========================= +# CONFIGURATION - Set your desired match percentages here +# ========================= +MATCH_PERCENTAGE_A = 0.001 # 0.1% of table_a rows will match +MATCH_PERCENTAGE_C = 0.01 # 1% of table_c rows will match +MATCH_PERCENTAGE_D = 0.01 # 1% of table_d rows will match +MATCH_PERCENTAGE_E = 0.01 # 1% of table_e rows will match + +print("=" * 60) +print("FORCING TABLES TO MATCH TABLE_B VALUES") +print(f"Match percentages: A={MATCH_PERCENTAGE_A*100}%, C={MATCH_PERCENTAGE_C*100}%, D={MATCH_PERCENTAGE_D*100}%, E={MATCH_PERCENTAGE_E*100}%") +print("=" * 60) + +# ========================= +# Force table_a (4 columns: col_a, col_c, col_b, col_d) +# ========================= +print("\n1. Processing table_a...") +table_a_combos_list = ( + table_b + .select("col_b_8", "col_b_3", "col_b_9", "col_b_1") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_3").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_1").isNotNull() + ) + .collect() +) + +combo_count_a = len(table_a_combos_list) +print(f" Distinct combinations for table_a: {combo_count_a}") + +combos_a_with_id = [ + Row( + new_col_a=combo['col_b_8'], + new_col_c=combo['col_b_3'], + new_col_b=combo['col_b_9'], + new_col_d=combo['col_b_1'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_a_combos_list) +] + +table_a_combos_df = spark.createDataFrame(combos_a_with_id) + +# Add a random number to each row to decide if it should be forced +table_a_forced = ( + table_a + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_A) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_a) + 1) + .otherwise(F.lit(None)) + ) + # Left join to preserve all rows + .join( + F.broadcast(table_a_combos_df), + "combo_id", + "left" + ) + # For forced rows, use new values; for others, keep original + .withColumn("col_a", F.coalesce("new_col_a", "col_a")) + .withColumn("col_c", F.coalesce("new_col_c", "col_c")) + .withColumn("col_b", F.coalesce("new_col_b", "col_b")) + .withColumn("col_d", F.coalesce("new_col_d", "col_d")) + .drop("should_force", "combo_id", "new_col_a", "new_col_c", "new_col_b", "new_col_d") + .select(*table_a.columns) +) + +table_a_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_a_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +print(f" ✓ table_a_forced created ({MATCH_PERCENTAGE_A*100}% forced)") + +# ========================= +# Force table_c (3 columns: col_c_10, col_c_9, col_c_11) +# ========================= +print("\n2. Processing table_c...") +table_c_combos_list = ( + table_b + .select("col_b_8", "col_b_9", "col_b_3") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() & + F.col("col_b_3").isNotNull() + ) + .collect() +) + +combo_count_c = len(table_c_combos_list) +print(f" Distinct combinations for table_c: {combo_count_c}") + +combos_c_with_id = [ + Row( + new_col_c_10=combo['col_b_8'], + new_col_c_9=combo['col_b_9'], + new_col_c_11=combo['col_b_3'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_c_combos_list) +] + +table_c_combos_df = spark.createDataFrame(combos_c_with_id) + +table_c_forced = ( + table_c + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_C) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_c) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_c_combos_df), + "combo_id", + "left" + ) + .withColumn("col_c_10", F.coalesce("new_col_c_10", "col_c_10")) + .withColumn("col_c_9", F.coalesce("new_col_c_9", "col_c_9")) + .withColumn("col_c_11", F.coalesce("new_col_c_11", "col_c_11")) + .drop("should_force", "combo_id", "new_col_c_10", "new_col_c_9", "new_col_c_11") + .select(*table_c.columns) +) + +table_c_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_c_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +print(f" ✓ table_c_forced created ({MATCH_PERCENTAGE_C*100}% forced)") + +# ========================= +# Force table_d (2 columns: col_d_0, col_d_1) +# ========================= +print("\n3. Processing table_d...") +table_d_combos_list = ( + table_b + .select("col_b_8", "col_b_9") + .distinct() + .filter( + F.col("col_b_8").isNotNull() & + F.col("col_b_9").isNotNull() + ) + .collect() +) + +combo_count_d = len(table_d_combos_list) +print(f" Distinct combinations for table_d: {combo_count_d}") + +combos_d_with_id = [ + Row( + new_col_d_0=combo['col_b_8'], + new_col_d_1=combo['col_b_9'], + combo_id=idx + 1 + ) + for idx, combo in enumerate(table_d_combos_list) +] + +table_d_combos_df = spark.createDataFrame(combos_d_with_id) + +table_d_forced = ( + table_d + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_D) + .withColumn("combo_id", + F.when(F.col("should_force"), F.floor(F.rand() * combo_count_d) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_d_combos_df), + "combo_id", + "left" + ) + .withColumn("col_d_0", F.coalesce("new_col_d_0", "col_d_0")) + .withColumn("col_d_1", F.coalesce("new_col_d_1", "col_d_1")) + .drop("should_force", "combo_id", "new_col_d_0", "new_col_d_1") + .select(*table_d.columns) +) + +table_d_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_d_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +print(f" ✓ table_d_forced created ({MATCH_PERCENTAGE_D*100}% forced)") + +# ========================= +# Force table_e (1 column: col_e_0) +# ========================= +print("\n4. Processing table_e...") +table_e_values_list = ( + table_b + .select("col_b_8") + .distinct() + .filter(F.col("col_b_8").isNotNull()) + .collect() +) + +value_count_e = len(table_e_values_list) +print(f" Distinct values for table_e: {value_count_e}") + +values_e_with_id = [ + Row( + new_col_e_0=value['col_b_8'], + value_id=idx + 1 + ) + for idx, value in enumerate(table_e_values_list) +] + +table_e_values_df = spark.createDataFrame(values_e_with_id) + +table_e_forced = ( + table_e + .withColumn("should_force", F.rand() < MATCH_PERCENTAGE_E) + .withColumn("value_id", + F.when(F.col("should_force"), F.floor(F.rand() * value_count_e) + 1) + .otherwise(F.lit(None)) + ) + .join( + F.broadcast(table_e_values_df), + "value_id", + "left" + ) + .withColumn("col_e_0", F.coalesce("new_col_e_0", "col_e_0")) + .drop("should_force", "value_id", "new_col_e_0") + .select(*table_e.columns) +) + +table_e_forced.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_forced_table_e") +table_e_forced = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") +print(f" ✓ table_e_forced created ({MATCH_PERCENTAGE_E*100}% forced)") + +table_b.write.mode("overwrite").parquet(f"{PATH_PREFIX}/temp_table_b") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") diff --git a/Data-Gen/new_datagen_faster.py b/Data-Gen/new_datagen_faster.py new file mode 100644 index 0000000..c00508d --- /dev/null +++ b/Data-Gen/new_datagen_faster.py @@ -0,0 +1,265 @@ +import random +import subprocess + +import networkx as nx +import numpy as np +import pandas as pd +from pyspark.sql import SparkSession, Row + +target_size = 1 * 1024**4 # 1 TB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1TB" +directory_path = PATH_PREFIX + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("LargeGraph") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .config("spark.local.dir", "/scratch/prestouser/spark-tmp") + .getOrCreate() +) +def create_synthetic_distribution(params, plot=True): + + slope = params.get('slope', -2) + min_degree = params.get('min_degree', 1) + max_degree = params.get('max_degree', 200_000) + max_prob = params.get('max_prob', 0.5) + + degrees = np.arange(min_degree, max_degree + 1, dtype=float) + + A = max_prob / (min_degree ** slope) + + y_values = A * degrees ** slope + + degrees_int = degrees.astype(int) + + decay_dict = dict(zip(degrees_int, y_values)) + + return decay_dict + +params = { + 'slope': -2, + 'intercpet': 0.8, + 'r_squared': 0.98, + 'max_degree': 200_000, + 'min_degree': 1, + 'max_prob': 0.5, + 'degree_range': list(np.arange(1, 200_000)) +} + +target_distribution = create_synthetic_distribution(params, 200_000) + +def get_disk_usage(path): + import platform + try: + if platform.system() == "Darwin": + result = subprocess.run( + ['du', '-sk', path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + size_in_bytes = int(result.stdout.split()[0]) * 1024 + else: + result = subprocess.run( + ['du', '-sb', path], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + size_in_bytes = int(result.stdout.split()[0]) + return size_in_bytes + except subprocess.CalledProcessError as e: + raise RuntimeError(f"Failed to get disk usage: {e.stderr.strip()}") + +def random_node(): + return int(np.random.randint(1_000_000, 10_000_000_000)) + +def random_feature(): + return int(np.random.randint(1, 70000)) # cast to native int + +def random_col_e(): + return str(np.random.choice(['col_e_A', 'col_e_B'])) # cast to native str + +num_graphs = spark.sparkContext.defaultParallelism # number of cores available + +def configuration_model_with_distribution(n, degree_distribution,seed): + """ + Generate a graph with a specific degree distribution + """ + degrees = [] + remaining_nodes = n + + for degree, prob in sorted(degree_distribution.items()): + if remaining_nodes <= 0: + break + count = min(int(n * prob + 0.5), remaining_nodes) + if count > 0: + degrees.extend([int(degree)] * count) + remaining_nodes -= count + + if remaining_nodes > 0: + min_degree = min(degree_distribution.keys()) + degrees.extend([min_degree] * remaining_nodes) + + if len(degrees) < 2: + degrees = [1, 1] + + if sum(degrees) % 2 != 0: + degrees[0] += 1 + + try: + g = nx.configuration_model(degrees, seed=seed) + g = nx.Graph(g) + + if g.number_of_edges() == 0: + raise nx.NetworkXError("Generated graph has no edges") + + return g + except Exception as e: + print(f"Error generating graph: {e}") + return nx.barabasi_albert_graph(n, 2) + +def generate_graph_partition(pdf_iterator): + """ + Generator function that yeilds edges for each partition. + pdf_iterator yields pandas DataFrames (one per partition) + """ + # Get broadcasted values + degree_dist = target_distribution_bc.value + seed_val = seed_bc.value + + for pdf in pdf_iterator: + all_edges = [] + + for partition_id in pdf['id'].values: + partition_id = int(partition_id) + + g = configuration_model_with_distribution( + num_nodes_per_graph, + degree_dist, + seed_val + partition_id + ) + + node_map = {node: random_node() for node in g.nodes()} + + for edge in g.edges(): + all_edges.append({ + 'col_a': int(node_map[edge[0]]), + 'col_b': int(node_map[edge[1]]), + 'col_c': int(random_feature()), + 'col_d': int(random_feature()), + 'col_e': random_col_e() + }) + + if all_edges: + yield pd.DataFrame(all_edges) + +target_distribution_bc = spark.sparkContext.broadcast(target_distribution) + +seed = 1000 +seed_bc = spark.sparkContext.broadcast(seed) + +edge_df = ( + spark.range(num_graphs) + .repartition(num_graphs) + .mapInPandas( + generate_graph_partition, + schema="col_a long, col_b long, col_c int, col_d int, col_e string" + ) + .distinct() +) + +edge_df.write.mode("overwrite").parquet(directory_path) + +print(f"Initial write complete. Size: {get_disk_usage(directory_path) / 1024**3:.2f} GB") + + +# ======================================================================================= +# ================================ REVAMPED DATA SCALER ================================= +# ======================================================================================= + +from pyspark.sql.functions import col, floor, rand, lit, when, hash as spark_hash +import math + +initial_dir_size = get_disk_usage(directory_path) + +print(f"Initial dataset size: {round(initial_dir_size / 1024**3, 2)} GB") + +copies_needed = math.ceil(target_size / initial_dir_size) + +print(f"Target size: {round(target_size / 1024**3, 2)} GB") +print(f"Copies needed (including original): {copies_needed}") + +long_cols = ["col_a", "col_b"] +integer_cols = ["col_c", "col_d"] +string_cols = ["col_e"] + +print(f"Reading data from: {directory_path}") +df_original = spark.read.parquet(directory_path) + +if 'source' in df_original.columns: + df_original = df_original.drop('source') + +# Create a "copy_id" Dataframe +df_copies = spark.range(copies_needed).toDF("copy_id") + +# Cross join to create all copies at once +df_expanded = df_original.crossJoin(df_copies) + +print(f"Creating {copies_needed} versions in parallel...") + +# Define noise range +NOISE_MIN = -1 +NOISE_MAX = 1 + +# Create a seed column based on copy_id and row hash +# This gives us different randomness for each day +df_with_seed = df_expanded.withColumn( + "rand_seed", + (spark_hash(col("col_a"), col("col_b"), col("copy_id")) % 1000000).cast("integer") +) + +# Apply noise based on copy_id (copy_id=0 is orignal, no noise) +# Use rand() with a base seed, then add variation based on the row's values +df_augmented = df_with_seed.select( + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(42) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("long").alias(c) + for c in long_cols + ], + *[ + when(col("copy_id") == 0, col(c)).otherwise( + (col(c) + floor(rand(1042) * (NOISE_MAX - NOISE_MIN + 1)) + NOISE_MIN) + ).cast("integer").alias(c) + for c in integer_cols + ], + *[col(c) for c in string_cols] +) + +# Calculate partitions based on target file size +target_file_size_mb = 250 +estimated_size = initial_dir_size * copies_needed +repartitions = max(1, int(estimated_size / (target_file_size_mb * 1024**2))) + +print(f"Writing combined dataset with {repartitions} partitions...") +print(f"Estimated final size: {round(estimated_size / 1024**3, 2)} GB") + +# Write everytihing in one shot +df_augmented.repartition(repartitions).write.mode("overwrite").parquet(directory_path) + +# Verify final size +final_size = get_disk_usage(directory_path) +print(f"\nFinal size: {round(final_size / 1024**3, 2)} GB") +print(f"Target was: {round(target_size / 1024**3, 2)} GB") +print(f"Achieved: {round(100 * final_size / target_size, 1)}% of target") diff --git a/Data-Gen/notes.md b/Data-Gen/notes.md new file mode 100644 index 0000000..5993da7 --- /dev/null +++ b/Data-Gen/notes.md @@ -0,0 +1,122 @@ +1. new_datagen_faster.py +2. create_join_tables.py + +new_datagen_faster.py runs first. It generates the base table_a data by: + +Creating synthetic graph partitions using a power-law degree distribution (via networkx configuration models) +Writing the initial edge data as parquet to PATH_PREFIX +Scaling it up to the target size (1 GB) by cross-joining copies with noise +Then create_join_tables.py runs second. It: + +Reads the table_a parquet that new_datagen_faster.py produced (line 37: table_a = spark.read.parquet(PATH_PREFIX)) +Generates the other tables (table_b, table_c, table_d, table_e) from schema metadata in an Excel file +Forces join-key alignment by overwriting a percentage of rows in each table with values from table_b + + +## 50 GB +target_sze = 50 * 1024**3 # 1 GB +num_nodes_per_graph is still 500_000 + +### Spark CPU + +workflow_join-50gb.py changes the config. But I doubt this is optimal. Running on `2xGrace` (144 cores) +.config("spark.driver.memory", "128g") +.config("spark.memory.fraction", "0.8") +.config("spark.sql.shuffle.partitions", "800") + +``` +Workflow join completed in 1235.04 seconds + Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_spark_output + Result rows: 2662631628 +``` + +### cuDF-Polars +workflow_join_polars-50gb.py Running on full NVL4: 2xGrace 4xB200 + +workflow_join_polars-50gb.py + +``` +Workflow join completed in 43.47 seconds + Output: /scratch/prestouser/test-data/500000-50GB/workflow_join_polars_output + Result rows: 2662631628 + +RapidsMPF statistics: +Statistics: + - alloc-device: 353.05 GiB | 1.63 s | 216.76 GiB/s | avg-stream-delay 2.02 ms + - alloc-host: 1.98 KiB | 270.61 us | 7.14 MiB/s | avg-stream-delay 97.29 us + - copy-device-to-pinned_host: 322.07 GiB | 2.94 s | 109.71 GiB/s | avg-stream-delay 996.55 us + - copy-pinned_host-to-device: 322.07 GiB | 1.71 s | 188.09 GiB/s | avg-stream-delay 26.02 us +``` + + +## 1GB + +target_size = 1 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB" + +Reading data from: /scratch/prestouser/test-data/500000-1GB +Creating 1 versions in parallel... +Writing combined dataset with 10 partitions... +Estimated final size: 2.68 GB + Final size: 2.76 GB +Target was: 1.0 GB +Achieved: 276.0% of target + + +1. Processing table_a... + Distinct combinations for table_a: 14000 + ✓ table_a_forced created (0.1% forced) + +2. Processing table_c... + Distinct combinations for table_c: 14000 + ✓ table_c_forced created (1.0% forced) + +3. Processing table_d... + Distinct combinations for table_d: 14000 + ✓ table_d_forced created (1.0% forced) + +4. Processing table_e... + Distinct values for table_e: 14000 + ✓ table_e_forced created (1.0% forced) + + + +## 10GB +target_size = 10 * 1024**3 # 1 GB +num_nodes_per_graph = 500_000 +PATH_PREFIX = f"/scratch/prestouser/test-data/{num_nodes_per_graph}-1GB" + +Reading data from: /scratch/prestouser/test-data/500000-1GB +Creating 4 versions in parallel... +Writing combined dataset with 43 partitions... +Estimated final size: 10.7 GB + +Final size: 11.04 GB +Target was: 10.0 GB +Achieved: 110.4% of target + +Created DataFrame for table 'table_b' with 14000 random rows. +Created DataFrame for table 'table_c' with 21000000 random rows. +Created DataFrame for table 'table_d' with 12000000 random rows. +Created DataFrame for table 'table_e' with 33000000 random rows. +============================================================ +FORCING TABLES TO MATCH TABLE_B VALUES +Match percentages: A=0.1%, C=1.0%, D=1.0%, E=1.0% +============================================================ + +1. Processing table_a... + Distinct combinations for table_a: 14000 + ✓ table_a_forced created (0.1% forced) + +2. Processing table_c... + Distinct combinations for table_c: 14000 + ✓ table_c_forced created (1.0% forced) + +3. Processing table_d... + Distinct combinations for table_d: 14000 + ✓ table_d_forced created (1.0% forced) + +4. Processing table_e... + Distinct values for table_e: 14000 + ✓ table_e_forced created (1.0% forced) diff --git a/Data-Gen/workflow_join-50gb.py b/Data-Gen/workflow_join-50gb.py new file mode 100644 index 0000000..cccf238 --- /dev/null +++ b/Data-Gen/workflow_join-50gb.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +from pyspark.sql import SparkSession + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("WorkflowJoin") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "128g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .config("spark.memory.fraction", "0.8") + .config("spark.sql.shuffle.partitions", "800") + .getOrCreate() +) + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" +PATH_PREFIX = "/scratch/prestouser/test-data/500000-50GB" + +# ─── Read all tables from parquet ──────────────────────────────────────────── +print("Reading tables from parquet...") +table_a = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") +table_c = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_d = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_e = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") + +print(f" table_a: {table_a.count()} rows") +print(f" table_b: {table_b.count()} rows") +print(f" table_c: {table_c.count()} rows") +print(f" table_d: {table_d.count()} rows") +print(f" table_e: {table_e.count()} rows") + +# ─── Workflow Join ─────────────────────────────────────────────────────────── +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +start = time.time() + +workflow_result = ( + table_a + .join( + table_b, + [ + table_a["col_a"] == table_b["col_b_8"], + table_a["col_b"] == table_b["col_b_3"], + table_a["col_c"] == table_b["col_b_9"], + table_a["col_d"] == table_b["col_b_1"], + ], + how="left", + ) + .join( + table_c, + [ + table_a["col_a"] == table_c["col_c_10"], + table_a["col_b"] == table_c["col_c_9"], + table_a["col_e"] == table_c["col_c_11"].cast("string"), + ], + how="left", + ) + .join( + table_d, + [ + table_a["col_a"] == table_d["col_d_0"], + table_a["col_c"] == table_d["col_d_1"], + ], + how="left", + ) + .join( + table_e, + table_a["col_a"] == table_e["col_e_0"], + how="left", + ) +) + +OUTPUT = f"{PATH_PREFIX}/workflow_join_spark_output" +workflow_result.write.mode("overwrite").parquet(OUTPUT) + +elapsed = time.time() - start +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Output: {OUTPUT}") +print(f" Result rows: {spark.read.parquet(OUTPUT).count()}") + +spark.stop() diff --git a/Data-Gen/workflow_join.py b/Data-Gen/workflow_join.py new file mode 100644 index 0000000..903b8c4 --- /dev/null +++ b/Data-Gen/workflow_join.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +from pyspark.sql import SparkSession + +spark = ( + SparkSession.builder + .master("local[*]") + .appName("WorkflowJoin") + .config("spark.driver.host", "127.0.0.1") + .config("spark.driver.bindAddress", "127.0.0.1") + .config("spark.driver.memory", "32g") + .config("spark.executor.heartbeatInterval", "60s") + .config("spark.network.timeout", "300s") + .getOrCreate() +) + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" + +# ─── Read all tables from parquet ──────────────────────────────────────────── +print("Reading tables from parquet...") +table_a = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_a") +table_b = spark.read.parquet(f"{PATH_PREFIX}/temp_table_b") +table_c = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_c") +table_d = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_d") +table_e = spark.read.parquet(f"{PATH_PREFIX}/temp_forced_table_e") + +print(f" table_a: {table_a.count()} rows") +print(f" table_b: {table_b.count()} rows") +print(f" table_c: {table_c.count()} rows") +print(f" table_d: {table_d.count()} rows") +print(f" table_e: {table_e.count()} rows") + +# ─── Workflow Join ─────────────────────────────────────────────────────────── +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +start = time.time() + +workflow_result = ( + table_a + .join( + table_b, + [ + table_a["col_a"] == table_b["col_b_8"], + table_a["col_b"] == table_b["col_b_3"], + table_a["col_c"] == table_b["col_b_9"], + table_a["col_d"] == table_b["col_b_1"], + ], + how="left", + ) + .join( + table_c, + [ + table_a["col_a"] == table_c["col_c_10"], + table_a["col_b"] == table_c["col_c_9"], + table_a["col_e"] == table_c["col_c_11"].cast("string"), + ], + how="left", + ) + .join( + table_d, + [ + table_a["col_a"] == table_d["col_d_0"], + table_a["col_c"] == table_d["col_d_1"], + ], + how="left", + ) + .join( + table_e, + table_a["col_a"] == table_e["col_e_0"], + how="left", + ) +) + +workflow_result.write.format("noop").mode("overwrite").save() + +elapsed = time.time() - start +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") + +spark.stop() diff --git a/Data-Gen/workflow_join_polars-1tb-ray.py b/Data-Gen/workflow_join_polars-1tb-ray.py new file mode 100755 index 0000000..d3e7800 --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +import argparse +import os +import shutil +import time +from pathlib import Path + +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine +from cudf_polars.utils.config import MemoryResourceConfig + +PATH_PREFIX = "/scratch/prestouser/test-data/500000-1TB" +OUTPUT = Path(PATH_PREFIX) / "workflow_join_polars_ray_output" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run workflow join with cudf-polars RayEngine." + ) + parser.add_argument("--path-prefix", default=os.environ.get("DATASET_PATH", PATH_PREFIX)) + parser.add_argument("--output", default=os.environ.get("OUTPUT_PATH", str(OUTPUT))) + parser.add_argument("--ray-address", default=os.environ.get("RAY_ADDRESS")) + parser.add_argument("--num-py-executors", type=int, default=8) + parser.add_argument("--target-partition-size", type=int, default=3_221_225_472) + parser.add_argument("--spill-device-limit", default="70%") + parser.add_argument("--pinned-initial-pool-size", type=int, default=51_539_607_552) + parser.add_argument("--rmm-release-threshold", type=int, default=160_000_000_000) + return parser.parse_args() + + +def prepare_output_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +def main() -> None: + args = parse_args() + path_prefix = str(args.path_prefix) + output = Path(args.output) + + print("Reading tables from parquet...") + print(f" path_prefix: {path_prefix}") + tables = scan_tables(path_prefix) + print_row_counts(tables) + + print("\nRunning workflow join: table_a -> table_b -> table_c -> table_d -> table_e ...") + workflow_query = build_workflow_join_query(tables) + + memory_resource_config = MemoryResourceConfig( + qualname="rmm.mr.CudaAsyncMemoryResource", + options={"release_threshold": args.rmm_release_threshold}, + ) + + streaming_options = StreamingOptions( + spill_device_limit=args.spill_device_limit, + pinned_memory=True, + pinned_initial_pool_size=args.pinned_initial_pool_size, + statistics=True, + num_py_executors=args.num_py_executors, + fallback_mode="silent", + target_partition_size=args.target_partition_size, + memory_resource_config=memory_resource_config, + ) + + ray_init_options = {} + if args.ray_address: + ray_init_options["address"] = args.ray_address + print(f"Connecting to Ray cluster at {args.ray_address}") + + with RayEngine.from_options( + streaming_options, ray_init_options=ray_init_options + ) as engine: + print(f"RayEngine ranks: {engine.nranks}") + prepare_output_path(output) + start = time.time() + workflow_query.sink_parquet(output, engine=engine, mkdir=True) + elapsed = time.time() - start + statistics = engine.global_statistics() + + print(f"\nWorkflow join completed in {elapsed:.2f} seconds") + print(f" Output: {output}") + row_count = pl.scan_parquet(f"{output}/*.parquet").select(pl.len()).collect().item() + print(f" Result rows: {row_count}") + print("\nRapidsMPF statistics:") + print(statistics.report()) + + +if __name__ == "__main__": + main() diff --git a/Data-Gen/workflow_join_polars-1tb-ray.sh b/Data-Gen/workflow_join_polars-1tb-ray.sh new file mode 100755 index 0000000..e61275a --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.sh @@ -0,0 +1,130 @@ +#!/bin/bash +# Multi-node Ray launcher for workflow_join_polars-1tb-ray.py. +set -x +set -e + +export UCX_TLS=^ib,ud:aux +export UCX_NET_DEVICES=bond0 +export UCX_MAX_RNDV_RAILS=1 +export UCX_RNDV_PIPELINE_ERROR_HANDLING=y +export UCX_TCP_CM_REUSEADDR=y +export UCX_RNDV_MTYPE_WORKER_MAX_MEM=1G +export UCX_RNDV_MTYPE_WORKER_FC_ENABLE=y +export UCX_RNDV_FRAG_MEM_TYPES=cuda + +export PYTHONUNBUFFERED=1 +export POLARS_MAX_THREADS=1 +export RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0 +export CUDF_POLARS_LOG_TRACES=1 +export KVIKIO_TASK_SIZE=$((16 * 1024 * 1024)) +export CUFILE_LOGGING_LEVEL=INFO +export KVIKIO_COMPAT_MODE=OFF + +source /opt/conda/etc/profile.d/conda.sh +conda activate rapidsmpf + +PROJECT_DIR=${REAL_HOME:-$HOME}/bzaitlen/test-data/Data-Gen +SCHEDULER_DIR=${PROJECT_DIR}/scheduler +RESULTS_DIR=${PROJECT_DIR}/results +RAY_ADDR_FILE=${SCHEDULER_DIR}/ray-address-job${SLURM_JOB_ID}.txt +DONE_FLAG=${SCHEDULER_DIR}/ray-done-job${SLURM_JOB_ID}.flag +mkdir -p "$SCHEDULER_DIR" "$RESULTS_DIR" + +CURRENT_NODE=$(hostname -s) +echo "Node: $CURRENT_NODE Head: $HEAD_NODE NodeID: $SLURM_NODEID Procid: $SLURM_PROCID" + +NODE_GPUS=${SLURM_GPUS_ON_NODE:-4} +EXPECTED_GPUS=$((NODE_GPUS * SLURM_NNODES)) +echo "[$CURRENT_NODE] NODE_GPUS=$NODE_GPUS EXPECTED_GPUS=$EXPECTED_GPUS" +echo "[$CURRENT_NODE] CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-unset}" + +HEAD_PORT=${RAY_HEAD_PORT:-6379} +RAY_TMP_DIR=/tmp/ray-${SLURM_JOB_ID} + +ray stop --force 2>/dev/null || true +killall -q raylet gcs_server 2>/dev/null || true +pkill -f "ray::" 2>/dev/null || true +rm -rf "$RAY_TMP_DIR" 2>/dev/null || true +sleep 2 + +if [[ "$CURRENT_NODE" == "$HEAD_NODE" ]]; then + rm -f "$RAY_ADDR_FILE" "$DONE_FLAG" + HEAD_IP=$(hostname -I | awk '{print $1}') + echo "[$CURRENT_NODE] Starting Ray head at ${HEAD_IP}:${HEAD_PORT}" + + ray start --head \ + --node-ip-address="$HEAD_IP" \ + --port="$HEAD_PORT" \ + --num-gpus="$NODE_GPUS" \ + --temp-dir="$RAY_TMP_DIR" \ + --disable-usage-stats + + echo "${HEAD_IP}:${HEAD_PORT}" > "$RAY_ADDR_FILE" + + echo "[$CURRENT_NODE] Waiting for $EXPECTED_GPUS GPUs to join cluster..." + python - <= ${EXPECTED_GPUS}: + break + time.sleep(2) +else: + print("Timed out waiting for GPUs", file=sys.stderr) + sys.exit(1) +ray.shutdown() +PY + + DATETIME=$(date +%Y-%m-%d_%H-%M-%S) + output_log="$RESULTS_DIR/workflow-join-polars-1tb-ray-${EXPECTED_GPUS}gpus-job-${SLURM_JOB_ID}-$DATETIME.txt" + dataset_path=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} + output_path=${OUTPUT_PATH:-${dataset_path}/workflow_join_polars_ray_output} + + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.py" "$RESULTS_DIR/workflow_join_polars-1tb-ray-driver-job${SLURM_JOB_ID}-$DATETIME.py" + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.sh" "$RESULTS_DIR/workflow_join_polars-1tb-ray-job${SLURM_JOB_ID}-$DATETIME.sh" + cp "$PROJECT_DIR/workflow_join_polars-1tb-ray.slurm" "$RESULTS_DIR/workflow_join_polars-1tb-ray-job${SLURM_JOB_ID}-$DATETIME.slurm" + + set +e + python "$PROJECT_DIR/workflow_join_polars-1tb-ray.py" \ + --path-prefix "$dataset_path" \ + --output "$output_path" \ + --ray-address "${HEAD_IP}:${HEAD_PORT}" \ + --num-py-executors "${NUM_PY_EXECUTORS:-8}" \ + --target-partition-size "${TARGET_PARTITION_SIZE:-3221225472}" \ + --spill-device-limit "${SPILL_DEVICE_LIMIT:-70%}" \ + 2>&1 | tee -a "$output_log" + DRIVER_RC=${PIPESTATUS[0]} + set -e + + touch "$DONE_FLAG" + ray stop --force || true + rm -f "$RAY_ADDR_FILE" + exit "$DRIVER_RC" +else + echo -n "[$CURRENT_NODE] Worker waiting for Ray address" + set +x + while [ ! -f "$RAY_ADDR_FILE" ]; do + echo -n "." + sleep 2 + done + echo " found" + set -x + + HEAD_ADDR=$(cat "$RAY_ADDR_FILE") + echo "[$CURRENT_NODE] Joining Ray at ${HEAD_ADDR} with $NODE_GPUS GPUs" + ray start \ + --address="$HEAD_ADDR" \ + --num-gpus="$NODE_GPUS" \ + --temp-dir="$RAY_TMP_DIR" \ + --disable-usage-stats + + while [ ! -f "$DONE_FLAG" ]; do + sleep 5 + done + ray stop --force || true +fi + +echo "[$CURRENT_NODE] Exiting." diff --git a/Data-Gen/workflow_join_polars-1tb-ray.slurm b/Data-Gen/workflow_join_polars-1tb-ray.slurm new file mode 100755 index 0000000..ef8363b --- /dev/null +++ b/Data-Gen/workflow_join_polars-1tb-ray.slurm @@ -0,0 +1,53 @@ +#!/bin/bash +#SBATCH --job-name=bz-workflow-join-polars-1tb-ray +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err +#SBATCH --time=00:40:00 +#SBATCH --nodes=6 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=144 +#SBATCH --mem=0 +#SBATCH --gres=gpu:4 +#SBATCH --exclusive +#SBATCH --nodelist=presto-gb200-gcn-[01-10] + +export DATASET_PATH=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} +export OUTPUT_PATH=${OUTPUT_PATH:-${DATASET_PATH}/workflow_join_polars_ray_output} + +export LIBCUDF_HW_DECOMPRESSION=${LIBCUDF_HW_DECOMPRESSION:-OFF} +export NUM_PY_EXECUTORS=${NUM_PY_EXECUTORS:-8} +export TARGET_PARTITION_SIZE=${TARGET_PARTITION_SIZE:-3221225472} +export SPILL_DEVICE_LIMIT=${SPILL_DEVICE_LIMIT:-70%} + +export NVIDIA_VISIBLE_DEVICES=all +export NVIDIA_DRIVER_CAPABILITIES=compute,utility +export REAL_HOME=$HOME + +LOCAL_CONTAINER_PATH=${LOCAL_CONTAINER_PATH:-/scratch/prestouser/images/mg-polars-tpch-nvl72-2026-05-23-arm64-cuda-13-1-ucxthrottling-nvcr.sqsh} +CONTAINER_MOUNTS="${HOME}:${HOME},/scratch:/scratch" + +export HEAD_NODE=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1) + +mkdir -p logs ${HOME}/bzaitlen/test-data/Data-Gen/scheduler + +echo "========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Nodes: $SLURM_JOB_NUM_NODES" +echo "Node list: $SLURM_JOB_NODELIST" +echo "Head node: $HEAD_NODE" +echo "Dataset: $DATASET_PATH" +echo "Output: $OUTPUT_PATH" +echo "HW decompression: $LIBCUDF_HW_DECOMPRESSION" +echo "Frontend: ray" +echo "Allocated GPUs: ${SLURM_GPUS_ON_NODE:-4} per node" +echo "Target partition: $TARGET_PARTITION_SIZE" +echo "========================================" + +srun \ + --export=ALL \ + --container-image=${LOCAL_CONTAINER_PATH} \ + --container-mounts=${CONTAINER_MOUNTS} \ + --container-remap-root \ + --container-writable \ + --pty \ + bash ${HOME}/bzaitlen/test-data/Data-Gen/workflow_join_polars-1tb-ray.sh diff --git a/Data-Gen/workflow_join_polars-50gb.py b/Data-Gen/workflow_join_polars-50gb.py new file mode 100644 index 0000000..046b5bc --- /dev/null +++ b/Data-Gen/workflow_join_polars-50gb.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test using Polars. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +import shutil +from pathlib import Path + +import polars as pl +from cudf_polars.engine.options import StreamingOptions +from cudf_polars.engine.ray import RayEngine +from cudf_polars.utils.config import MemoryResourceConfig + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" +PATH_PREFIX = "/scratch/prestouser/test-data/500000-50GB" +OUTPUT = Path(PATH_PREFIX) / "workflow_join_polars_output" + + +def prepare_output_path(path: Path) -> None: + if path.is_dir(): + shutil.rmtree(path) + elif path.exists(): + path.unlink() + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +print("Reading tables from parquet...") +tables = scan_tables(PATH_PREFIX) +print_row_counts(tables) + +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +workflow_query = build_workflow_join_query(tables) + +RMM_MEMORY_RESOURCE_CONFIG = MemoryResourceConfig( + qualname="rmm.mr.CudaAsyncMemoryResource", + options={"release_threshold": 160_000_000_000}, +) + +STREAMING_OPTIONS = StreamingOptions( + # RapidsMPF options + spill_device_limit="70%", + pinned_memory=True, + pinned_initial_pool_size=51_539_607_552, + statistics=True, + # Executor options + num_py_executors=8, + fallback_mode="silent", + target_partition_size=3_221_225_472, + # Engine options + memory_resource_config=RMM_MEMORY_RESOURCE_CONFIG, +) + +engine = RayEngine.from_options(STREAMING_OPTIONS) +try: + prepare_output_path(OUTPUT) + start = time.time() + workflow_query.sink_parquet(OUTPUT, engine=engine, mkdir=True) + elapsed = time.time() - start + statistics = engine.global_statistics() +finally: + engine.shutdown() + +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Output: {OUTPUT}") +print(f" Result rows: {pl.scan_parquet(f'{OUTPUT}/*.parquet').select(pl.len()).collect().item()}") +print("\nRapidsMPF statistics:") +print(statistics.report()) diff --git a/Data-Gen/workflow_join_polars.py b/Data-Gen/workflow_join_polars.py new file mode 100644 index 0000000..c708daa --- /dev/null +++ b/Data-Gen/workflow_join_polars.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +""" +Standalone workflow join test using Polars. + +Reads all tables from parquet (produced by new_datagen_faster.py + create_join_tables.py) +and performs the full chained 4-table left join workflow. +""" +import time +from pathlib import Path + +import polars as pl + +PATH_PREFIX = Path(__file__).resolve().parent / "test-data" / "500000-1GB" + + +def scan_tables(path_prefix: str) -> dict[str, pl.LazyFrame]: + return { + "table_a": pl.scan_parquet(f"{path_prefix}/temp_forced_table_a/*.parquet"), + "table_b": pl.scan_parquet(f"{path_prefix}/temp_table_b/*.parquet"), + "table_c": pl.scan_parquet(f"{path_prefix}/temp_forced_table_c/*.parquet"), + "table_d": pl.scan_parquet(f"{path_prefix}/temp_forced_table_d/*.parquet"), + "table_e": pl.scan_parquet(f"{path_prefix}/temp_forced_table_e/*.parquet"), + } + + +def print_row_counts(tables: dict[str, pl.LazyFrame]) -> None: + for name, table in tables.items(): + row_count = table.select(pl.len()).collect().item() + print(f" {name}: {row_count} rows") + + +def build_workflow_join_query(tables: dict[str, pl.LazyFrame]) -> pl.LazyFrame: + table_a = tables["table_a"] + table_b = tables["table_b"] + table_c = tables["table_c"].with_columns(pl.col("col_c_11").cast(pl.Utf8)) + table_d = tables["table_d"] + table_e = tables["table_e"] + + return ( + table_a + .join( + table_b, + left_on=["col_a", "col_b", "col_c", "col_d"], + right_on=["col_b_8", "col_b_3", "col_b_9", "col_b_1"], + how="left", + ) + .join( + table_c, + left_on=["col_a", "col_b", "col_e"], + right_on=["col_c_10", "col_c_9", "col_c_11"], + how="left", + ) + .join( + table_d, + left_on=["col_a", "col_c"], + right_on=["col_d_0", "col_d_1"], + how="left", + ) + .join( + table_e, + left_on=["col_a"], + right_on=["col_e_0"], + how="left", + ) + ) + + +print("Reading tables from parquet...") +tables = scan_tables(PATH_PREFIX) +print_row_counts(tables) + +print("\nRunning workflow join: table_a → table_b → table_c → table_d → table_e ...") +workflow_query = build_workflow_join_query(tables) + +start = time.time() +workflow_result = workflow_query.collect() +elapsed = time.time() - start + +print(f"\nWorkflow join completed in {elapsed:.2f} seconds") +print(f" Result: {len(workflow_result)} rows, {len(workflow_result.columns)} columns") diff --git a/Data-Gen/workflow_join_spark-1tb-rapids.py b/Data-Gen/workflow_join_spark-1tb-rapids.py new file mode 100755 index 0000000..c7e28cb --- /dev/null +++ b/Data-Gen/workflow_join_spark-1tb-rapids.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 +import argparse +import os +import time +from pathlib import Path + +from pyspark.sql import DataFrame, SparkSession +from pyspark.sql.functions import col + +PATH_PREFIX = "/scratch/prestouser/test-data/500000-1TB" +OUTPUT = f"{PATH_PREFIX}/workflow_join_spark_rapids_output" +SPARK_RAPIDS_JAR = "/opt/spark/sparkRapidsPlugin/rapids-4-spark.jar" + + +def env_bool(name: str, default: bool = False) -> bool: + value = os.environ.get(name) + if value is None: + return default + return value.lower() in {"1", "true", "yes", "on"} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run the 1TB workflow join with Spark or Spark RAPIDS." + ) + parser.add_argument("--path-prefix", default=os.environ.get("DATASET_PATH", PATH_PREFIX)) + parser.add_argument("--output", default=os.environ.get("OUTPUT_PATH", OUTPUT)) + parser.add_argument("--master", default=os.environ.get("SPARK_MASTER_URL")) + parser.add_argument("--plugin", choices=("rapids", "cpu"), default=os.environ.get("SPARK_PLUGIN", "rapids")) + parser.add_argument("--app-name", default=os.environ.get("SPARK_APP_NAME", "WorkflowJoinSparkRapids1TB")) + parser.add_argument("--action", choices=("parquet", "count", "noop"), default=os.environ.get("WORKFLOW_ACTION", "parquet")) + parser.add_argument("--mode", default=os.environ.get("SPARK_WRITE_MODE", "overwrite")) + parser.add_argument("--output-partitions", type=int, default=int(os.environ.get("OUTPUT_PARTITIONS", "0"))) + parser.add_argument("--count-inputs", action="store_true", default=env_bool("COUNT_INPUTS", False)) + parser.add_argument("--count-result", action="store_true", default=env_bool("COUNT_RESULT", False)) + parser.add_argument("--explain", action="store_true", default=env_bool("SPARK_EXPLAIN", False)) + parser.add_argument("--event-log-dir", default=os.environ.get("SPARK_EVENT_LOG_DIR", "file:///scratch/prestouser/spark-events")) + parser.add_argument("--driver-host", default=os.environ.get("SPARK_DRIVER_HOST")) + parser.add_argument("--executor-instances", type=int, default=int(os.environ.get("SPARK_EXECUTOR_INSTANCES", "0"))) + parser.add_argument("--executor-cores", type=int, default=int(os.environ.get("SPARK_EXECUTOR_CORES", "8"))) + parser.add_argument("--executor-memory", default=os.environ.get("SPARK_EXECUTOR_MEMORY", "96g")) + parser.add_argument("--executor-memory-overhead", default=os.environ.get("SPARK_EXECUTOR_MEMORY_OVERHEAD", "64g")) + parser.add_argument("--driver-memory", default=os.environ.get("SPARK_DRIVER_MEMORY", "32g")) + parser.add_argument("--shuffle-partitions", type=int, default=int(os.environ.get("SPARK_SQL_SHUFFLE_PARTITIONS", "2400"))) + parser.add_argument("--files-max-partition-bytes", default=os.environ.get("SPARK_SQL_FILES_MAX_PARTITION_BYTES", "160m")) + parser.add_argument("--advisory-partition-size", default=os.environ.get("SPARK_SQL_ADVISORY_PARTITION_SIZE", "160mb")) + parser.add_argument("--min-partition-size", default=os.environ.get("SPARK_SQL_MIN_PARTITION_SIZE", "32mb")) + parser.add_argument("--rapids-concurrent-gpu-tasks", type=int, default=int(os.environ.get("SPARK_RAPIDS_CONCURRENT_GPU_TASKS", "1"))) + parser.add_argument("--task-gpu-amount", default=os.environ.get("SPARK_TASK_GPU_AMOUNT", "0.125")) + parser.add_argument("--rapids-pinned-pool-size", default=os.environ.get("SPARK_RAPIDS_PINNED_POOL_SIZE", "4g")) + parser.add_argument("--rapids-host-spill-size", default=os.environ.get("SPARK_RAPIDS_HOST_SPILL_SIZE", "128G")) + parser.add_argument("--rapids-batch-size-bytes", default=os.environ.get("SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES", "536870912b")) + parser.add_argument("--rapids-shuffle-reader-threads", default=os.environ.get("SPARK_RAPIDS_SHUFFLE_READER_THREADS", "8")) + parser.add_argument("--rapids-shuffle-writer-threads", default=os.environ.get("SPARK_RAPIDS_SHUFFLE_WRITER_THREADS", "8")) + parser.add_argument("--rapids-shuffle-manager", default=os.environ.get("SPARK_RAPIDS_SHUFFLE_MANAGER", "com.nvidia.spark.rapids.spark358.RapidsShuffleManager")) + parser.add_argument("--rapids-explain", default=os.environ.get("SPARK_RAPIDS_SQL_EXPLAIN", "NONE")) + return parser.parse_args() + + +def config_if(builder: SparkSession.Builder, key: str, value: object | None) -> SparkSession.Builder: + if value not in (None, "", 0): + builder = builder.config(key, value) + return builder + + +def create_session(args: argparse.Namespace) -> SparkSession: + builder = SparkSession.builder.appName(args.app_name) + if args.master: + builder = builder.master(args.master) + + common_configs = { + "spark.eventLog.enabled": "true", + "spark.eventLog.dir": args.event_log_dir, + "spark.ui.prometheus.enabled": "true", + "spark.metrics.namespace": args.app_name, + "spark.submit.deployMode": "client", + "spark.driver.bindAddress": "0.0.0.0", + "spark.executor.extraJavaOptions": "-Djava.net.preferIPv4Stack=true", + "spark.driver.extraJavaOptions": "-Djava.net.preferIPv4Stack=true", + "spark.sql.debug.maxToStringFields": "1000", + "spark.executor.heartbeatInterval": "60s", + "spark.network.timeout": "300s", + "spark.sql.adaptive.enabled": "true", + "spark.sql.adaptive.coalescePartitions.parallelismFirst": "false", + "spark.sql.adaptive.skewJoin.enabled": "true", + "spark.sql.adaptive.advisoryPartitionSizeInBytes": args.advisory_partition_size, + "spark.sql.adaptive.coalescePartitions.minPartitionSize": args.min_partition_size, + "spark.sql.files.maxPartitionBytes": args.files_max_partition_bytes, + "spark.sql.shuffle.partitions": str(args.shuffle_partitions), + "spark.task.cpus": "1", + "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "CORRECTED", + "spark.sql.legacy.charVarcharAsString": "true", + "spark.locality.wait": "0", + "spark.executor.cores": str(args.executor_cores), + "spark.executor.memory": args.executor_memory, + "spark.executor.memoryOverhead": args.executor_memory_overhead, + "spark.driver.memory": args.driver_memory, + } + if args.driver_host: + common_configs["spark.driver.host"] = args.driver_host + if args.executor_instances: + common_configs["spark.executor.instances"] = str(args.executor_instances) + + for key, value in common_configs.items(): + builder = builder.config(key, value) + + if args.plugin == "rapids": + rapids_configs = { + "spark.driver.extraClassPath": SPARK_RAPIDS_JAR, + "spark.executor.extraClassPath": SPARK_RAPIDS_JAR, + "spark.plugins": "com.nvidia.spark.SQLPlugin", + "spark.rapids.sql.enabled": "true", + "spark.rapids.sql.explain": args.rapids_explain, + "spark.rapids.sql.exec.InMemoryTableScanExec": "true", + "spark.rapids.allowMultipleJars": "ALWAYS", + "spark.rapids.sql.allowMultipleJars": "ALWAYS", + "spark.rapids.filecache.enabled": "false", + "spark.rapids.memory.gpu.debug": "NONE", + "spark.rapids.memory.host.spillStorageSize": args.rapids_host_spill_size, + "spark.rapids.memory.pinnedPool.size": args.rapids_pinned_pool_size, + "spark.rapids.shuffle.mode": "MULTITHREADED", + "spark.rapids.shuffle.multiThreaded.reader.threads": args.rapids_shuffle_reader_threads, + "spark.rapids.shuffle.multiThreaded.writer.threads": args.rapids_shuffle_writer_threads, + "spark.rapids.sql.batchSizeBytes": args.rapids_batch_size_bytes, + "spark.rapids.sql.concurrentGpuTasks": str(args.rapids_concurrent_gpu_tasks), + "spark.shuffle.manager": args.rapids_shuffle_manager, + "spark.shuffle.compress": "true", + "spark.executor.resource.gpu.amount": "1", + "spark.task.resource.gpu.amount": str(args.task_gpu_amount), + "spark.executor.resource.gpu.vendor": "nvidia.com", + } + for key, value in rapids_configs.items(): + builder = builder.config(key, value) + else: + builder = builder.config("spark.rapids.sql.enabled", "false") + + return builder.getOrCreate() + + +def read_tables(spark: SparkSession, path_prefix: str) -> dict[str, DataFrame]: + return { + "table_a": spark.read.parquet(f"{path_prefix}/temp_forced_table_a"), + "table_b": spark.read.parquet(f"{path_prefix}/temp_table_b"), + "table_c": spark.read.parquet(f"{path_prefix}/temp_forced_table_c").withColumn("col_c_11", col("col_c_11").cast("string")), + "table_d": spark.read.parquet(f"{path_prefix}/temp_forced_table_d"), + "table_e": spark.read.parquet(f"{path_prefix}/temp_forced_table_e"), + } + + +def print_row_counts(tables: dict[str, DataFrame]) -> None: + for name, table in tables.items(): + print(f" {name}: {table.count()} rows", flush=True) + + +def build_workflow_join_query(tables: dict[str, DataFrame]) -> DataFrame: + table_a = tables["table_a"].alias("a") + table_b = tables["table_b"].alias("b") + table_c = tables["table_c"].alias("c") + table_d = tables["table_d"].alias("d") + table_e = tables["table_e"].alias("e") + + result = ( + table_a + .join( + table_b, + [ + col("a.col_a") == col("b.col_b_8"), + col("a.col_b") == col("b.col_b_3"), + col("a.col_c") == col("b.col_b_9"), + col("a.col_d") == col("b.col_b_1"), + ], + how="left", + ) + .drop("col_b_8", "col_b_3", "col_b_9", "col_b_1") + ) + result = ( + result + .join( + table_c, + [ + result["col_a"] == col("c.col_c_10"), + result["col_b"] == col("c.col_c_9"), + result["col_e"] == col("c.col_c_11"), + ], + how="left", + ) + .drop("col_c_10", "col_c_9", "col_c_11") + ) + result = ( + result + .join( + table_d, + [ + result["col_a"] == col("d.col_d_0"), + result["col_c"] == col("d.col_d_1"), + ], + how="left", + ) + .drop("col_d_0", "col_d_1") + ) + return result.join(table_e, result["col_a"] == col("e.col_e_0"), how="left").drop("col_e_0") + + +def print_effective_config(spark: SparkSession, args: argparse.Namespace) -> None: + keys = [ + "spark.master", + "spark.driver.host", + "spark.executor.instances", + "spark.executor.cores", + "spark.executor.memory", + "spark.executor.resource.gpu.amount", + "spark.task.resource.gpu.amount", + "spark.rapids.sql.enabled", + "spark.rapids.sql.concurrentGpuTasks", + "spark.rapids.memory.pinnedPool.size", + "spark.rapids.sql.batchSizeBytes", + "spark.shuffle.manager", + "spark.sql.shuffle.partitions", + "spark.sql.files.maxPartitionBytes", + ] + print("Effective Spark config:", flush=True) + for key in keys: + print(f" {key}: {spark.conf.get(key, '')}", flush=True) + print(f" plugin: {args.plugin}", flush=True) + + +def main() -> None: + args = parse_args() + path_prefix = str(Path(args.path_prefix)) + output = str(Path(args.output)) + + print("Starting Spark workflow join", flush=True) + print(f" path_prefix: {path_prefix}", flush=True) + print(f" output: {output}", flush=True) + print(f" action: {args.action}", flush=True) + + spark = create_session(args) + print_effective_config(spark, args) + + try: + print("Reading tables from parquet...", flush=True) + tables = read_tables(spark, path_prefix) + if args.count_inputs: + print("Input row counts:", flush=True) + print_row_counts(tables) + else: + print("Skipping input row counts.", flush=True) + + print("Building workflow join: table_a -> table_b -> table_c -> table_d -> table_e ...", flush=True) + workflow_result = build_workflow_join_query(tables) + if args.output_partitions > 0: + print(f"Repartitioning result to {args.output_partitions} partitions before action.", flush=True) + workflow_result = workflow_result.repartition(args.output_partitions) + if args.explain: + workflow_result.explain(mode="formatted") + + start = time.time() + result_rows = None + if args.action == "parquet": + workflow_result.write.mode(args.mode).parquet(output) + if args.count_result: + result_rows = spark.read.parquet(output).count() + elif args.action == "count": + result_rows = workflow_result.count() + else: + workflow_result.write.format("noop").mode("overwrite").save() + elapsed = time.time() - start + + print(f"\nWorkflow join completed in {elapsed:.2f} seconds", flush=True) + if args.action == "parquet": + print(f" Output: {output}", flush=True) + if result_rows is not None: + print(f" Result rows: {result_rows}", flush=True) + finally: + spark.stop() + + +if __name__ == "__main__": + main() diff --git a/Data-Gen/workflow_join_spark-1tb-rapids.sh b/Data-Gen/workflow_join_spark-1tb-rapids.sh new file mode 100755 index 0000000..af5bc83 --- /dev/null +++ b/Data-Gen/workflow_join_spark-1tb-rapids.sh @@ -0,0 +1,312 @@ +#!/bin/bash +# Multi-node Spark standalone launcher for workflow_join_spark-1tb-rapids.py. +set -euo pipefail +set -x + +source /etc/profile.d/spark-rapids.sh + +export PYTHONUNBUFFERED=1 +export NVIDIA_VISIBLE_DEVICES=${NVIDIA_VISIBLE_DEVICES:-all} +export NVIDIA_DRIVER_CAPABILITIES=${NVIDIA_DRIVER_CAPABILITIES:-compute,utility} + +PROJECT_DIR=${REAL_HOME:-$HOME}/bzaitlen/test-data/Data-Gen +SCHEDULER_DIR=${PROJECT_DIR}/scheduler +RESULTS_DIR=${PROJECT_DIR}/results +SPARK_MASTER_URL_FILE=${SCHEDULER_DIR}/spark-master-url-job${SLURM_JOB_ID}.txt +DONE_FLAG=${SCHEDULER_DIR}/spark-done-job${SLURM_JOB_ID}.flag +FAILED_FLAG=${SCHEDULER_DIR}/spark-failed-job${SLURM_JOB_ID}.flag +mkdir -p "$SCHEDULER_DIR" "$RESULTS_DIR" + +CURRENT_NODE=$(hostname -s) +NODE_GPUS=${SLURM_GPUS_ON_NODE:-4} +EXPECTED_WORKERS=${SLURM_NNODES:-1} +EXPECTED_GPUS=$((NODE_GPUS * EXPECTED_WORKERS)) + +SPARK_MASTER_PORT=${SPARK_MASTER_PORT:-7077} +SPARK_MASTER_WEBUI_PORT=${SPARK_MASTER_WEBUI_PORT:-18080} +SPARK_WORKER_WEBUI_PORT=${SPARK_WORKER_WEBUI_PORT:-18081} +SPARK_WORKER_CORES=${SPARK_WORKER_CORES:-${SLURM_CPUS_PER_TASK:-144}} +SPARK_WORKER_MEMORY=${SPARK_WORKER_MEMORY:-1400g} + +SPARK_BASE_DIR=${SPARK_BASE_DIR:-/scratch/prestouser/spark-tmp} +SPARK_NODE_DIR=${SPARK_BASE_DIR}/job-${SLURM_JOB_ID}/${CURRENT_NODE} +export SPARK_LOCAL_DIRS=${SPARK_LOCAL_DIRS:-${SPARK_NODE_DIR}/local} +export SPARK_WORKER_DIR=${SPARK_WORKER_DIR:-${SPARK_NODE_DIR}/worker} +export SPARK_LOG_DIR=${SPARK_LOG_DIR:-${SPARK_NODE_DIR}/logs} +export SPARK_PID_DIR=${SPARK_PID_DIR:-${SPARK_NODE_DIR}/pids} +export SPARK_RUN_DIR=${SPARK_RUN_DIR:-${SPARK_NODE_DIR}/run} +mkdir -p "$SPARK_LOCAL_DIRS" "$SPARK_WORKER_DIR" "$SPARK_LOG_DIR" "$SPARK_PID_DIR" "$SPARK_RUN_DIR" + +EVENT_LOG_ROOT=${SPARK_EVENT_LOG_ROOT:-/scratch/prestouser/spark-events} +EVENT_LOG_PATH=${EVENT_LOG_ROOT}/job-${SLURM_JOB_ID} +mkdir -p "$EVENT_LOG_PATH" +export SPARK_EVENT_LOG_DIR=${SPARK_EVENT_LOG_DIR:-file://${EVENT_LOG_PATH}} + +export SPARK_DAEMON_MEMORY=${SPARK_DAEMON_MEMORY:-4g} +export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS:-} -Djava.net.preferIPv4Stack=true" +export SPARK_MASTER_OPTS="${SPARK_MASTER_OPTS:-} -Dspark.deploy.defaultCores=$((EXPECTED_WORKERS * SPARK_WORKER_CORES)) -Dspark.master.rest.enabled=false" +export SPARK_WORKER_OPTS="${SPARK_WORKER_OPTS:-} -Djava.net.preferIPv4Stack=true -Dspark.worker.resource.gpu.amount=${NODE_GPUS} -Dspark.worker.resource.gpu.discoveryScript=${SPARK_HOME}/getGpusResources.sh" +export SPARK_PUBLIC_DNS=${SPARK_PUBLIC_DNS:-$CURRENT_NODE} + +echo "Node: $CURRENT_NODE Head: $HEAD_NODE NodeID: $SLURM_NODEID Procid: $SLURM_PROCID" +echo "[$CURRENT_NODE] NODE_GPUS=$NODE_GPUS EXPECTED_WORKERS=$EXPECTED_WORKERS EXPECTED_GPUS=$EXPECTED_GPUS" +echo "[$CURRENT_NODE] CUDA_VISIBLE_DEVICES=${CUDA_VISIBLE_DEVICES:-unset}" +echo "[$CURRENT_NODE] SPARK_LOCAL_DIRS=$SPARK_LOCAL_DIRS" + +cleanup_spark() ( + set +e + "${SPARK_HOME}/sbin/stop-worker.sh" >/dev/null 2>&1 + if [[ "${CURRENT_NODE}" == "${HEAD_NODE}" ]]; then + "${SPARK_HOME}/sbin/stop-master.sh" >/dev/null 2>&1 + fi + pkill -f "org.apache.spark.deploy.worker.Worker" >/dev/null 2>&1 + if [[ "${CURRENT_NODE}" == "${HEAD_NODE}" ]]; then + pkill -f "org.apache.spark.deploy.master.Master" >/dev/null 2>&1 + fi + true +) +trap cleanup_spark EXIT + +cleanup_spark +sleep 2 + +if [[ "$CURRENT_NODE" == "$HEAD_NODE" ]]; then + rm -f "$SPARK_MASTER_URL_FILE" "$DONE_FLAG" "$FAILED_FLAG" + export SPARK_MASTER_HOST=${SPARK_MASTER_HOST:-$HEAD_NODE} + echo "[$CURRENT_NODE] Starting Spark master at ${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}" + "${SPARK_HOME}/sbin/start-master.sh" \ + --host "$SPARK_MASTER_HOST" \ + --port "$SPARK_MASTER_PORT" \ + --webui-port "$SPARK_MASTER_WEBUI_PORT" + MASTER_URL="spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT}" + echo "$MASTER_URL" > "$SPARK_MASTER_URL_FILE" +else + echo -n "[$CURRENT_NODE] Waiting for Spark master URL" + set +x + while [[ ! -f "$SPARK_MASTER_URL_FILE" ]]; do + echo -n "." + sleep 2 + done + echo " found" + set -x +fi + +MASTER_URL=$(cat "$SPARK_MASTER_URL_FILE") +echo "[$CURRENT_NODE] Starting Spark worker against ${MASTER_URL}" +"${SPARK_HOME}/sbin/start-worker.sh" "$MASTER_URL" \ + --cores "$SPARK_WORKER_CORES" \ + --memory "$SPARK_WORKER_MEMORY" \ + --webui-port "$SPARK_WORKER_WEBUI_PORT" + +if [[ "$CURRENT_NODE" == "$HEAD_NODE" ]]; then + echo "[$CURRENT_NODE] Waiting for ${EXPECTED_WORKERS} Spark workers to register..." + python3 - <= expected: + break + except Exception as exc: + last = repr(exc) + print(f" Waiting for Spark master JSON endpoint: {last}", flush=True) + time.sleep(2) +else: + print(f"Timed out waiting for Spark workers; last status: {last}", file=sys.stderr) + sys.exit(1) +PY + + DATETIME=$(date +%Y-%m-%d_%H-%M-%S) + output_log="$RESULTS_DIR/workflow-join-spark-1tb-rapids-${EXPECTED_GPUS}gpus-job-${SLURM_JOB_ID}-$DATETIME.txt" + dataset_path=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} + output_path=${OUTPUT_PATH:-${dataset_path}/workflow_join_spark_rapids_output} + executor_instances=${SPARK_EXECUTOR_INSTANCES:-$EXPECTED_GPUS} + executor_cores=${SPARK_EXECUTOR_CORES:-8} + executor_memory=${SPARK_EXECUTOR_MEMORY:-96g} + executor_memory_overhead=${SPARK_EXECUTOR_MEMORY_OVERHEAD:-64g} + driver_memory=${SPARK_DRIVER_MEMORY:-32g} + shuffle_partitions=${SPARK_SQL_SHUFFLE_PARTITIONS:-2400} + driver_host=${SPARK_DRIVER_HOST:-$HEAD_NODE} + rapids_shuffle_manager=${SPARK_RAPIDS_SHUFFLE_MANAGER:-com.nvidia.spark.rapids.spark358.RapidsShuffleManager} + + if [[ "${SPARK_SMOKE_TEST:-0}" == "1" || "${SPARK_SMOKE_TEST:-false}" == "true" ]]; then + smoke_py="$RESULTS_DIR/spark_rapids_smoke_job${SLURM_JOB_ID}-$DATETIME.py" + cat > "$smoke_py" <<'PY' +from pyspark.sql import SparkSession + +spark = SparkSession.builder.appName("SparkRapidsSmoke").getOrCreate() +print("smoke_rapids_enabled", spark.conf.get("spark.rapids.sql.enabled", "unset"), flush=True) +print("smoke_executor_instances", spark.conf.get("spark.executor.instances", "unset"), flush=True) +result = ( + spark.range(0, 1_000_000) + .selectExpr("id % 17 AS k", "id AS v") + .groupBy("k") + .count() + .count() +) +print("smoke_result_rows", result, flush=True) +spark.stop() +PY + echo "[$CURRENT_NODE] Running Spark RAPIDS smoke test" | tee "$output_log" + set +e + "${SPARK_HOME}/bin/spark-submit" \ + --master "$MASTER_URL" \ + --deploy-mode client \ + --driver-memory "$driver_memory" \ + --conf "spark.submit.deployMode=client" \ + --conf "spark.eventLog.enabled=true" \ + --conf "spark.eventLog.dir=$SPARK_EVENT_LOG_DIR" \ + --conf "spark.driver.bindAddress=0.0.0.0" \ + --conf "spark.driver.host=$driver_host" \ + --conf "spark.driver.extraClassPath=${SPARK_HOME}/sparkRapidsPlugin/rapids-4-spark.jar" \ + --conf "spark.executor.extraClassPath=${SPARK_HOME}/sparkRapidsPlugin/rapids-4-spark.jar" \ + --conf "spark.executor.instances=$executor_instances" \ + --conf "spark.executor.cores=$executor_cores" \ + --conf "spark.executor.memory=$executor_memory" \ + --conf "spark.executor.memoryOverhead=$executor_memory_overhead" \ + --conf "spark.executor.resource.gpu.amount=1" \ + --conf "spark.task.resource.gpu.amount=${SPARK_TASK_GPU_AMOUNT:-0.125}" \ + --conf "spark.executor.resource.gpu.vendor=nvidia.com" \ + --conf "spark.plugins=com.nvidia.spark.SQLPlugin" \ + --conf "spark.rapids.sql.enabled=true" \ + --conf "spark.rapids.allowMultipleJars=ALWAYS" \ + --conf "spark.rapids.sql.allowMultipleJars=ALWAYS" \ + --conf "spark.rapids.sql.explain=${SPARK_RAPIDS_SQL_EXPLAIN:-NONE}" \ + --conf "spark.rapids.memory.pinnedPool.size=${SPARK_RAPIDS_PINNED_POOL_SIZE:-4g}" \ + --conf "spark.rapids.memory.host.spillStorageSize=${SPARK_RAPIDS_HOST_SPILL_SIZE:-128G}" \ + --conf "spark.rapids.sql.concurrentGpuTasks=${SPARK_RAPIDS_CONCURRENT_GPU_TASKS:-1}" \ + --conf "spark.rapids.shuffle.mode=MULTITHREADED" \ + --conf "spark.shuffle.manager=$rapids_shuffle_manager" \ + --conf "spark.sql.shuffle.partitions=$shuffle_partitions" \ + "$smoke_py" \ + 2>&1 | tee -a "$output_log" + DRIVER_RC=${PIPESTATUS[0]} + set -e + + if [[ "$DRIVER_RC" == "0" ]]; then + touch "$DONE_FLAG" + else + touch "$FAILED_FLAG" + fi + exit "$DRIVER_RC" + fi + + cp "$PROJECT_DIR/workflow_join_spark-1tb-rapids.py" "$RESULTS_DIR/workflow_join_spark-1tb-rapids-driver-job${SLURM_JOB_ID}-$DATETIME.py" + cp "$PROJECT_DIR/workflow_join_spark-1tb-rapids.sh" "$RESULTS_DIR/workflow_join_spark-1tb-rapids-job${SLURM_JOB_ID}-$DATETIME.sh" + cp "$PROJECT_DIR/workflow_join_spark-1tb-rapids.slurm" "$RESULTS_DIR/workflow_join_spark-1tb-rapids-job${SLURM_JOB_ID}-$DATETIME.slurm" + + driver_args=( + --path-prefix "$dataset_path" + --output "$output_path" + --master "$MASTER_URL" + --plugin "${SPARK_PLUGIN:-rapids}" + --action "${WORKFLOW_ACTION:-parquet}" + --mode "${SPARK_WRITE_MODE:-overwrite}" + --event-log-dir "$SPARK_EVENT_LOG_DIR" + --driver-host "$driver_host" + --executor-instances "$executor_instances" + --executor-cores "$executor_cores" + --executor-memory "$executor_memory" + --executor-memory-overhead "$executor_memory_overhead" + --driver-memory "$driver_memory" + --shuffle-partitions "$shuffle_partitions" + --files-max-partition-bytes "${SPARK_SQL_FILES_MAX_PARTITION_BYTES:-160m}" + --advisory-partition-size "${SPARK_SQL_ADVISORY_PARTITION_SIZE:-160mb}" + --min-partition-size "${SPARK_SQL_MIN_PARTITION_SIZE:-32mb}" + --rapids-concurrent-gpu-tasks "${SPARK_RAPIDS_CONCURRENT_GPU_TASKS:-1}" + --task-gpu-amount "${SPARK_TASK_GPU_AMOUNT:-0.125}" + --rapids-pinned-pool-size "${SPARK_RAPIDS_PINNED_POOL_SIZE:-4g}" + --rapids-host-spill-size "${SPARK_RAPIDS_HOST_SPILL_SIZE:-128G}" + --rapids-batch-size-bytes "${SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES:-536870912b}" + --rapids-shuffle-reader-threads "${SPARK_RAPIDS_SHUFFLE_READER_THREADS:-8}" + --rapids-shuffle-writer-threads "${SPARK_RAPIDS_SHUFFLE_WRITER_THREADS:-8}" + --rapids-shuffle-manager "$rapids_shuffle_manager" + --rapids-explain "${SPARK_RAPIDS_SQL_EXPLAIN:-NONE}" + ) + if [[ "${COUNT_INPUTS:-0}" == "1" || "${COUNT_INPUTS:-false}" == "true" ]]; then + driver_args+=(--count-inputs) + fi + if [[ "${COUNT_RESULT:-0}" == "1" || "${COUNT_RESULT:-false}" == "true" ]]; then + driver_args+=(--count-result) + fi + if [[ "${SPARK_EXPLAIN:-0}" == "1" || "${SPARK_EXPLAIN:-false}" == "true" ]]; then + driver_args+=(--explain) + fi + if [[ "${OUTPUT_PARTITIONS:-0}" != "0" ]]; then + driver_args+=(--output-partitions "${OUTPUT_PARTITIONS}") + fi + + echo "[$CURRENT_NODE] Submitting Spark RAPIDS workflow join" | tee "$output_log" + set +e + "${SPARK_HOME}/bin/spark-submit" \ + --master "$MASTER_URL" \ + --deploy-mode client \ + --driver-memory "$driver_memory" \ + --conf "spark.submit.deployMode=client" \ + --conf "spark.eventLog.enabled=true" \ + --conf "spark.eventLog.dir=$SPARK_EVENT_LOG_DIR" \ + --conf "spark.driver.bindAddress=0.0.0.0" \ + --conf "spark.driver.host=$driver_host" \ + --conf "spark.driver.extraClassPath=${SPARK_HOME}/sparkRapidsPlugin/rapids-4-spark.jar" \ + --conf "spark.executor.extraClassPath=${SPARK_HOME}/sparkRapidsPlugin/rapids-4-spark.jar" \ + --conf "spark.executor.instances=$executor_instances" \ + --conf "spark.executor.cores=$executor_cores" \ + --conf "spark.executor.memory=$executor_memory" \ + --conf "spark.executor.memoryOverhead=$executor_memory_overhead" \ + --conf "spark.executor.resource.gpu.amount=1" \ + --conf "spark.task.resource.gpu.amount=${SPARK_TASK_GPU_AMOUNT:-0.125}" \ + --conf "spark.executor.resource.gpu.vendor=nvidia.com" \ + --conf "spark.plugins=com.nvidia.spark.SQLPlugin" \ + --conf "spark.rapids.sql.enabled=true" \ + --conf "spark.rapids.sql.explain=${SPARK_RAPIDS_SQL_EXPLAIN:-NONE}" \ + --conf "spark.rapids.memory.pinnedPool.size=${SPARK_RAPIDS_PINNED_POOL_SIZE:-4g}" \ + --conf "spark.rapids.memory.host.spillStorageSize=${SPARK_RAPIDS_HOST_SPILL_SIZE:-128G}" \ + --conf "spark.rapids.sql.batchSizeBytes=${SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES:-536870912b}" \ + --conf "spark.rapids.sql.concurrentGpuTasks=${SPARK_RAPIDS_CONCURRENT_GPU_TASKS:-1}" \ + --conf "spark.rapids.shuffle.mode=MULTITHREADED" \ + --conf "spark.rapids.shuffle.multiThreaded.reader.threads=${SPARK_RAPIDS_SHUFFLE_READER_THREADS:-8}" \ + --conf "spark.rapids.shuffle.multiThreaded.writer.threads=${SPARK_RAPIDS_SHUFFLE_WRITER_THREADS:-8}" \ + --conf "spark.shuffle.manager=$rapids_shuffle_manager" \ + --conf "spark.shuffle.compress=true" \ + --conf "spark.sql.adaptive.enabled=true" \ + --conf "spark.sql.adaptive.coalescePartitions.parallelismFirst=false" \ + --conf "spark.sql.adaptive.skewJoin.enabled=true" \ + --conf "spark.sql.shuffle.partitions=$shuffle_partitions" \ + --conf "spark.sql.files.maxPartitionBytes=${SPARK_SQL_FILES_MAX_PARTITION_BYTES:-160m}" \ + --conf "spark.sql.adaptive.advisoryPartitionSizeInBytes=${SPARK_SQL_ADVISORY_PARTITION_SIZE:-160mb}" \ + --conf "spark.sql.adaptive.coalescePartitions.minPartitionSize=${SPARK_SQL_MIN_PARTITION_SIZE:-32mb}" \ + "$PROJECT_DIR/workflow_join_spark-1tb-rapids.py" \ + "${driver_args[@]}" \ + 2>&1 | tee -a "$output_log" + DRIVER_RC=${PIPESTATUS[0]} + set -e + + if [[ "$DRIVER_RC" == "0" ]]; then + touch "$DONE_FLAG" + else + touch "$FAILED_FLAG" + fi + exit "$DRIVER_RC" +else + while [[ ! -f "$DONE_FLAG" && ! -f "$FAILED_FLAG" ]]; do + sleep 5 + done + if [[ -f "$FAILED_FLAG" ]]; then + exit 1 + fi +fi + +echo "[$CURRENT_NODE] Exiting." diff --git a/Data-Gen/workflow_join_spark-1tb-rapids.slurm b/Data-Gen/workflow_join_spark-1tb-rapids.slurm new file mode 100755 index 0000000..7227d44 --- /dev/null +++ b/Data-Gen/workflow_join_spark-1tb-rapids.slurm @@ -0,0 +1,80 @@ +#!/bin/bash +#SBATCH --job-name=bz-workflow-join-spark-1tb-rapids +#SBATCH --output=logs/%x_%j.out +#SBATCH --error=logs/%x_%j.err +#SBATCH --time=02:00:00 +#SBATCH --nodes=6 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=144 +#SBATCH --mem=0 +#SBATCH --gres=gpu:4 +#SBATCH --exclusive +#SBATCH --nodelist=presto-gb200-gcn-[01-16] + +export DATASET_PATH=${DATASET_PATH:-/scratch/prestouser/test-data/500000-1TB} +export OUTPUT_PATH=${OUTPUT_PATH:-${DATASET_PATH}/workflow_join_spark_rapids_output} +export WORKFLOW_ACTION=${WORKFLOW_ACTION:-parquet} + +export SPARK_PLUGIN=${SPARK_PLUGIN:-rapids} +export SPARK_EXECUTOR_CORES=${SPARK_EXECUTOR_CORES:-8} +export SPARK_EXECUTOR_MEMORY=${SPARK_EXECUTOR_MEMORY:-96g} +export SPARK_EXECUTOR_MEMORY_OVERHEAD=${SPARK_EXECUTOR_MEMORY_OVERHEAD:-64g} +export SPARK_DRIVER_MEMORY=${SPARK_DRIVER_MEMORY:-32g} +export SPARK_SQL_SHUFFLE_PARTITIONS=${SPARK_SQL_SHUFFLE_PARTITIONS:-2400} +export SPARK_SQL_FILES_MAX_PARTITION_BYTES=${SPARK_SQL_FILES_MAX_PARTITION_BYTES:-160m} +export SPARK_RAPIDS_CONCURRENT_GPU_TASKS=${SPARK_RAPIDS_CONCURRENT_GPU_TASKS:-1} +export SPARK_TASK_GPU_AMOUNT=${SPARK_TASK_GPU_AMOUNT:-0.125} +export SPARK_RAPIDS_PINNED_POOL_SIZE=${SPARK_RAPIDS_PINNED_POOL_SIZE:-4g} +export SPARK_RAPIDS_HOST_SPILL_SIZE=${SPARK_RAPIDS_HOST_SPILL_SIZE:-128G} +export SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES=${SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES:-536870912b} +export SPARK_RAPIDS_SHUFFLE_READER_THREADS=${SPARK_RAPIDS_SHUFFLE_READER_THREADS:-8} +export SPARK_RAPIDS_SHUFFLE_WRITER_THREADS=${SPARK_RAPIDS_SHUFFLE_WRITER_THREADS:-8} +export SPARK_RAPIDS_SHUFFLE_MANAGER=${SPARK_RAPIDS_SHUFFLE_MANAGER:-com.nvidia.spark.rapids.spark358.RapidsShuffleManager} + +export COUNT_INPUTS=${COUNT_INPUTS:-0} +export COUNT_RESULT=${COUNT_RESULT:-0} +export OUTPUT_PARTITIONS=${OUTPUT_PARTITIONS:-0} +export SPARK_SMOKE_TEST=${SPARK_SMOKE_TEST:-0} + +export NVIDIA_VISIBLE_DEVICES=all +export NVIDIA_DRIVER_CAPABILITIES=compute,utility +export REAL_HOME=$HOME + +LOCAL_CONTAINER_PATH=${LOCAL_CONTAINER_PATH:-/scratch/prestouser/images/test-data-spark-rapids-2026-05-28_19-03-04-arm64-cuda13-spark3.5.8-rapids26.04.2.sqsh} +CONTAINER_MOUNTS="${HOME}:${HOME},/scratch:/scratch" + +export HEAD_NODE=$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1) +export SPARK_MASTER_URL=spark://${HEAD_NODE}:${SPARK_MASTER_PORT:-7077} +export SPARK_DRIVER_HOST=${SPARK_DRIVER_HOST:-$HEAD_NODE} +export SPARK_EXECUTOR_INSTANCES=${SPARK_EXECUTOR_INSTANCES:-$((SLURM_JOB_NUM_NODES * 4))} + +mkdir -p logs ${HOME}/bzaitlen/test-data/Data-Gen/scheduler + +echo "========================================" +echo "Job ID: $SLURM_JOB_ID" +echo "Nodes: $SLURM_JOB_NUM_NODES" +echo "Node list: $SLURM_JOB_NODELIST" +echo "Head node: $HEAD_NODE" +echo "Dataset: $DATASET_PATH" +echo "Output: $OUTPUT_PATH" +echo "Action: $WORKFLOW_ACTION" +echo "Spark plugin: $SPARK_PLUGIN" +echo "Container: $LOCAL_CONTAINER_PATH" +echo "Executor instances: $SPARK_EXECUTOR_INSTANCES" +echo "Executor cores: $SPARK_EXECUTOR_CORES" +echo "Executor memory: $SPARK_EXECUTOR_MEMORY" +echo "Shuffle partitions: $SPARK_SQL_SHUFFLE_PARTITIONS" +echo "RAPIDS pinned pool: $SPARK_RAPIDS_PINNED_POOL_SIZE" +echo "RAPIDS host spill: $SPARK_RAPIDS_HOST_SPILL_SIZE" +echo "RAPIDS batch bytes: $SPARK_RAPIDS_SQL_BATCH_SIZE_BYTES" +echo "RAPIDS shuffle mgr: $SPARK_RAPIDS_SHUFFLE_MANAGER" +echo "Smoke test: $SPARK_SMOKE_TEST" +echo "========================================" + +srun \ + --export=ALL \ + --container-image=${LOCAL_CONTAINER_PATH} \ + --container-mounts=${CONTAINER_MOUNTS} \ + --container-remap-root \ + --container-writable \ + bash ${HOME}/bzaitlen/test-data/Data-Gen/workflow_join_spark-1tb-rapids.sh