def _is_s3_path(path):
    """Return True when *path* is an ``s3://`` URL rather than a local path."""
    return path.startswith("s3://")


def get_web_clickstream_flist(basepath):
    """List the ``web_clickstreams`` parquet files found under *basepath*.

    Parameters
    ----------
    basepath : str
        Data directory; either a local path or an ``s3://`` URL.

    Returns
    -------
    list of str
        Paths matching ``<basepath>/web_clickstreams/*.parquet``, sorted so
        that the order is the same on every run and for both storage
        backends. S3 results carry the ``s3://`` prefix.
    """
    clickstreams_path = os.path.join(basepath, "web_clickstreams/*.parquet")
    if _is_s3_path(clickstreams_path):
        # Imported only on the S3 branch, so local runs never import s3fs.
        import s3fs

        fs = s3fs.S3FileSystem()
        # The protocol prefix is re-attached to every name fs.glob returns.
        return sorted("s3://" + fn for fn in fs.glob(clickstreams_path))
    # glob.glob makes no ordering promise; sort for a reproducible task list.
    return sorted(glob.glob(clickstreams_path))


def get_negative_sentiment(basepath):
    """Load the negative-sentiment word list stored under *basepath*.

    Reads ``<basepath>/sentiment_files/negativeSentiment.txt`` from the local
    filesystem or, for ``s3://`` paths, through s3fs.

    Parameters
    ----------
    basepath : str
        Data directory; either a local path or an ``s3://`` URL.

    Returns
    -------
    list of str
        One whitespace-stripped entry per distinct line, in first-seen file
        order.
    """

    def preprocess(f):
        stripped = (line.strip() for line in f)
        # dedupe for one extra record in the source file; dict.fromkeys keeps
        # first-seen order, unlike set(), whose order varies between runs.
        return list(dict.fromkeys(stripped))

    # This file comes from the official TPCx-BB kit
    # We extracted it from bigbenchqueriesmr.jar
    neg_sentiment_path = os.path.join(
        basepath, "sentiment_files", "negativeSentiment.txt"
    )
    if _is_s3_path(neg_sentiment_path):
        # Imported only on the S3 branch, so local runs never import s3fs.
        import s3fs

        fs = s3fs.S3FileSystem()
        with fs.open(neg_sentiment_path, "r") as fh:
            return preprocess(fh)
    with open(neg_sentiment_path) as fh:
        return preprocess(fh)
- web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet")) + web_clickstream_flist = get_web_clickstream_flist(config["data_dir"]) task_ls = [ delayed(pre_repartition_task)(fn, item_df.to_delayed()[0], wcs_tstamp_min) for fn in web_clickstream_flist diff --git a/gpu_bdb/queries/q05/gpu_bdb_query_05.py b/gpu_bdb/queries/q05/gpu_bdb_query_05.py index 290cf127..d0d4c78a 100755 --- a/gpu_bdb/queries/q05/gpu_bdb_query_05.py +++ b/gpu_bdb/queries/q05/gpu_bdb_query_05.py @@ -22,6 +22,7 @@ benchmark, gpubdb_argparser, run_query, + get_web_clickstream_flist, ) from bdb_tools.readers import build_reader @@ -184,7 +185,7 @@ def main(client, config): keep_cols = ["i_item_sk", "i_category_id", "clicks_in_category"] item_ddf = item_ddf[keep_cols] - web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet")) + web_clickstream_flist = get_web_clickstream_flist(config["data_dir"]) n_workers = len(client.scheduler_info()["workers"]) batchsize = len(web_clickstream_flist) // n_workers if batchsize < 1: diff --git a/gpu_bdb/queries/q08/gpu_bdb_query_08.py b/gpu_bdb/queries/q08/gpu_bdb_query_08.py index 686ea05b..0301c2ae 100755 --- a/gpu_bdb/queries/q08/gpu_bdb_query_08.py +++ b/gpu_bdb/queries/q08/gpu_bdb_query_08.py @@ -23,6 +23,7 @@ gpubdb_argparser, run_query, convert_datestring_to_days, + get_web_clickstream_flist, ) from bdb_tools.readers import build_reader from bdb_tools.merge_util import hash_merge @@ -251,7 +252,7 @@ def main(client, config): web_page_newcols = ["wp_web_page_sk", "wp_type_codes"] web_page_df = web_page_df[web_page_newcols] - web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet")) + web_clickstream_flist = get_web_clickstream_flist(config["data_dir"]) task_ls = [ delayed(etl_wcs)( diff --git a/gpu_bdb/queries/q10/gpu_bdb_query_10.py b/gpu_bdb/queries/q10/gpu_bdb_query_10.py index cb24ef88..86e504e7 100755 --- 
a/gpu_bdb/queries/q10/gpu_bdb_query_10.py +++ b/gpu_bdb/queries/q10/gpu_bdb_query_10.py @@ -56,10 +56,19 @@ def read_tables(config): def load_sentiment_words(filename, sentiment): import cudf - with open(filename) as fh: - sentiment_words = list(map(str.strip, fh.readlines())) + def preprocess(f): + sentiment_words = list(map(str.strip, f.readlines())) # dedupe for one extra record in the source file - sentiment_words = list(set(sentiment_words)) + return list(set(sentiment_words)) + + if filename[:5] == "s3://": + import s3fs + fs = s3fs.S3FileSystem() + with fs.open(filename, 'r') as fh: + sentiment_words = preprocess(fh) + else: + with open(filename) as fh: + sentiment_words = preprocess(fh) sent_df = cudf.DataFrame({"word": sentiment_words}) sent_df["sentiment"] = sentiment diff --git a/gpu_bdb/queries/q12/gpu_bdb_query_12.py b/gpu_bdb/queries/q12/gpu_bdb_query_12.py index e912c6f3..38a8f5a2 100755 --- a/gpu_bdb/queries/q12/gpu_bdb_query_12.py +++ b/gpu_bdb/queries/q12/gpu_bdb_query_12.py @@ -22,6 +22,7 @@ benchmark, gpubdb_argparser, run_query, + get_web_clickstream_flist, ) from bdb_tools.readers import build_reader @@ -176,7 +177,7 @@ def main(client, config): "wcs_click_date_sk": np.ones(1, dtype=np.int64), } meta_df = cudf.DataFrame(meta_d) - web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet")) + web_clickstream_flist = get_web_clickstream_flist(config["data_dir"]) task_ls = [ delayed(filter_wcs_table)(fn, filtered_item_df.to_delayed()[0]) for fn in web_clickstream_flist diff --git a/gpu_bdb/queries/q18/gpu_bdb_query_18.py b/gpu_bdb/queries/q18/gpu_bdb_query_18.py index 899d1c86..424d0e01 100755 --- a/gpu_bdb/queries/q18/gpu_bdb_query_18.py +++ b/gpu_bdb/queries/q18/gpu_bdb_query_18.py @@ -24,6 +24,7 @@ gpubdb_argparser, left_semi_join, run_query, + get_negative_sentiment, ) from bdb_tools.readers import build_reader @@ -284,13 +285,7 @@ def main(client, config): sentences["sentence_tokenized_global_pos"] = 
sentences.x.cumsum() del sentences["x"] - # This file comes from the official TPCx-BB kit - # We extracted it from bigbenchqueriesmr.jar - sentiment_dir = os.path.join(config["data_dir"], "sentiment_files") - with open(os.path.join(sentiment_dir, "negativeSentiment.txt")) as fh: - negativeSentiment = list(map(str.strip, fh.readlines())) - # dedupe for one extra record in the source file - negativeSentiment = list(set(negativeSentiment)) + negativeSentiment = get_negative_sentiment(config["data_dir"]) word_df = sentences.map_partitions( create_words_from_sentences, diff --git a/gpu_bdb/queries/q19/gpu_bdb_query_19.py b/gpu_bdb/queries/q19/gpu_bdb_query_19.py index 8d4e29a2..d5034eb5 100755 --- a/gpu_bdb/queries/q19/gpu_bdb_query_19.py +++ b/gpu_bdb/queries/q19/gpu_bdb_query_19.py @@ -21,6 +21,7 @@ benchmark, gpubdb_argparser, run_query, + get_negative_sentiment, ) from bdb_tools.text import create_sentences_from_reviews, create_words_from_sentences @@ -161,13 +162,7 @@ def main(client, config): global_position_column="sentence_tokenized_global_pos", ) - # This file comes from the official TPCx-BB kit - # We extracted it from bigbenchqueriesmr.jar - sentiment_dir = os.path.join(config["data_dir"], "sentiment_files") - with open(os.path.join(sentiment_dir, "negativeSentiment.txt")) as fh: - negativeSentiment = list(map(str.strip, fh.readlines())) - # dedupe for one extra record in the source file - negativeSentiment = list(set(negativeSentiment)) + negativeSentiment = get_negative_sentiment(config["data_dir"]) sent_df = cudf.DataFrame({"word": negativeSentiment}) sent_df["sentiment"] = "NEG" diff --git a/gpu_bdb/queries/q30/gpu_bdb_query_30.py b/gpu_bdb/queries/q30/gpu_bdb_query_30.py index 5f9eaac5..5c788927 100755 --- a/gpu_bdb/queries/q30/gpu_bdb_query_30.py +++ b/gpu_bdb/queries/q30/gpu_bdb_query_30.py @@ -22,6 +22,7 @@ benchmark, gpubdb_argparser, run_query, + get_web_clickstream_flist, ) from bdb_tools.readers import build_reader from bdb_tools.sessionization 
import get_session_id, get_distinct_sessions, get_pairs @@ -106,7 +107,7 @@ def main(client, config): # this causes more memory pressures as we try to read the whole thing ( and spill that) # at once and then do filtration . - web_clickstream_flist = glob.glob(os.path.join(config["data_dir"], "web_clickstreams/*.parquet")) + web_clickstream_flist = get_web_clickstream_flist(config["data_dir"]) task_ls = [ delayed(pre_repartition_task)(fn, f_item_df.to_delayed()[0]) for fn in web_clickstream_flist