Dask is a free and open-source library for parallel computing in Python.
More specifically, we can use Dask to manipulate 100GB+ datasets on a single laptop or 1TB+ datasets on a workstation, and it can be easily scaled to thousand-node clusters for very large datasets.
There are several convenient ways to install Dask and all its common dependencies.
If you are a conda user (Anaconda or Miniconda), you can install Dask from either the default channel or conda-forge.
conda install dask
Or
conda install dask -c conda-forge
If you use standard Python instead of conda, use pip to install Dask and all its common dependencies.
pip install "dask[complete]"
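To confirm that the installation works, you can print the installed version; this is just a quick sanity check, and any recent Dask version is fine for the examples below.
import dask
import dask.dataframe as dd  # raises ImportError if the DataFrame dependencies are missing

print(dask.__version__)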
Dask has three types of data structures: Dask Arrays, Dask Bags, and Dask DataFrames. We will discuss Dask DataFrames in this article because of their popularity and ease of use.
A Dask DataFrame is a collection of many pandas DataFrames; it operates similarly to a pandas DataFrame and supports a large subset of the pandas API. Unlike a pandas DataFrame, however, a Dask DataFrame can not only compute on big data using all the available cores of a single machine, but can also be stored and processed in parallel across distributed remote machines.
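To make the idea of "a collection of pandas DataFrames" concrete, here is a minimal sketch; the toy data and the partition count are chosen purely for illustration.
import pandas as pd
import dask.dataframe as dd

# A small pandas DataFrame split into 2 partitions;
# each partition is itself an ordinary pandas DataFrame.
pdf = pd.DataFrame({'x': range(10), 'y': range(10, 20)})
ddf = dd.from_pandas(pdf, npartitions=2)

print(ddf.npartitions)                    # 2
print(type(ddf.partitions[0].compute()))  # <class 'pandas.core.frame.DataFrame'>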
We can easily create a Dask DataFrame from various data storage formats such as CSV, HDF, Apache Parquet, SQL, JSON, and more.
4.1 Read .csv Files
Similar to pandas, we can read one or more .csv files using read_csv with Dask DataFrames. To practice the examples in this section, please download the dataset from Kaggle. If you are interested in how to download datasets from Kaggle, please read this article or another article.
(1) Read a single .csv file
Let's see how to read a single .csv file. First, we need to import Dask DataFrame. Just as we write import pandas as pd to use a pandas DataFrame, we simply change it to import dask.dataframe as dd to use a Dask DataFrame.
# import pandas as pd
import dask.dataframe as dd
df = dd.read_csv('./data/yellow_tripdata_2019-01.csv')
Just like Pandas, we can display the first five rows as follows:
df.head()
| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1 | 1.5 | 1 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | NaN |
1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1 | 2.6 | 1 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.00 | 0.0 | 0.3 | 16.30 | NaN |
2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3 | 0.0 | 1 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 5.80 | NaN |
3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5 | 0.0 | 1 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 7.55 | NaN |
4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5 | 0.0 | 2 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 55.55 | NaN |
From the above output, we can see that the Dask DataFrame looks no different from a pandas DataFrame.
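One difference worth noting, although it is invisible in the output above, is that most Dask DataFrame operations are lazy: they build a task graph instead of computing immediately. A small sketch, assuming the df loaded above:
# head() computes eagerly on the first partition, but most operations are deferred.
mean_distance = df.trip_distance.mean()  # lazy: nothing is read yet
print(mean_distance)            # prints a Dask scalar placeholder, not a number
print(mean_distance.compute())  # triggers the actual computation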
(2) Read multiple .csv files
In the last article, we already experimented with the data loading speed of three .csv files. The results showed that it took pandas 48.634 seconds to read the three files, while Modin with Dask took 24.772 seconds and Modin with Ray took 15.785 seconds to load the same datasets.
In the following example, let's compare Dask's loading performance on the same files. Moreover, unlike pandas and Modin, Dask DataFrame provides a much easier way to read multiple data files: simply create a list of the file paths and pass the list to read_csv.
Here, we use the %%time magic in Jupyter Notebook or JupyterLab to measure the data loading speed. If you use another Python IDE, you need to use the Python time module as we did in the last article.
%%time
data_list = ['./yellow_taxi_data/yellow_tripdata_2019-01.csv',
'./yellow_taxi_data/yellow_tripdata_2019-02.csv',
'./yellow_taxi_data/yellow_tripdata_2019-03.csv']
ddf1 = dd.read_csv(data_list)
Wall time: 23.8 ms
Or
ddf2 = dd.read_csv(['./yellow_taxi_data/yellow_tripdata_2019-01.csv',
'./yellow_taxi_data/yellow_tripdata_2019-02.csv',
'./yellow_taxi_data/yellow_tripdata_2019-03.csv'])
From the above result, Dask appears extremely fast: it takes only 23.8 ms to open the same files that took pandas 48.634 seconds to load on my computer. To be fair, this is largely because Dask evaluates lazily: read_csv only builds a task graph and infers column metadata from a small sample, and the full data are not loaded into memory until a computation is triggered. Besides, Dask offers a much easier method to read multiple files.
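To make the comparison fairer, we can also time an actual computation, since that is when Dask really reads the data. A sketch, assuming the ddf1 created above; your timings will differ:
%%time
# len() forces Dask to read every partition, so this measures a real full pass.
n_rows = len(ddf1)
print(n_rows)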
In this example, we read all the *.csv files in the yellow_taxi_data folder.
df_full = dd.read_csv('./yellow_taxi_data/*.csv')
There are different types of files in the data folder of my current working directory (CWD). For example, we read 18 .csv dataset files with a total size of 8.71 GB, from yellow_tripdata_2019-01 to yellow_tripdata_2020-06. Let's read them with Dask DataFrame and see the loading speed.
%%time
ddf3 = dd.read_csv(['./data/yellow_tripdata_2019-*.csv',
'./data/yellow_tripdata_2020-*.csv'])
Wall time: 19 ms
It is unbelievably fast: Dask needs only 19 ms to open this much larger dataset (again because the actual reading is deferred until a computation runs). In contrast, my computer froze when I used pandas to read these files, and when I tried Modin, it showed many warnings and OSError: [Errno 28] No space left on device.
We can easily break up a single large file with the blocksize parameter. For example, we break the 2.03 GB file green_tripdata_2019-01-03m.csv into 25 MB chunks. This dataset is the combined version of the three csv files used in Section 4.1 (2); the easiest way to obtain it is to first save those files to a single csv file using the method in Section 5.1 below.
The next question is what chunk size we should use. According to the Dask documentation, a good rule of thumb is to keep the partitions under 100 MB in size.
df_block = dd.read_csv('./data/green_tripdata_2019-01-03m.csv', blocksize=25e6)
df_block.head()
| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1 | 1.5 | 1 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | NaN |
1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1 | 2.6 | 1 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.00 | 0.0 | 0.3 | 16.30 | NaN |
2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3 | 0.0 | 1 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 5.80 | NaN |
3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5 | 0.0 | 1 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 7.55 | NaN |
4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5 | 0.0 | 2 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 55.55 | NaN |
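We can verify the chunking by checking the number of partitions; a 2.03 GB file split into 25 MB blocks should yield roughly 2030 MB / 25 MB ≈ 82 partitions, although the exact count depends on the file.
# Each 25 MB block becomes one partition of the Dask DataFrame.
print(df_block.npartitions)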
(3) Read csv files online
Similar to pandas, Dask can easily read csv files online, for example from one of my GitHub repositories.
url = 'https://raw.githubusercontent.com/shoukewei/data/main/data-1dwt/930-data-export.csv'
ddf4 = dd.read_csv(url)
ddf4.head()
| | Region Code | Timestamp (Hour Ending) | CAL Demand (MWh) | CAR Demand (MWh) | CENT Demand (MWh) | FLA Demand (MWh) | MIDA Demand (MWh) | MIDW Demand (MWh) | NW Demand (MWh) | SE Demand (MWh) | SW Demand (MWh) |
|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US48 | 8/9/2022 12 a.m. EDT | 47119 | 29809 | 38993 | 31602 | 112854 | 91266 | 52342 | 29482 | 18988 |
1 | US48 | 8/9/2022 1 a.m. EDT | 45732 | 27658 | 36487 | 29235 | 107920 | 86230 | 49108 | 27310 | 17922 |
2 | US48 | 8/9/2022 2 a.m. EDT | 43457 | 26187 | 34575 | 27383 | 103539 | 82753 | 45233 | 26329 | 16545 |
3 | US48 | 8/9/2022 3 a.m. EDT | 40527 | 25172 | 33135 | 26180 | 99702 | 79285 | 41682 | 25426 | 15300 |
4 | US48 | 8/9/2022 4 a.m. EDT | 37771 | 24397 | 32155 | 25423 | 97137 | 77026 | 39198 | 24759 | 14397 |
Apache Parquet is an open source, column-oriented data file format, which was designed for efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.
Similar to reading .csv files, we can easily read one or more Parquet files using read_parquet(). You can download one or more Parquet data files from the TLC DATA HUB to follow the examples in this section.
ddf_p = dd.read_parquet('./data/green_tripdata_2022-11.parquet')
ddf_p.head()
| | VendorID | lpep_pickup_datetime | lpep_dropoff_datetime | store_and_fwd_flag | RatecodeID | PULocationID | DOLocationID | passenger_count | trip_distance | fare_amount | extra | mta_tax | tip_amount | tolls_amount | ehail_fee | improvement_surcharge | total_amount | payment_type | trip_type | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2 | 2022-11-01 00:28:37 | 2022-11-01 00:31:56 | N | 1.0 | 223 | 223 | 1.0 | 0.71 | 4.5 | 0.5 | 0.5 | 1.45 | 0.0 | None | 0.3 | 7.25 | 1.0 | 1.0 | 0.00 |
1 | 2 | 2022-11-01 00:51:02 | 2022-11-01 01:12:50 | N | 5.0 | 80 | 90 | 2.0 | 6.86 | 45.0 | 0.0 | 0.0 | 9.61 | 0.0 | None | 0.3 | 57.66 | 1.0 | 2.0 | 2.75 |
2 | 2 | 2022-11-01 00:51:50 | 2022-11-01 00:55:38 | N | 1.0 | 244 | 244 | 2.0 | 0.58 | 4.5 | 0.5 | 0.5 | 0.00 | 0.0 | None | 0.3 | 5.80 | 2.0 | 1.0 | 0.00 |
3 | 2 | 2022-11-01 00:03:32 | 2022-11-01 00:12:28 | N | 1.0 | 116 | 74 | 1.0 | 2.74 | 10.5 | 0.5 | 0.5 | 0.00 | 0.0 | None | 0.3 | 11.80 | 2.0 | 1.0 | 0.00 |
4 | 2 | 2022-11-01 00:17:46 | 2022-11-01 00:22:03 | N | 1.0 | 134 | 134 | 1.0 | 0.91 | 5.0 | 0.5 | 0.5 | 1.58 | 0.0 | None | 0.3 | 7.88 | 1.0 | 1.0 | 0.00 |
For a large dataset, we can read only a few columns to avoid loading unnecessary data. For example, here we read only VendorID and trip_distance.
ddf_p2 = dd.read_parquet('./data/green_tripdata_2022-11.parquet',
columns=['VendorID', 'trip_distance'])
ddf_p2.head()
| | VendorID | trip_distance |
|---|---|---|
0 | 2 | 0.71 |
1 | 2 | 6.86 |
2 | 2 | 0.58 |
3 | 2 | 2.74 |
4 | 2 | 0.91 |
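Because Parquet is column-oriented and stores statistics per row group, we can also push simple row filters into the read itself using the filters argument. A sketch; the threshold below is arbitrary and only for illustration:
# Row groups that cannot contain matching rows are skipped entirely.
ddf_p3 = dd.read_parquet('./data/green_tripdata_2022-11.parquet',
                         columns=['VendorID', 'trip_distance'],
                         filters=[('trip_distance', '>', 10)])
ddf_p3.head()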
Hierarchical Data Format (HDF) is a set of file formats (HDF4, HDF5) designed by the National Center for Supercomputing Applications (NCSA), which is used to store and organize large amounts of data.
To read HDF files into a Dask DataFrame, we simply use read_hdf. If you do not have an HDF file, you can follow Section 5.3 below to create one first and then come back to practice this section.
ddf_c = dd.read_hdf('./data/yellow_tripdata_2019-01.hdf5','/hdfdata')
ddf_c.head()
| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1 | 1.5 | 1 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | NaN |
1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1 | 2.6 | 1 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.00 | 0.0 | 0.3 | 16.30 | NaN |
2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3 | 0.0 | 1 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 5.80 | NaN |
3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5 | 0.0 | 1 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 7.55 | NaN |
4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5 | 0.0 | 2 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 55.55 | NaN |
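read_hdf also accepts glob patterns, both in the file path and in the key, so several files or several stored objects can be combined into one Dask DataFrame. A sketch with hypothetical paths and keys:
# All files matching the path glob, and all objects matching '/hdfdata*',
# are concatenated into a single Dask DataFrame.
ddf_many = dd.read_hdf('./data/*.hdf5', '/hdfdata*')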
If you are interested in the other data formats that Dask DataFrames can read from, please refer to Create and Store Dask DataFrames in the Dask documentation.
Conversely, we can easily save/store a Dask DataFrame in different data formats, such as CSV, Parquet, and HDF. Let's look at a few examples.
For example, let's save the DataFrame that we read from the three files in Section 4.1 (2) above to CSV files. Similar to pandas, Dask uses to_csv.
ddf2.to_csv('./data/green_tripdata_2019-01m-03m.csv',index=False)
['D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\00.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\01.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\02.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\03.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\04.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\05.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\06.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\07.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\08.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\09.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\10.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\11.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\12.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\13.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\14.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\15.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\16.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\17.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\18.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\19.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\20.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\21.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\22.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\23.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\24.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\25.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\26.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\27.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\28.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\29.part', 'D:\\big_data\\data\\green_tripdata_2019-01m-03m.csv\\30.part']
Unlike pandas, however, Dask automatically creates a folder named after the given file name and saves the data as several partitioned files inside that folder. In this example, it divides the CSV data into 31 parts and saves them in the folder green_tripdata_2019-01m-03m.csv in the CWD.
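By default the part files are numbered automatically. If you prefer your own naming scheme, to_csv accepts a name_function that maps each partition index to the string substituted for the * in the path; the names below are purely illustrative.
# Writes export-0.csv, export-1.csv, ... one file per partition.
ddf2.to_csv('./data/export-*.csv', index=False,
            name_function=lambda i: str(i))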
If you want to save the data as one single file, specify the argument single_file=True. You should first delete the files and folder saved above; if you want to keep them, you have to use a new file name for this single file, say green_tripdata_2019-01-03m.csv as in the following example.
ddf2.to_csv('./data/green_tripdata_2019-01-03m.csv',index=False,single_file=True)
['D:\\big_data\\data\\green_tripdata_2019-01-03m.csv']
Similarly, we can easily save a DataFrame to one or more Parquet files using to_parquet.
ddf2.to_parquet('./data/yellow_tripdata_2019-01.parquet')
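to_parquet likewise writes one file per partition into a folder named after the given path. You can optionally choose the engine and compression codec; the choices below are common options rather than requirements.
# Snappy-compressed Parquet part files written with the pyarrow engine.
ddf2.to_parquet('./data/yellow_tripdata_2019-01.parquet',
                engine='pyarrow', compression='snappy')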
This example shows how to conveniently save a DataFrame to HDF files. Similar to pandas, a key parameter has to be specified for the HDF file. For both the pandas and Dask to_hdf methods, the key parameter is the name of the object stored in the HDF file: a single HDF file can store multiple objects (DataFrames), and each object needs a name.
ddf2.to_hdf('./data/yellow_tripdata_2019-01.hdf5','/hdfdata')
['./data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5', './data/yellow_tripdata_2019-01.hdf5']
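The repeated path in the output shows that all 31 partitions were appended to the same HDF5 file under the given key. To write one file per partition instead, include a * in the path, which Dask replaces with the partition number; the file names here are illustrative.
# Produces yellow_tripdata_2019-01-0.hdf5, yellow_tripdata_2019-01-1.hdf5, ...
ddf2.to_hdf('./data/yellow_tripdata_2019-01-*.hdf5', '/hdfdata')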
If you are interested in other data formats for storing Dask DataFrames, please refer to Create and Store Dask DataFrames in the Dask documentation.
This article gave a general introduction to Dask and used specific examples to show how to use Dask DataFrame, in place of pandas, to read and store data when datasets become large. The comparison of reading CSV files confirms that Dask opens, in milliseconds, large datasets that pandas and Modin struggle to handle, with the heavy lifting deferred until computation. Furthermore, the examples show that Dask's API is very similar to, and as easy to use as, the pandas API. In many cases, Dask offers simpler methods than pandas, for example for reading the multiple files used in this article.