In [1]:
import time
import dask.dataframe as dd
from dask.distributed import Client

## Start Dask Scheduler and Workers

In [2]:
# Create a Dask scheduler and a number of Dask workers. 
# If no arguments are specified then it will autodetect the number of CPU cores your system has 
# and the amount of memory and create workers to appropriately fill that.
# It will also start the Dask Dashboard which is useful to 
# visualize the state of your cluster and computations.

client = Client()
client.restart()



0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 15.62 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37113,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 15.62 GiB

0,1
Comm: tcp://127.0.0.1:45535,Total threads: 1
Dashboard: http://127.0.0.1:43185/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:34625,
Local directory: /tmp/dask-worker-space/worker-zhmjn6se,Local directory: /tmp/dask-worker-space/worker-zhmjn6se

0,1
Comm: tcp://127.0.0.1:44207,Total threads: 1
Dashboard: http://127.0.0.1:40325/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:39325,
Local directory: /tmp/dask-worker-space/worker-tsr0itf1,Local directory: /tmp/dask-worker-space/worker-tsr0itf1

0,1
Comm: tcp://127.0.0.1:42029,Total threads: 1
Dashboard: http://127.0.0.1:35501/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:44389,
Local directory: /tmp/dask-worker-space/worker-r83z_24l,Local directory: /tmp/dask-worker-space/worker-r83z_24l

0,1
Comm: tcp://127.0.0.1:43807,Total threads: 1
Dashboard: http://127.0.0.1:40331/status,Memory: 3.91 GiB
Nanny: tcp://127.0.0.1:44079,
Local directory: /tmp/dask-worker-space/worker-7fr9za74,Local directory: /tmp/dask-worker-space/worker-7fr9za74


## Read CSV

In [3]:
# Read CSV file: Dask doesn't load the data immediately. Dask has just read a few rows 
# at the start of the file, and inferred the column names and dtypes.

user_reviews_ddf = dd.read_csv('demo_data.csv')
user_reviews_ddf

Unnamed: 0_level_0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
npartitions=55,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
,object,object,object,object,object,float64,object,float64,object
,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...


In [4]:
# visualize the task graph

user_reviews_ddf.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

## Mean computation

In [5]:
# Dask operations are evaluated lazily: Dask constructs the task graph of the 
# computation immediately but “evaluates” them only when necessary.

mean_graph = user_reviews_ddf["overall"].mean()

In [6]:
# visualize task graph

mean_graph.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [7]:
# trigger computation to calculate mean of column

result = mean_graph.compute()
print(f"mean of 'overall' attribute is {result}")

mean of 'overall' attribute is 4.163392099728361


## Implicit compute() for len(), head()

In [8]:
# Some functions like len and head also trigger a computation implicitly

user_reviews_ddf.head()

Unnamed: 0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
0,A2T0RJ91B0PQ03,B0016CRVLW,Gerald DeWitt,"[0, 0]",Beware! This is NOT the original single versi...,1.0,Poor Quality Alternate Take,1400630000.0,"05 21, 2014"
1,A3TYW0XA8HSGWB,B00EKR5S0Q,Linda E. Larson,"[0, 0]",This is my new most favorite k-cup coffee. I c...,5.0,Vanilla Starbucks K-cups,1398557000.0,"04 27, 2014"
2,A2CME0TQU2IVVB,B001AUPJVO,L5Momma,"[1, 1]",This headset is great! It worked in our 2007 ...,5.0,Awesome!,1355875000.0,"12 19, 2012"
3,A2E5IDLX7R388S,B000055Y57,Jeff Andersen,"[0, 0]",Scofield is one of my favorite musicians and i...,5.0,Straight ahead Jazz with the Scofield twist,1402358000.0,"06 10, 2014"
4,A3CIEMYUGV6ZMR,0545265355,Adroit,"[0, 0]","Wonderful book! I cried, well teared up at a f...",5.0,Greatest Book Ever!!!,1334102000.0,"04 11, 2012"


In [10]:
# Calling len() will: 
# - load actual data, (that is, load data into multiple pandas DataFrames) 
# - find length of each pandas DataFrame (also known as a partition) 
# - combine the subtotals to give you the final grand total

len(user_reviews_ddf)

6158168

## Inspecting Individual Partitions

In [11]:
# Internally, a Dask DataFrame is split into many partitions, where each partition is one Pandas DataFrame. 

num_partitions = user_reviews_ddf.npartitions
print(f"Total number of partitions = {num_partitions}")

Total number of partitions = 55


In [12]:
partition1 = user_reviews_ddf.partitions[0].compute()
partition1

Unnamed: 0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime
0,A2T0RJ91B0PQ03,B0016CRVLW,Gerald DeWitt,"[0, 0]",Beware! This is NOT the original single versi...,1.0,Poor Quality Alternate Take,1.400630e+09,"05 21, 2014"
1,A3TYW0XA8HSGWB,B00EKR5S0Q,Linda E. Larson,"[0, 0]",This is my new most favorite k-cup coffee. I c...,5.0,Vanilla Starbucks K-cups,1.398557e+09,"04 27, 2014"
2,A2CME0TQU2IVVB,B001AUPJVO,L5Momma,"[1, 1]",This headset is great! It worked in our 2007 ...,5.0,Awesome!,1.355875e+09,"12 19, 2012"
3,A2E5IDLX7R388S,B000055Y57,Jeff Andersen,"[0, 0]",Scofield is one of my favorite musicians and i...,5.0,Straight ahead Jazz with the Scofield twist,1.402358e+09,"06 10, 2014"
4,A3CIEMYUGV6ZMR,0545265355,Adroit,"[0, 0]","Wonderful book! I cried, well teared up at a f...",5.0,Greatest Book Ever!!!,1.334102e+09,"04 11, 2012"
...,...,...,...,...,...,...,...,...,...
111952,A1N2ZAC86P26BF,6303823351,David,"[1, 1]",Help! is probably my favorite of the Beatles m...,4.0,The best of the Beatles films,9.982656e+08,"08 20, 2001"
111953,AUFN1J7VJZL83,B002OHE20G,Amanda Banks,"[0, 0]",This heater has worked out very well for a sma...,5.0,"eliable, SAFE Heat",1.402099e+09,"06 7, 2014"
111954,AGZK126DNQ2FN,1401340970,"Cy B. Hilterman ""Cy. Hilterman""","[2, 2]",As a person that has made many trips to Niagar...,5.0,Romance and adventure in the Niagara Falls area,1.250035e+09,"08 12, 2009"
111955,A1LA51JOIGGD45,1400071550,E.A. West,"[0, 0]",The battle between good and evil continues in ...,5.0,Heroic battle between good and evil,1.366589e+09,"04 22, 2013"


In [13]:
# The number of partitions is often automatically determined based on available 
# physical memory and the number of cores, but can also be manually specified.

user_reviews_repart_ddf = user_reviews_ddf.repartition(npartitions=10)  
user_reviews_repart_ddf.npartitions

10

## Map-Partition

In [14]:
# Apply Python function on each DataFrame partition.
# Here we apply a function with arguments and keywords to a DataFrame, resulting in a Series.

def myadd(df, a, b=1):
    return df.overall + a + b

res = user_reviews_ddf.map_partitions(myadd, 1, b=0)

In [15]:
res.compute()

0         2.0
1         6.0
2         6.0
3         6.0
4         6.0
         ... 
112342    6.0
112343    6.0
112344    6.0
112345    6.0
112346    5.0
Name: overall, Length: 6158168, dtype: float64

In [16]:
# Here we apply a function to a Series resulting in a Series

res = user_reviews_ddf.overall.map_partitions(lambda x: x+1)
df_new = user_reviews_ddf.assign(new_col=res)
df_new.head()

Unnamed: 0,reviewerID,asin,reviewerName,helpful,reviewText,overall,summary,unixReviewTime,reviewTime,new_col
0,A2T0RJ91B0PQ03,B0016CRVLW,Gerald DeWitt,"[0, 0]",Beware! This is NOT the original single versi...,1.0,Poor Quality Alternate Take,1400630000.0,"05 21, 2014",2.0
1,A3TYW0XA8HSGWB,B00EKR5S0Q,Linda E. Larson,"[0, 0]",This is my new most favorite k-cup coffee. I c...,5.0,Vanilla Starbucks K-cups,1398557000.0,"04 27, 2014",6.0
2,A2CME0TQU2IVVB,B001AUPJVO,L5Momma,"[1, 1]",This headset is great! It worked in our 2007 ...,5.0,Awesome!,1355875000.0,"12 19, 2012",6.0
3,A2E5IDLX7R388S,B000055Y57,Jeff Andersen,"[0, 0]",Scofield is one of my favorite musicians and i...,5.0,Straight ahead Jazz with the Scofield twist,1402358000.0,"06 10, 2014",6.0
4,A3CIEMYUGV6ZMR,0545265355,Adroit,"[0, 0]","Wonderful book! I cried, well teared up at a f...",5.0,Greatest Book Ever!!!,1334102000.0,"04 11, 2012",6.0


## Groupby

In [17]:
# The groupby() operation groups data by an attribute and performs operations on these groups
# Here we are computing the average rating of each ASIN.

# IMPORTANT:

# 1) The sort parameter sorts the group keys and is set to True by default, 
# We can get better performance by turning this off.

# 2) Dask assumes that the groupby reduction returns an object that is small enough to fit into memory (DRAM).
# By default, groupby methods return an object with only 1 partition. 
# If the returned object is large (such as the case when there are a large number of groups), it can result in a memory error.

product_means = user_reviews_ddf.groupby('asin', sort=False).overall.mean()
product_means.npartitions

1

In [18]:
# If the returned object is large (such as the case when there are a large number of groups), we can increase 
# the number of output partitions using the split_out argument.

product_means = user_reviews_ddf.groupby('asin', sort=False).overall.mean(split_out=4)
product_means.npartitions

4

In [21]:
res = product_means.compute()

In [22]:
res.head()

asin
0002115751    4.000000
0002226049    3.000000
0006492460    5.000000
0006514006    4.306818
0006540686    4.000000
Name: overall, dtype: float64

## Calling compute() On Related Operations Allows for Task Sharing

In [20]:
%%time
s1 = user_reviews_ddf.groupby("asin").overall.sum().compute()
s2 = user_reviews_ddf.groupby("asin").overall.mean().compute()

CPU times: user 7.51 s, sys: 1.39 s, total: 8.9 s
Wall time: 46.1 s


In [21]:
%%time
s1 = user_reviews_ddf.groupby("asin").overall.sum()
s2 = user_reviews_ddf.groupby("asin").overall.mean()

out = dd.compute(s1,s2)

CPU times: user 6.11 s, sys: 1.34 s, total: 7.45 s
Wall time: 36.3 s


In [22]:
print(out[0], out[1])

asin
0000031887      93.0
000100039X     239.0
0002007770    1828.0
0002115751       4.0
0002178559       8.0
               ...  
B00LHZ093U       5.0
B00LK80AYM       5.0
B00LKJU5XW       2.0
B00LMBWAP4       5.0
B00LS14LLY       4.0
Name: overall, Length: 2306233, dtype: float64 asin
0000031887    4.428571
000100039X    4.509434
0002007770    4.383693
0002115751    4.000000
0002178559    4.000000
                ...   
B00LHZ093U    5.000000
B00LK80AYM    5.000000
B00LKJU5XW    2.000000
B00LMBWAP4    5.000000
B00LS14LLY    4.000000
Name: overall, Length: 2306233, dtype: float64


## Do not call compute() multiple times

In [20]:
start = time.time()

agg_1 = user_reviews_ddf.groupby('asin').overall.sum()
agg_1.name = 'c1'
agg_2 = user_reviews_ddf.groupby('asin').overall.mean()
agg_2.name = 'c2'
final = dd.concat([agg_1, agg_2], axis=1)
submit = final.describe().compute()

end = time.time()
runtime = end-start

print(submit)
print(runtime)

We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


                 c1            c2
count  2.306233e+06  2.306233e+06
mean   1.111721e+01  4.152129e+00
std    4.247923e+01  1.149007e+00
min    1.000000e+00  1.000000e+00
25%    4.000000e+00  4.000000e+00
50%    5.000000e+00  4.666667e+00
75%    9.000000e+00  5.000000e+00
max    8.345000e+03  5.000000e+00
25.86827325820923


In [21]:
start = time.time()

agg_1 = user_reviews_ddf.groupby('asin').overall.sum().compute()
agg_1.name = 'c1'
agg_2 = user_reviews_ddf.groupby('asin').overall.mean().compute()
agg_2.name = 'c2'
final = dd.concat([agg_1, agg_2], axis=1)
submit = final.describe().compute()

end = time.time()
runtime = end-start

print(submit)
print(runtime)

                 c1            c2
count  2.306233e+06  2.306233e+06
mean   1.111721e+01  4.152129e+00
std    4.247923e+01  1.149007e+00
min    1.000000e+00  1.000000e+00
25%    4.000000e+00  4.000000e+00
50%    5.000000e+00  4.666667e+00
75%    9.000000e+00  5.000000e+00
max    8.345000e+03  5.000000e+00
57.539384603500366
