Distributed Data Science using NVTabular on Spark & Dask
NVTabular — Why, What, and How?
Why? Around 75–80% of the time is spent on ETL and feature engineering tasks by Data Engineers/AI practitioners instead of model building or fine-tuning of hyperparameters. With Big Data in Deep Learning-based systems, we experience performance bottlenecks majorly due to the following:
- Data ETL (extract-transform-load) takes a substantial quantity of time,
- Feature engineering investigations create and evaluate several groups of new features from existing ones for their efficacies,
- If not well optimized, data loading can become the input bottleneck at train time, which leads to wastage of high-throughput GPU computing,
- Comprehensive repeated experimentation for model selection or tuning of hyperparameters.
For training and deployment models on GPUs, the primary objective of NVTabular is simplifying and accelerating the tabular-data processing pipeline.
What? It is a collection of high-speed operators, on-GPU preprocessing, and feature engineering of tabular data with the capability of handling terabyte-scale datasets. The output can be made available to training frameworks such as HugeCTR, PyTorch, or TensorFlow at high throughput using NVTabular data loader extensions, eliminating the input bottleneck commonly found in GPU system training.
NVTabular focuses on the data preparation and data-loading phase. Here’s a common recommender system pipeline.
Recommender system training pipeline
Position compared to other popular DataFrame libraries
Features of NVTabular
- Fast Data loading, Preprocessing, and Feature Engineering
- Eliminating CPU bottlenecks by using GPU-acceleration
- Execution is done Out-of-core. No GPU memory limits and reduced I/O through lazy execution.
- Pytorch, TensorFlow, and HugeCTR compatible.
- Missing values or outliers can be filtered out.
- Inputting and filling in missing data.
- Discretization or bucketing of continuous features.
- Creating features by splitting or combining existing features.
- Normalizing numerical features to have zero mean and unit variance.
- Encoding discrete features using one-hot vectors or converting them to continuous integer indices.
- More features are added continuously.
To know more click here
NVTabular Vs CuDF, Pandas, Spark3
Feature comparison of NVT Vs Others
The three main advantages of NVTabular are:
- Performance: Up to 10x performance improvement is done by Lazy execution and GPU acceleration.
- Scale: Dataset operations can scale beyond GPU/CPU memory.
- Usability: To accomplish the same processing pipeline, fewer API calls are required.
How? The steps we need to follow to use NVT are given below along with the generic code:
- Specifying continuous and categorical columns.
- Defining an NVTabular workflow and supplying a set of train and validation files.
- Add preprocessing operations to the workflow.
- Executing by saving the processed data to disk.
import nvtabular as nvt
import glob# specify continuous feature names
cont_names = ["I"+str(x) for x in range(1, 14)]# specify categorical feature names
cat_names = ["C"+str(x) for x in range(1, 27)]# specify target feature
label_names = ["label"]# all feature names
columns = label_names + cat_names + cont_names# initialize workflow
proc = nvt.Worfklow(cat_names=cat_names, cont_names=cont_names, label_name=label_names)# create datsets from input files
train_files = glob.glob("./dataset/train/*.parquet")
valid_files = glob.glob("./dataset/valid/*.parquet")
train_dataset = nvt.dataset(train_files, gpu_memory_frac=0.1)
valid_dataset = nvt.dataset(valid_files, gpu_memory_frac=0.1)# add feature engineering and preprocessing ops to workflow
proc.add_cont_feature([nvt.ops.ZeroFill(), nvt.ops.LogOp()])
proc.add_cont_preprocess(nvt.ops.Normalize())
proc.add_cat_preprocess(nvt.ops.Categorify(use_frequency=True, freq_threshold=15))# compute statistics, transform data, export to disk
proc.apply(train_dataset, shuffle=True, output_path="./processed_data/train", num_out_files=len(train_files))proc.apply(valid_dataset, shuffle=False, output_path="./processed_data/valid", num_out_files=len(valid_files)
How to run multi-node NVTabular preprocessing using DASK
A combination of the scheduler, workers, and clients is called the Dask cluster which enables distributed execution of Python functions. Dask enables easily scheduled tasks for multiple workers: multi-GPU or multi-node. We just need to initialize a Dask cluster (Like LocalCUDACluster
) and NVTabular will use the cluster to execute the predefined workflow. Dask enables us to build up a graph of the computation that we want to perform and then executes it in parallel.
We start a cluster to enable distributed parallelism and connect to it to run the application by following the below steps:
- At first, we start
dask-scheduler
. - Second start the workers
dask-cuda-worker schedulerIP:schedulerPort
. - Finally, run the NVTabular application where the NVTabular
Workflow
has been initialized as described in the Multi-GPU Support section.
For Example: Perform SSH on a list of machines and set up a Dask cluster. We can make use of all three remote machines as workers as the scheduler will likely use far fewer resources.
import nvtabular as nvt
from dask.distributed import SSHCluster, Clientcluster = SSHCluster(["localhost", "MachineA", "MachineB", "MachineC"])
client = Client(cluster)
workflow = nvt.Workflow(features, client=client)
Other ways to deploy a “cluster” for Dask can be found in this article. The dask_cuda.LocalCUDACluster
API is typically used for a single machine with multiple GPUs.
How to load data into DL libraries using various data loaders of NVTabular
The data loading process while training pipelines with TensorFlow/PyTorch is another bottleneck in deep learning recommender systems. GPU doesn’t get fully utilized as the data loader is comparatively much slower in preparing the next batch. With NVTabular highly customized tabular data loader, for accelerating existing pipelines in TensorFlow, a speed-up by 9x is observed experimentally.
The main features of the data loader are:
- removal of the item-by-item data loading bottleneck
- asynchronously read data in batches directly into GPU memory instead of CPU-GPU communication
- Horovod multi-GPU training support
Use NVTabular data loader with TensorFlow Keras
import os
import glob# Set input data folder path
INPUT_DIR = os.environ.get(
"INPUT_DATA_DIR", os.path.expanduser("~/project/data/")
)# Define glob parquet data path
TRAINING_PATHS = sorted(glob.glob(os.path.join(INPUT_DIR,"train/*.parquet")))
The KerasSequenceLoader manages to shuffle by loading in chunks of data from different parts of the full dataset, concatenating them, and then shuffling, then iterating through this super-chunk sequentially in batches.
More information is given in this blogpost.
from nvtabular.loader.tensorflow import KerasSequenceLoader# Load data using tensorflow keras
train_tf = KerasSequenceLoader(
TRAINING_PATHS,
batch_size=BATCH_SIZE,
label_names=LABEL_COLUMN,
cat_names=CATEGORICAL_COLUMNS,
cont_names=NUMERIC_COLUMNS,
engine="parquet", # specifying the type of read engine to use
shuffle=True, # whether to shuffle chunks of batches
buffer_size=0.06, # (0,1), fraction of total GPU mem to occupy
parts_per_chunk=1,# Dataset partitions to load and concatenate
)
Similar to training(train_df) we can load the validation/test dataset.
Use NVTabular data loader with PyTorch
TorchAsyncItr class creates batches of tensors. Each batch size is specified by the user. The data input requires an NVTabular dataset. Handles spillover to ensure all batches are the specified size until the final batch. The class DLDataLoader is an extension of the torch data loader. It is required to support the FastAI framework.
Read more here
import torch
import nvtabular as nvt
from nvtabular.loader.torch import TorchAsyncItr, DLDataLoadertrain_dataset = TorchAsyncItr(
nvt.Dataset(TRAINING_PATHS),
batch_size=BATCH_SIZE,
cats=CATEGORICAL_COLUMNS + CATEGORICAL_MH_COLUMNS,
conts=NUMERIC_COLUMNS,
labels=LABEL_COLUMN,
)train_loader = DLDataLoader(
train_dataset,
batch_size=None,
collate_fn=lambda x: x,
pin_memory=False,
num_workers=0
)
Distributed NVTabular On Databricks
- Use the below docker image in the Databricks cluster or (OPTIONAL) pull it for editing
docker pull allxu/nvt-061:test
[Note: NVTabular is already present in the above docker image]
2. (OPTIONAL) Edit the docker file if you want to add new packages
3. (OPTIONAL) Build the docker image
docker build -t <USERNAME>/<REPONAME>:<TAG> .
[Note: For any dependency related error during build please refer to the above link to the docker file to install required packages]
4. (OPTIONAL) Push the docker image to our own docker hub and set the repository to the public
5. Start a Databricks cluster and use the docker image URL mentioned above and use the below configuration
Cluster configuration
6. Get the notebook from the below GitHub gist link and upload it to Databricks and attach the cluster created in step 5.
Criteo Spark Keras Training Example using Horovod
7. Clone or download the zip file containing sample data from the github link below
GitHub — Tauhait/Horovod_TestSample_Data: Criteo Spark Keras Training Example using Horovod
You can’t perform that action at this time. You signed in with another tab or window. You signed out in another tab or…
8. Set up Databricks CLI in your command line from the below link to upload sample data to Databricks File System
Databricks CLI
Run using the appropriate version of for your Python installation. To list the version of the CLI that is currently…
9. Follow the step by step process to upload the sample data to Databricks using dbfs from the below gist
Upload sample data using Databricks CLI
10. Run the notebook and check results
Pandas Vs NVTabular performance comparison in Databricks
Here we perform ETL along with Join and Lambda operations on the same data using Pandas and NVTabular in Databricks cluster (as in above step 5).
For all operations, we can see that there is a minimum 6x speedup using NVT compared to Pandas when we process huge data.
Here is the notebook which shows execution time comparison on every operation using the movielens dataset and code.
Horovod
Horovod is a distributed deep learning training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. For faster distributed deep learning we can make use of Horovod.
Distributed NVTabular + Horovod on Spark
Follow the below steps to set up distributed NVTabular on Spark and also note that the demonstration is done based on DGX-2 machine(with V100 GPUs).
1. Download and create our own dockerfile from below
Dockerfile to run Spark on NVTabular
2. Build the docker image
docker build -t <USERNAME>/<REPONAME>:<TAG> .
3. Docker run
nvidia-docker run \
--network host \
--device /dev/infiniband \
--privileged \
-v /<PATH>:/data/parquet \
-it <USERNAME>/<REPONAME>:<TAG> bash
4. Start spark inside the docker container
cd /workspace
./start-spark.sh
5. Set up Notebook
SPARK_HOME= $PATH_TO_SPARK_HOME
SPARK_URL=spark://$SPARK_MASTER_IP:7077
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'$SPARK_HOME/bin/pyspark --master $SPARK_URL --deploy-mode client \
--driver-memory 20G \
--executor-memory 50G \
--executor-cores 6 \
--conf spark.cores.max=96 \
--conf spark.task.cpus=6 \
--conf spark.locality.wait=0 \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.sql.shuffle.partitions=4 \
--conf spark.sql.files.maxPartitionBytes=1024m \
--conf spark.sql.warehouse.dir=$OUT \
--conf spark.task.resource.gpu.amount=0.08 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh \
--files $SPARK_HOME/examples/src/main/scripts/getGpusResources.sh
To set up Spark refer to the following -
spark-env.sh
: configuration script
start-spark.sh
: launch Spark script
submit.sh
: commands used to submit the job