Gregor von Laszewski (laszewski@gmail.com)
Other
- 1: Word Count with Parallel Python
- 2: NumPy
- 3: Scipy
- 4: Scikit-learn
- 5: Dask - Random Forest Feature Detection
- 6: Parallel Computing in Python
- 7: Dask
1 - Word Count with Parallel Python
Gregor von Laszewski (laszewski@gmail.com)
We will demonstrate Python's multiprocessing API for parallel computation by writing a program that counts how many times each word in a collection of documents appears.
Generating a Document Collection
Before we begin, let us write a script that will generate document collections by specifying the number of documents and the number of words per document. This will make benchmarking straightforward.
To keep it simple, the vocabulary of the document collection will consist of random numbers rather than the words of an actual language:
'''Usage: generate_nums.py [-h] NUM_LISTS INTS_PER_LIST MIN_INT MAX_INT DEST_DIR
Generate random lists of integers and save them
as 1.txt, 2.txt, etc.
Arguments:
NUM_LISTS The number of lists to create.
INTS_PER_LIST The number of integers in each list.
MIN_INT Each generated integer will be >= MIN_INT.
MAX_INT Each generated integer will be <= MAX_INT.
DEST_DIR A directory where the generated numbers will be stored.
Options:
-h --help
'''
import os, random, logging
from docopt import docopt
def generate_random_lists(num_lists,
ints_per_list, min_int, max_int):
return [[random.randint(min_int, max_int) \
for i in range(ints_per_list)] for i in range(num_lists)]
if __name__ == '__main__':
args = docopt(__doc__)
num_lists, ints_per_list, min_int, max_int, dest_dir = [
int(args['NUM_LISTS']),
int(args['INTS_PER_LIST']),
int(args['MIN_INT']),
int(args['MAX_INT']),
args['DEST_DIR']
]
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
lists = generate_random_lists(num_lists,
ints_per_list,
min_int,
max_int)
curr_list = 1
for lst in lists:
with open(os.path.join(dest_dir, '%d.txt' % curr_list), 'w') as f:
f.write(os.linesep.join(map(str, lst)))
curr_list += 1
logging.debug('Numbers written.')
Notice that we are using the docopt module, which you should be familiar with from the Section [Python DocOpts](#s-python-docopts), to make the script easy to run from the command line.
You can generate a document collection with this script as follows:
python generate_nums.py 1000 10000 0 100 docs-1000-10000
Serial Implementation
A first serial implementation of wordcount is straightforward:
'''Usage: wordcount.py [-h] DATA_DIR
Read a collection of .txt documents and count how many times each word
appears in the collection.
Arguments:
DATA_DIR A directory with documents (.txt files).
Options:
-h --help
'''
import os, glob, logging
from docopt import docopt
logging.basicConfig(level=logging.DEBUG)
def wordcount(files):
counts = {}
for filepath in files:
with open(filepath, 'r') as f:
words = [word.strip() for word in f.read().split()]
for word in words:
if word not in counts:
counts[word] = 0
counts[word] += 1
return counts
if __name__ == '__main__':
args = docopt(__doc__)
if not os.path.exists(args['DATA_DIR']):
raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])
counts = wordcount(glob.glob(os.path.join(args['DATA_DIR'], '*.txt')))
logging.debug(counts)
Serial Implementation Using map and reduce
We can improve the serial implementation in anticipation of parallelizing
the program by making use of Python’s map
and reduce
functions.
In short, you can use map
to apply the same function to the members of
a collection. For example, to convert a list of numbers to strings, you
could do:
import random
nums = [random.randint(1, 2) for _ in range(10)]
print(nums)
[2, 1, 1, 1, 2, 2, 2, 2, 2, 2]
print(list(map(str, nums)))
['2', '1', '1', '1', '2', '2', '2', '2', '2', '2']
We can use reduce to apply the same function cumulatively to the items
of a sequence. For example, to find the total of the numbers in our
list, we could use reduce
as follows:
from functools import reduce

def add(x, y):
return x + y
print(reduce(add, nums))
17
We can simplify this even more by using a lambda function:
print(reduce(lambda x, y: x + y, nums))
17
You can read more about Python’s lambda function in the docs.
With this in mind, we can reimplement the wordcount example as follows:
'''Usage: wordcount_mapreduce.py [-h] DATA_DIR
Read a collection of .txt documents and count how
many times each word
appears in the collection.
Arguments:
DATA_DIR A directory with documents (.txt files).
Options:
-h --help
'''
import os, glob, logging
from functools import reduce
from docopt import docopt
logging.basicConfig(level=logging.DEBUG)
def count_words(filepath):
counts = {}
with open(filepath, 'r') as f:
words = [word.strip() for word in f.read().split()]
for word in words:
if word not in counts:
counts[word] = 0
counts[word] += 1
return counts
def merge_counts(counts1, counts2):
for word, count in counts2.items():
if word not in counts1:
counts1[word] = 0
counts1[word] += counts2[word]
return counts1
if __name__ == '__main__':
args = docopt(__doc__)
if not os.path.exists(args['DATA_DIR']):
raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])
per_doc_counts = map(count_words,
glob.glob(os.path.join(args['DATA_DIR'],
'*.txt')))
counts = reduce(merge_counts, per_doc_counts, {})
logging.debug(counts)
Parallel Implementation
Drawing on the previous implementation using map and reduce, we can parallelize the implementation using Python's multiprocessing API:
'''Usage: wordcount_mapreduce_parallel.py [-h] DATA_DIR NUM_PROCESSES
Read a collection of .txt documents and count, in parallel, how many
times each word appears in the collection.
Arguments:
DATA_DIR A directory with documents (.txt files).
NUM_PROCESSES The number of parallel processes to use.
Options:
-h --help
'''
import os, glob, logging
from functools import reduce
from docopt import docopt
from wordcount_mapreduce import count_words, merge_counts
from multiprocessing import Pool
logging.basicConfig(level=logging.DEBUG)
if __name__ == '__main__':
args = docopt(__doc__)
if not os.path.exists(args['DATA_DIR']):
raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])
num_processes = int(args['NUM_PROCESSES'])
pool = Pool(processes=num_processes)
per_doc_counts = pool.map(count_words,
glob.glob(os.path.join(args['DATA_DIR'],
'*.txt')))
counts = reduce(merge_counts, per_doc_counts, {})
logging.debug(counts)
Benchmarking
To time each of the examples, enter it into its own Python file
and use Linux’s time
command:
$ time python wordcount.py docs-1000-10000
The output contains the real run time and the user run time. real is wall-clock time: the time from start to finish of the call. user is the amount of CPU time spent in user-mode code (outside the kernel) within the process, that is, only the CPU time actually used while executing the process.
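If you prefer to time the computation from inside Python, a minimal sketch using time.perf_counter could look as follows (it assumes the serial wordcount.py from above is importable and that the docs-1000-10000 collection has been generated):
import glob
import time

from wordcount import wordcount  # the serial implementation from above

files = glob.glob('docs-1000-10000/*.txt')

start = time.perf_counter()
counts = wordcount(files)
elapsed = time.perf_counter() - start

print('Counted %d distinct words in %.2f seconds' % (len(counts), elapsed))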
Exercises
E.python.wordcount.1:
Run the three different programs (serial, serial w/ map and reduce, parallel) and answer the following questions:
- Is there any performance difference between the different versions of the program?
- Does user time significantly differ from real time for any of the versions of the program?
- Experiment with different numbers of processes for the parallel example, starting with 1. What is the performance gain when you go from 1 to 2 processes? From 2 to 3? When do you stop seeing improvement? (this will depend on your machine architecture)
References
2 - NumPy
Gregor von Laszewski (laszewski@gmail.com)
NumPy is a popular library that is used by many other Python packages such as Pandas, SciPy, and scikit-learn. It provides a fast, simple-to-use way of interacting with numerical data organized in vectors and matrices. In this section, we will provide a short introduction to NumPy.
Installing NumPy
The most common way of installing NumPy, if it wasn’t included with your Python installation, is to install it via pip:
$ pip install numpy
If NumPy has already been installed, you can update to the most recent version using:
$ pip install -U numpy
You can verify that NumPy is installed by trying to use it in a Python program:
import numpy as np
Note that, by convention, we import NumPy using the alias ‘np’ - whenever you see ‘np’ sprinkled in example Python code, it’s a good bet that it is using NumPy.
NumPy Basics
At its core, NumPy is a container for n-dimensional data. Typically, 1-dimensional data is called an array and 2-dimensional data is called a matrix. Beyond 2 dimensions, the data would be considered a multidimensional array. Examples where you will encounter these dimensions include:
- 1 Dimensional: time-series data such as audio, stock prices, or a single observation in a dataset.
- 2 Dimensional: connectivity data between network nodes, user-product recommendations, and database tables.
- 3+ Dimensional: network latency between nodes over time, video (RGB+time), and version-controlled datasets.
All of these data can be placed into NumPy’s array object, just with varying dimensions.
Data Types: The Basic Building Blocks
Before we delve into arrays and matrices, we will start with the most basic element of those: a single value. NumPy can represent data utilizing many different standard datatypes such as uint8 (an 8-bit unsigned integer), float64 (a 64-bit float), or str (a string). An exhaustive listing can be found at: https://numpy.org/doc/stable/user/basics.types.html
Before moving on, it is important to know about the tradeoff made when using different datatypes. For example, a uint8 can only contain values between 0 and 255. This contrasts with float64, which can represent values with magnitudes up to about 1.80e+308. So why wouldn't we just always use float64? Though it allows us to be more expressive in terms of numbers, it also consumes more memory. If we were working with a 12-megapixel (3000 x 4000) single-channel image, for example, storing that image using uint8 values would require 3000 * 4000 * 8 = 96 million bits, or about 11.44 MB of memory. If we were to store the same image utilizing float64, our image would consume 8 times as much memory: 768 million bits, or about 91.55 MB. It is important to use the right data type for the job to avoid consuming unnecessary resources or slowing down processing.
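We can verify these numbers directly with NumPy's nbytes attribute, which reports the memory consumed by an array's data. A small sketch; the 3000 x 4000 shape corresponds to the 12-megapixel example above:
import numpy as np

img_uint8 = np.zeros((3000, 4000), dtype=np.uint8)
img_float64 = np.zeros((3000, 4000), dtype=np.float64)

print(img_uint8.nbytes / 1024 ** 2)    # about 11.44 MB
print(img_float64.nbytes / 1024 ** 2)  # about 91.55 MB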
Finally, while NumPy will conveniently convert between datatypes, one must be aware of overflows when using smaller data types. For example:
a = np.array([6], dtype=np.uint8)
print(a)
>>>[6]
a = a + np.array([7], dtype=np.uint8)
print(a)
>>>[13]
a = a + np.array([245], dtype=np.uint8)
print(a)
>>>[2]
In this example, it makes sense that 6+7=13. But how does 13+245=2? Put simply, the object type (uint8) simply ran out of space to store the value and wrapped back around to the beginning. An 8-bit number is only capable of storing 2^8, or 256, unique values. An operation that results in a value above that range will ‘overflow’ and cause the value to wrap back around to zero. Likewise, anything below that range will ‘underflow’ and wrap back around to the end. In our example, 13+245 became 258, which was too large to store in 8 bits and wrapped back around to 0 and ended up at 2.
NumPy will, generally, try to avoid this situation by dynamically retyping to whatever datatype will support the result (note that recent NumPy versions may instead raise an overflow error when a plain Python integer does not fit the array's dtype):
a = a + 260
print(a)
>>>[262]
Here, our addition caused our array, 'a,' to be upcast to uint16 instead of uint8. Finally, NumPy offers convenience functions akin to Python's range() function to create arrays of sequential numbers:
X = np.arange(0.2,1,.1)
print(X)
>>>[0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9]
We can also use this function to generate parameter spaces that can be iterated over:
P = 10.0 ** np.arange(-7,1,1)
print(P)
for x,p in zip(X,P):
print('%f, %f' % (x, p))
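As a side note, for floating-point ranges np.linspace is often preferable to np.arange, because you specify the number of points rather than a step size and thus avoid surprises from floating-point rounding. A small sketch:
import numpy as np

# 8 evenly spaced values from 0.2 to 0.9, endpoints included
X_lin = np.linspace(0.2, 0.9, 8)
print(X_lin)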
Arrays: Stringing Things Together
With our knowledge of datatypes in hand, we can begin to explore arrays. Simply put, arrays can be thought of as a sequence of values (not necessarily numbers). Arrays are 1-dimensional and can be created and accessed simply:
a = np.array([1, 2, 3])
print(type(a))
>>><class 'numpy.ndarray'>
print(a)
>>>[1 2 3]
print(a.shape)
>>>(3,)
a[0]
>>>1
Arrays (and, later, matrices) are zero-indexed. This makes it convenient when, for example, using Python’s range() function to iterate through an array:
for i in range(3):
print(a[i])
>>>1
>>>2
>>>3
Arrays are, also, mutable and can be changed easily:
a[0] = 42
print(a)
>>>array([42, 2, 3])
NumPy also includes incredibly powerful broadcasting features. These make it very simple to perform mathematical operations on arrays in a way that also makes intuitive sense:
a * 3
>>>array([3, 6, 9])
a**2
>>>array([1, 4, 9], dtype=int32)
Arrays can also interact with other arrays:
b = np.array([2, 3, 4])
print(a * b)
>>>array([ 2, 6, 12])
In this example, the result of multiplying together two arrays is to take the element-wise product while multiplying by a constant will multiply each element in the array by that constant. NumPy supports all of the basic mathematical operations: addition, subtraction, multiplication, division, and powers. It also includes an extensive suite of mathematical functions, such as log() and max(), which are covered later.
Matrices: An Array of Arrays
Matrices can be thought of as an extension of arrays - rather than having one dimension, matrices have 2 (or more). Much like arrays, matrices can be created easily within NumPy:
m = np.array([[1, 2], [3, 4]])
print(m)
>>>[[1 2]
>>> [3 4]]
Accessing individual elements is similar to how we did it for arrays. We simply need to pass in a number of arguments equal to the number of dimensions:
m[1][0]
>>>3
In this example, our first index selected the row and the second selected the column, giving us our result of 3. Matrices can be extended out to any number of dimensions by simply using more indices to access specific elements (though use cases beyond 4 dimensions may be somewhat rare).
Matrices support all of the normal mathematical functions such as +, -, *, and /. A special note: the * operator will result in an element-wise multiplication. Use @ or np.matmul() for matrix multiplication:
print(m-m)
print(m*m)
print(m/m)
More complex mathematical functions can typically be found within the NumPy library itself:
print(np.sin(m))
print(np.sum(m))
A full listing can be found at: https://docs.scipy.org/doc/numpy/reference/routines.math.html
Slicing Arrays and Matrices
As one can imagine, accessing elements one at a time is slow and can require many lines of code to iterate over every dimension in the matrix. Thankfully, NumPy incorporates a very powerful slicing engine that allows us to access ranges of elements easily:
m[1, :]
>>>array([3, 4])
The ‘:’ value tells NumPy to select all elements in the given dimension. Here, we've requested all elements in the second row (index 1). We can also use indexing to request elements within a given range:
a = np.arange(0, 10, 1)
print(a)
>>>[0 1 2 3 4 5 6 7 8 9]
a[4:8]
>>>array([4, 5, 6, 7])
Here, we asked NumPy to give us elements 4 through 7 (ranges in Python are inclusive at the start and non-inclusive at the end). We can even go backwards:
a[-5:]
>>>array([5, 6, 7, 8, 9])
In the previous example, the negative value is asking NumPy to return the last 5 elements of the array. Had the argument been ‘:-5,’ NumPy would’ve returned everything BUT the last five elements:
a[:-5]
>>>array([0, 1, 2, 3, 4])
Becoming more familiar with NumPy's accessor conventions will allow you to write more efficient, clearer code, as it is easier to read a simple one-line accessor than a multi-line, nested loop when extracting values from an array or matrix.
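Slicing works the same way in two dimensions, with one slice per axis. A small sketch using the matrix m from earlier:
m = np.array([[1, 2], [3, 4]])

print(m[:, 0])     # first column -> [1 3]
print(m[0, :])     # first row -> [1 2]
print(m[:, ::-1])  # reverse the order of the columns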
Useful Functions
The NumPy library provides several convenient mathematical functions that users can use. These functions provide several advantages to code written by users:
- They are open source and typically have multiple contributors checking for errors.
- Many of them utilize a C interface and will run much faster than native Python code.
- They are written to be very flexible.
NumPy arrays and matrices contain many useful aggregating functions such as max(), min(), mean(), etc. These functions are usually able to run an order of magnitude faster than looping through the object, so it is important to understand what functions are available to avoid 'reinventing the wheel.' In addition, many of the functions are able to sum or average across axes, which makes them extremely useful if your data has inherent grouping. To return to a previous example:
m = np.array([[1, 2], [3, 4]])
print(m)
>>>[[1 2]
>>> [3 4]]
m.sum()
>>>10
m.sum(axis=1)
>>>[3, 7]
m.sum(axis=0)
>>>[4, 6]
In this example, we created a 2x2 matrix containing the numbers 1 through 4. The sum of the matrix returned the addition of all its elements. Summing along axis 1 returned a new array with the sum of each row ([3, 7]), while summing along axis 0 returned the sum of each column ([4, 6]).
Linear Algebra
Perhaps one of the most important uses for NumPy is its robust support for linear algebra functions. Like the aggregation functions described in the previous section, these functions are optimized to be much faster than user implementations and can utilize processor-level features to provide very quick computations. These functions can be accessed very easily from the NumPy package:
a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
print(np.matmul(a, b))
>>>[[19 22]
[43 50]]
Included within np.linalg are functions for calculating the eigendecomposition of square and symmetric matrices. Finally, to give a quick example of how easy it is to implement algorithms in NumPy, we can use it to calculate the cost and gradient when using a simple Mean-Squared-Error (MSE):
cost = np.power(y - np.matmul(X, weights), 2).mean()
gradient = np.matmul(X.T, np.matmul(X, weights) - y)
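Because these two lines assume that X, y, and weights already exist, here is a minimal self-contained sketch with synthetic data (the shapes, the learning rate, and the iteration count are illustrative assumptions):
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 3))            # 100 samples, 3 features
true_w = np.array([1.0, -2.0, 0.5])
y = X @ true_w + rng.normal(scale=0.1, size=100)

weights = np.zeros(3)
for _ in range(200):                      # plain gradient descent on the MSE cost
    gradient = np.matmul(X.T, np.matmul(X, weights) - y) / len(y)
    weights -= 0.1 * gradient

cost = np.power(y - np.matmul(X, weights), 2).mean()
print(cost, weights)                      # weights should end up close to true_w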
Finally, more advanced functions are easily available to users via the linalg library of NumPy as:
from numpy import linalg
A = np.diag((1,2,3))
w,v = linalg.eig(A)
print ('w =', w)
print ('v =', v)
NumPy Resources
3 - Scipy
Gregor von Laszewski (laszewski@gmail.com)
SciPy is a library built around NumPy that has a number of off-the-shelf algorithms and operations implemented. These include algorithms from calculus (such as integration), statistics, linear algebra, image processing, signal processing, and machine learning.
To achieve this, SciPy bundles a number of useful open-source software for mathematics, science, and engineering. It includes the following packages:
- NumPy, for managing N-dimensional arrays
- SciPy library, to access fundamental scientific computing capabilities
- Matplotlib, to conduct 2D plotting
- IPython, for an interactive console (see jupyter)
- Sympy, for symbolic mathematics
- pandas, for providing data structures and analysis
Introduction
First, we add the usual scientific computing modules with the typical abbreviations, including sp for scipy. We could invoke scipy’s statistical package as sp.stats, but for the sake of laziness, we abbreviate that too.
import numpy as np # import numpy
import scipy as sp # import scipy
from scipy import stats # refer directly to stats rather than sp.stats
import matplotlib as mpl # for visualization
from matplotlib import pyplot as plt # refer directly to pyplot
# rather than mpl.pyplot
Now we create some random data to play with. We generate 100 samples from a Gaussian distribution centered at zero.
s = np.random.randn(100)
How many elements are in the set?
print ('There are',len(s),'elements in the set')
What is the mean (average) of the set?
print ('The mean of the set is',s.mean())
What is the minimum of the set?
print ('The minimum of the set is',s.min())
What is the maximum of the set?
print ('The maximum of the set is',s.max())
We can use NumPy functions for this too. What's the median?
print ('The median of the set is',np.median(s))
What about the standard deviation and variance?
print ('The standard deviation is',np.std(s),
'and the variance is',np.var(s))
Isn’t the variance the square of the standard deviation?
print ('The square of the standard deviation is',np.std(s)**2)
How close are the two measures? The difference is essentially zero, as the following calculation shows:
print ('The difference is',abs(np.std(s)**2 - np.var(s)))
print ('And in decimal form, the difference is %0.16f' %
(abs(np.std(s)**2 - np.var(s))))
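SciPy's stats module also offers a convenient way to compute several summary statistics in one call. A small sketch using the array s from above (note that stats.describe reports the sample variance, which uses a slightly different normalization than np.var):
n, minmax, mean, var, skew, kurt = stats.describe(s)
print('n =', n)
print('min/max =', minmax)
print('mean =', mean, 'variance =', var)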
How does this look as a histogram? See Figure 1, Figure 2, Figure 3
plt.hist(s) # yes, one line of code for a histogram
plt.show()
Figure 1: Histogram 1
Let us add some titles.
plt.clf() # clear out the previous plot
plt.hist(s)
plt.title("Histogram Example")
plt.xlabel("Value")
plt.ylabel("Frequency")
plt.show()
Figure 2: Histogram 2
Typically we do not include titles when we prepare images for inclusion in LaTeX. There we use the caption to describe what the figure is about.
plt.clf() # clear out the previous plot
plt.hist(s)
plt.xlabel("Value")
plt.ylabel("Frequency")
plt.show()
Figure 3: Histogram 3
Let us try out some linear regression or curve fitting. See Figure 4.
import random
def F(x):
return 2*x - 2
def add_noise(x):
return x + random.uniform(-1,1)
X = range(0,10,1)
Y = []
for i in range(len(X)):
Y.append(add_noise(X[i]))
plt.clf() # clear out the old figure
plt.plot(X,Y,'.')
plt.show()
Figure 4: Result 1
Now let’s try linear regression to fit the curve.
m, b, r, p, est_std_err = stats.linregress(X,Y)
What is the slope and y-intercept of the fitted curve?
print ('The slope is',m,'and the y-intercept is', b)
def Fprime(x): # the fitted curve
return m*x + b
Now let’s see how well the curve fits the data. We’ll call the fitted curve F'.
X = range(0,10,1)
Yprime = []
for i in range(len(X)):
Yprime.append(Fprime(X[i]))
plt.clf() # clear out the old figure
# the observed points, blue dots
plt.plot(X, Y, '.', label='observed points')
# the interpolated curve, connected red line
plt.plot(X, Yprime, 'r-', label='estimated points')
plt.title("Linear Regression Example") # title
plt.xlabel("x") # horizontal axis title
plt.ylabel("y") # vertical axis title
# legend labels to plot
plt.legend(['observed points', 'estimated points'])
# comment out so that you can save the figure
#plt.show()
To save images into a PDF file for inclusion into LaTeX documents you can save the images as follows. Other formats such as png are also possible, but their quality is usually not sufficient for inclusion in papers and documents; for that you certainly want to use PDF. The figure has to be saved before you call the show() command. See Figure 5
plt.savefig("regression.pdf", bbox_inches='tight')
plt.savefig('regression.png')
plt.show()
Figure 5: Result 2
References
For more information about SciPy we recommend that you visit the following link
https://www.scipy.org/getting-started.html#learning-to-work-with-scipy
Additional material and inspiration for this section are from
- “Getting Started guide”, https://www.scipy.org/getting-started.html
- Prasanth. “Simple statistics with SciPy.” Comfort at 1 AU. February 28, 2011. https://oneau.wordpress.com/2011/02/28/simple-statistics-with-scipy/
- SciPy Cookbook. Last updated: 2015. http://scipy-cookbook.readthedocs.io/
4 - Scikit-learn
Gregor von Laszewski (laszewski@gmail.com)
Learning Objectives
- Exploratory data analysis
- Pipeline to prepare data
- Full learning pipeline
- Fine tune the model
- Significance tests
Introduction to Scikit-learn
Scikit-learn is a machine-learning library for Python. It can be used for data mining and analysis and is built on top of NumPy, matplotlib, and SciPy. Scikit-learn features dimensionality reduction, clustering, regression, and classification algorithms. It also features model selection using grid search, cross-validation, and metrics.
Scikit-learn also enables users to preprocess data for machine learning using modules such as preprocessing and feature extraction.
In this section we demonstrate how simple it is to use k-means in scikit learn.
Installation
If you already have a working installation of numpy and scipy, the easiest way to install scikit-learn is using pip
$ pip install numpy
$ pip install scipy -U
$ pip install -U scikit-learn
Supervised Learning
Supervised learning is used in machine learning when we already know the correct outputs for a set of example inputs and, based on these, we need to predict the target for a new input. The training data is used to train the model, which can then be used to predict the output for new, unseen inputs.
Problems can be of two types
- Classification : Training data belongs to two or more classes/categories, and based on the labels we want to predict the class/category of new, unlabeled data.
- Regression : The desired output consists of one or more continuous values, for example predicting the price of a house from its characteristics.
Unsupervised Learning
Unsupervised Learning is used in machine learning when we have the training set available but without any corresponding target. The outcome of the problem is to discover groups within the provided input. It can be done in many ways.
A few of them are listed here:
- Clustering : Discover groups with similar characteristics.
- Density Estimation : Find the distribution of the data within the input space. A related task is to reduce the data from a high-dimensional space down to two or three dimensions.
Building an end-to-end pipeline for Supervised machine learning using Scikit-learn
A data pipeline is a set of processing components that are sequenced to produce meaningful data. Pipelines are commonly used in machine learning, since a lot of data transformation and manipulation needs to be applied to make data useful for machine learning. All components are sequenced in such a way that the output of one component becomes the input of the next, and each component is self-contained. Components interact with each other using data.
Even if a component breaks, the downstream components can run normally using the last output. Sklearn provides the ability to build pipelines that can be transformed and modeled for machine learning.
Steps for developing a machine learning model
- Explore the domain space
- Extract the problem definition
- Get the data that can be used to make the system learn to solve the problem definition.
- Discover and Visualize the data to gain insights
- Feature engineering and prepare the data
- Select a model and train it
- Fine tune your model
- Evaluate your solution using metrics
- Once proven launch and maintain the model.
Exploratory Data Analysis
Example project = Fraud detection system
The first step is to load the data into a DataFrame so that a proper analysis can be done on the attributes.
import pandas as pd

data = pd.read_csv('dataset/data_file.csv')
data.head()
Perform the basic analysis on the data shape and null value information.
print(data.shape)
print(data.info())
data.isnull().values.any()
Here are examples of a few visual data analysis methods.
Bar plot
A bar chart or graph is a graph with rectangular bars or bins that are used to plot categorical values. Each bar in the graph represents a categorical variable and the height of the bar is proportional to the value represented by it.
Bar graphs are used:
- to make comparisons between variables,
- to visualize trends in the data, i.e., to show the dependence of one variable on another, and
- to estimate values of a variable.
import matplotlib.pyplot as plt

plt.ylabel('Transactions')
plt.xlabel('Type')
data.type.value_counts().plot.bar()
Figure 1: Example of scikit-learn barplots
Correlation between attributes
Attributes in a dataset can be related in different ways: one attribute can depend on another, attributes can be loosely or tightly coupled, and two attributes can also be associated through a third one.
In order to understand the relationship between attributes, correlation is the best visual way to get an insight. Positive correlation means both attributes move in the same direction. Negative correlation refers to opposite directions: when the values of one attribute increase, the values of the other decrease. Zero correlation means the attributes are unrelated.
import numpy as np
import seaborn as sns

# compute the correlation matrix
corr = data.corr()
# generate a mask for the upper triangle
mask = np.zeros_like(corr, dtype=bool)
mask[np.triu_indices_from(mask)] = True
# set up the matplotlib figure
f, ax = plt.subplots(figsize=(18, 18))
# generate a custom diverging color map
cmap = sns.diverging_palette(220, 10, as_cmap=True)
# draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3,
square=True,
linewidths=.5, cbar_kws={"shrink": .5}, ax=ax);
Figure 2: scikit-learn correlation array
Histogram Analysis of dataset attributes
A histogram consists of a set of counts that represent the number of times some event occurred.
%matplotlib inline
data.hist(bins=30, figsize=(20,15))
plt.show()
Figure 3: scikit-learn
Box plot Analysis
Box plot analysis is useful in detecting whether a distribution is skewed and detect outliers in the data.
fig, axs = plt.subplots(2, 2, figsize=(10, 10))
tmp = data.loc[(data.type == 'TRANSFER'), :]
a = sns.boxplot(x = 'isFlaggedFraud', y = 'amount', data = tmp, ax=axs[0][0])
axs[0][0].set_yscale('log')
b = sns.boxplot(x = 'isFlaggedFraud', y = 'oldbalanceDest', data = tmp, ax=axs[0][1])
axs[0][1].set(ylim=(0, 0.5e8))
c = sns.boxplot(x = 'isFlaggedFraud', y = 'oldbalanceOrg', data=tmp, ax=axs[1][0])
axs[1][0].set(ylim=(0, 3e7))
d = sns.regplot(x = 'oldbalanceOrg', y = 'amount', data=tmp.loc[(tmp.isFlaggedFraud ==1), :], ax=axs[1][1])
plt.show()
Figure 4: scikit-learn
Scatter plot Analysis
The scatter plot displays values of two numerical variables as Cartesian coordinates.
plt.figure(figsize=(12,8))
sns.pairplot(data[['amount', 'oldbalanceOrg', 'oldbalanceDest', 'isFraud']], hue='isFraud')
Figure 5: scikit-learn scatter plots
Data Cleansing - Removing Outliers
If the transaction amount is lower than 5 percent of all the transactions AND does not exceed USD 3000, we will exclude it from our analysis to reduce Type 1 costs. If the transaction amount is higher than 95 percent of all the transactions AND exceeds USD 500000, we will exclude it from our analysis, and use a blanket review process for such transactions (similar to the isFlaggedFraud column in the original dataset) to reduce Type 2 costs.
low_exclude = np.round(np.minimum(fin_samp_data.amount.quantile(0.05), 3000), 2)
high_exclude = np.round(np.maximum(fin_samp_data.amount.quantile(0.95), 500000), 2)
###Updating Data to exclude records prone to Type 1 and Type 2 costs
low_data = fin_samp_data[fin_samp_data.amount > low_exclude]
data = low_data[low_data.amount < high_exclude]
Pipeline Creation
A machine learning pipeline is used to help automate machine learning workflows. It operates by enabling a sequence of data to be transformed and correlated together in a model that can be tested and evaluated to achieve an outcome, whether positive or negative.
Defining DataFrameSelector to separate Numerical and Categorical attributes
A sample transformer to separate out numerical and categorical attributes.
from sklearn.base import BaseEstimator, TransformerMixin
# Create a class to select numerical or categorical columns
# since Scikit-Learn doesn't handle DataFrames yet
class DataFrameSelector(BaseEstimator, TransformerMixin):
def __init__(self, attribute_names):
self.attribute_names = attribute_names
def fit(self, X, y=None):
return self
def transform(self, X):
return X[self.attribute_names].values
Feature Creation / Additional Feature Engineering
During EDA we identified that there are transactions where the balances do not tally after the transaction is completed. We believe these could potentially be cases where fraud is occurring. To account for this error in the transactions, we define two new features, "errorBalanceOrig" and "errorBalanceDest," calculated by adjusting the amount with the before and after balances for the Originator and Destination accounts.
Below, we create a function that allows us to create these features in a pipeline.
from sklearn.base import BaseEstimator, TransformerMixin
# column index
amount_ix, oldbalanceOrg_ix, newbalanceOrig_ix, oldbalanceDest_ix, newbalanceDest_ix = 0, 1, 2, 3, 4
class CombinedAttributesAdder(BaseEstimator, TransformerMixin):
def __init__(self): # no *args or **kargs
pass
def fit(self, X, y=None):
return self # nothing else to do
def transform(self, X, y=None):
errorBalanceOrig = X[:,newbalanceOrig_ix] + X[:,amount_ix] - X[:,oldbalanceOrg_ix]
errorBalanceDest = X[:,oldbalanceDest_ix] + X[:,amount_ix]- X[:,newbalanceDest_ix]
return np.c_[X, errorBalanceOrig, errorBalanceDest]
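As a quick sanity check, the transformer can be exercised on a tiny synthetic array (the five columns follow the column order assumed by the indices above):
import numpy as np

# columns: amount, oldbalanceOrg, newbalanceOrig, oldbalanceDest, newbalanceDest
sample = np.array([[100.0, 500.0, 400.0, 0.0, 100.0]])

adder = CombinedAttributesAdder()
print(adder.transform(sample))
# the last two columns are errorBalanceOrig and errorBalanceDest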
Creating Training and Testing datasets
The training set includes the input examples that the model will be trained on by adjusting its parameters. The testing dataset is critical for testing the generalizability of the model. By using this set, we can get the working accuracy of our model.
The testing set should not be exposed to the model until training has been completed. This way the results from testing are more reliable.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.30, random_state=42, stratify=y)
Creating pipeline for numerical and categorical attributes
Identifying columns with Numerical and Categorical characteristics.
X_train_num = X_train[["amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"]]
X_train_cat = X_train[["type"]]
X_model_col = ["amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest","type"]
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
num_attribs = list(X_train_num)
cat_attribs = list(X_train_cat)
num_pipeline = Pipeline([
('selector', DataFrameSelector(num_attribs)),
('attribs_adder', CombinedAttributesAdder()),
('std_scaler', StandardScaler())
])
cat_pipeline = Pipeline([
('selector', DataFrameSelector(cat_attribs)),
('cat_encoder', OneHotEncoder(sparse_output=False))
])
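Note that recent versions of scikit-learn provide ColumnTransformer, which works on DataFrames directly and can replace the custom DataFrameSelector. A minimal sketch under that assumption (it reuses num_attribs and cat_attribs from above and omits the custom attribute adder for brevity):
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

preprocess = ColumnTransformer([
    ('num', StandardScaler(), num_attribs),
    ('cat', OneHotEncoder(sparse_output=False), cat_attribs),
])
# preprocess.fit_transform(X_train) returns a NumPy array ready for a model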
Selecting the algorithm to be applied
Algorithm selection primarily depends on the objective you are trying to achieve and on the kind of dataset that is available. There are different types of algorithms that can be applied, and we will look into a few of them here.
Linear Regression
This algorithm can be applied when you want to compute some continuous value. To predict a future value of a process that is currently running, you can use a regression algorithm.
Examples where linear regression can be used are:
- Predict the time taken to go from one place to another
- Predict the sales for a future month
- Predict sales data and improve yearly projections.
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import time
scl= StandardScaler()
X_train_std = scl.fit_transform(X_train)
X_test_std = scl.transform(X_test)
start = time.time()
lin_reg = LinearRegression()
lin_reg.fit(X_train_std, y_train) #SKLearn's linear regression
y_train_pred = lin_reg.predict(X_train_std)
train_time = time.time()-start
Logistic Regression
This algorithm can be used to perform binary classification. It can be used if you want a probabilistic framework, or if you expect to receive more training data in the future that you want to be able to quickly incorporate into your model.
- Customer churn prediction.
- Credit Scoring & Fraud Detection which is our example problem which we are trying to solve in this chapter.
- Calculating the effectiveness of marketing campaigns.
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
X_train, _, y_train, _ = train_test_split(X_train, y_train, stratify=y_train, train_size=subsample_rate, random_state=42)
X_test, _, y_test, _ = train_test_split(X_test, y_test, stratify=y_test, train_size=subsample_rate, random_state=42)
model_lr_sklearn = LogisticRegression(multi_class="multinomial", C=1e6, solver="sag", max_iter=15)
model_lr_sklearn.fit(X_train, y_train)
y_pred_test = model_lr_sklearn.predict(X_test)
acc = accuracy_score(y_test, y_pred_test)
results.loc[len(results)] = ["LR Sklearn", np.round(acc, 3)]
results
Decision trees
Decision trees handle feature interactions well and they are non-parametric. However, they do not support online learning, so the entire tree needs to be rebuilt when new training data comes in, and memory consumption can be high.
They can be used for cases such as:
- Investment decisions
- Customer churn
- Banks loan defaulters
- Build vs Buy decisions
- Sales lead qualifications
from sklearn.tree import DecisionTreeRegressor
dt = DecisionTreeRegressor()
start = time.time()
dt.fit(X_train_std, y_train)
y_train_pred = dt.predict(X_train_std)
train_time = time.time() - start
start = time.time()
y_test_pred = dt.predict(X_test_std)
test_time = time.time() - start
K Means
This algorithm is used when we are not aware of the labels and they need to be created based on the features of the objects. An example would be dividing a group of people into different subgroups based on a common theme or attribute.
The main disadvantage of K-means is that you need to know in advance the number of clusters or groups that is required. It can take many iterations to come up with the best K.
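One common heuristic for choosing K is the elbow method: fit K-means for a range of K values and look for the point where the within-cluster sum of squares (the inertia) stops dropping quickly. A minimal sketch on synthetic data (the blob parameters are arbitrary):
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs

X_blobs, _ = make_blobs(n_samples=500, centers=4, random_state=42)

for k in range(1, 8):
    km = KMeans(n_clusters=k, n_init=10, random_state=42).fit(X_blobs)
    print(k, round(km.inertia_, 1))
# the inertia drops sharply up to k=4 and then flattens out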
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, PredefinedSplit
from sklearn.metrics import accuracy_score
X_train, _, y_train, _ = train_test_split(X_train, y_train, stratify=y_train, train_size=subsample_rate, random_state=42)
X_test, _, y_test, _ = train_test_split(X_test, y_test, stratify=y_test, train_size=subsample_rate, random_state=42)
model_knn_sklearn = KNeighborsClassifier(n_jobs=-1)
model_knn_sklearn.fit(X_train, y_train)
y_pred_test = model_knn_sklearn.predict(X_test)
acc = accuracy_score(y_test, y_pred_test)
results.loc[len(results)] = ["KNN Arbitary Sklearn", np.round(acc, 3)]
results
Support Vector Machines
SVM is a supervised ML technique used for pattern recognition and classification problems when your data has exactly two classes. It is popular in text classification problems.
A few cases where SVM can be used are:
- Detecting persons with common diseases.
- Hand-written character recognition
- Text categorization
- Stock market price prediction
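Since this subsection has no accompanying code, here is a minimal sketch of a two-class SVM on a synthetic dataset (the dataset and parameters are purely illustrative):
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

X_syn, y_syn = make_classification(n_samples=1000, n_features=10, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X_syn, y_syn, test_size=0.3, random_state=42)

svc = SVC(kernel='rbf', C=1.0, gamma='scale')
svc.fit(X_tr, y_tr)
print(accuracy_score(y_te, svc.predict(X_te)))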
Naive Bayes
Naive Bayes is often used for large datasets. The algorithm works well even when we have limited CPU and memory available, since it essentially works by calculating a set of counts, and it requires relatively little training data. The algorithm cannot, however, learn interactions between features.
Naive Bayes can be used in real-world applications such as:
- Sentiment analysis and text classification
- Recommendation systems like Netflix, Amazon
- To mark an email as spam or not spam
- Face recognition
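A minimal sketch of a Naive Bayes spam filter built from word counts (the tiny corpus and labels are made up for illustration):
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

docs = ['free money now', 'meeting at noon', 'win a free prize', 'project status update']
labels = [1, 0, 1, 0]  # 1 = spam, 0 = not spam

vec = CountVectorizer()
X_docs = vec.fit_transform(docs)  # the word counts the classifier learns from

nb = MultinomialNB()
nb.fit(X_docs, labels)
print(nb.predict(vec.transform(['win free money'])))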
Random Forest
Random forest is similar to a decision tree, but it builds an ensemble of trees. It can be used for both regression and classification problems with large datasets.
A few cases where it can be applied:
- Predict patients for high risks.
- Predict parts failures in manufacturing.
- Predict loan defaulters.
from sklearn.ensemble import RandomForestRegressor
forest = RandomForestRegressor(n_estimators=400, criterion='squared_error', random_state=1, n_jobs=-1)
start = time.time()
forest.fit(X_train_std, y_train)
y_train_pred = forest.predict(X_train_std)
train_time = time.time() - start
start = time.time()
y_test_pred = forest.predict(X_test_std)
test_time = time.time() - start
Neural networks
Neural networks work based on the weights of the connections between neurons. The weights are trained, and based on them the neural network can be used to predict a class or a quantity. They are resource and memory intensive.
A few cases where they can be applied:
- Applied to unsupervised learning tasks, such as feature extraction.
- Extracts features from raw images or speech with much less human intervention
Deep Learning using Keras
Keras is one of the most powerful and easy-to-use Python libraries for developing and evaluating deep learning models. It wraps the efficient numerical computation libraries Theano and TensorFlow.
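A minimal sketch of a small binary classifier using the tensorflow.keras API (the toy data, layer sizes, and training settings are illustrative assumptions):
import numpy as np
from tensorflow.keras.layers import Dense
from tensorflow.keras.models import Sequential

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(1000, 7)).astype('float32')
y_demo = (X_demo[:, 0] + X_demo[:, 1] > 0).astype('float32')  # toy binary target

model = Sequential([
    Dense(32, activation='relu', input_shape=(7,)),
    Dense(16, activation='relu'),
    Dense(1, activation='sigmoid'),
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X_demo, y_demo, epochs=5, batch_size=64, verbose=0)
print(model.evaluate(X_demo, y_demo, verbose=0))  # [loss, accuracy]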
XGBoost
XGBoost stands for eXtreme Gradient Boosting. XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. It is engineered for efficiency of compute time and memory resources.
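A minimal sketch using the scikit-learn style interface of the xgboost package (the dataset and parameters are illustrative):
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from xgboost import XGBClassifier

X_syn, y_syn = make_classification(n_samples=1000, n_features=10, random_state=7)
X_tr, X_te, y_tr, y_te = train_test_split(X_syn, y_syn, test_size=0.3, random_state=7)

xgb = XGBClassifier(n_estimators=200, max_depth=3, learning_rate=0.1)
xgb.fit(X_tr, y_tr)
print(xgb.score(X_te, y_te))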
Scikit Cheat Sheet
Scikit-learn provides a very in-depth and well-explained flow chart to help you choose the right algorithm, which we find very handy.
Figure 6: scikit-learn
Parameter Optimization
Machine learning models are parameterized so that their behavior can be tuned for a given problem. These models can have many parameters and finding the best combination of parameters can be treated as a search problem.
A parameter is a configuration variable that is internal to the model and whose value can be estimated from the given data.
- Required by the model when making predictions.
- Values define the skill of the model on your problem.
- Estimated or learned from data.
- Often not set manually by the practitioner.
- Often saved as part of the learned model.
Hyperparameter optimization/tuning algorithms
Grid search is an approach to hyperparameter tuning that will methodically build and evaluate a model for each combination of algorithm parameters specified in a grid.
Random search provides a statistical distribution for each hyperparameter from which values may be randomly sampled.
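For comparison with the grid search used below, here is a minimal sketch of random search with scikit-learn's RandomizedSearchCV (the estimator and the parameter distribution are illustrative):
from scipy.stats import loguniform
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV

X_syn, y_syn = make_classification(n_samples=1000, n_features=10, random_state=3)

search = RandomizedSearchCV(
    LogisticRegression(max_iter=1000),
    param_distributions={'C': loguniform(1e-3, 1e3)},
    n_iter=20, cv=4, scoring='f1', random_state=3)
search.fit(X_syn, y_syn)
print(search.best_params_)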
Experiments with Keras (deep learning), XGBoost, and SVM (SVC) compared to Logistic Regression(Baseline)
Creating a parameter grid
grid_param = [
[{ #LogisticRegression
'model__penalty':['l1','l2'],
'model__C': [0.01, 1.0, 100]
}],
[{#keras
'model__optimizer': optimizer,
'model__loss': loss
}],
[{ #SVM
'model__C' :[0.01, 1.0, 100],
'model__gamma': [0.5, 1],
'model__max_iter':[-1]
}],
[{ #XGBClassifier
'model__min_child_weight': [1, 3, 5],
'model__gamma': [0.5],
'model__subsample': [0.6, 0.8],
'model__colsample_bytree': [0.6],
'model__max_depth': [3]
}]
]
Implementing grid search with the models and also creating metrics from each of the models.
Pipeline(memory=None,
steps=[('preparation', FeatureUnion(n_jobs=None,
transformer_list=[('num_pipeline', Pipeline(memory=None,
steps=[('selector', DataFrameSelector(attribute_names=['amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest'])), ('attribs_adder', CombinedAttributesAdder()...penalty='l2', random_state=None, solver='warn',
tol=0.0001, verbose=0, warm_start=False))])
import numpy as np
import pandas as pd
from sklearn import linear_model, metrics
from sklearn.metrics import mean_squared_error
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from xgboost.sklearn import XGBClassifier
from sklearn.svm import SVC
test_scores = []
#Machine Learning Algorithm (MLA) Selection and Initialization
MLA = [
linear_model.LogisticRegression(),
keras_model,
SVC(),
XGBClassifier()
]
#create table to compare MLA metrics
MLA_columns = ['Name', 'Score', 'Accuracy_Score','ROC_AUC_score','final_rmse','Classification_error','Recall_Score','Precision_Score', 'mean_test_score', 'mean_fit_time', 'F1_Score']
MLA_compare = pd.DataFrame(columns = MLA_columns)
Model_Scores = pd.DataFrame(columns = ['Name','Score'])
row_index = 0
for alg in MLA:
#set name and parameters
MLA_name = alg.__class__.__name__
MLA_compare.loc[row_index, 'Name'] = MLA_name
#MLA_compare.loc[row_index, 'Parameters'] = str(alg.get_params())
full_pipeline_with_predictor = Pipeline([
("preparation", full_pipeline), # combination of numerical and categorical pipelines
("model", alg)
])
grid_search = GridSearchCV(full_pipeline_with_predictor, grid_param[row_index], cv=4, verbose=2, scoring='f1', return_train_score=True)
grid_search.fit(X_train[X_model_col], y_train)
y_pred = grid_search.predict(X_test)
MLA_compare.loc[row_index, 'Accuracy_Score'] = np.round(accuracy_score(y_pred, y_test), 3)
MLA_compare.loc[row_index, 'ROC_AUC_score'] = np.round(metrics.roc_auc_score(y_test, y_pred),3)
MLA_compare.loc[row_index,'Score'] = np.round(grid_search.score(X_test, y_test),3)
negative_mse = grid_search.best_score_
scores = np.sqrt(-negative_mse)
final_mse = mean_squared_error(y_test, y_pred)
final_rmse = np.sqrt(final_mse)
MLA_compare.loc[row_index, 'final_rmse'] = final_rmse
confusion_matrix_var = confusion_matrix(y_test, y_pred)
TP = confusion_matrix_var[1, 1]
TN = confusion_matrix_var[0, 0]
FP = confusion_matrix_var[0, 1]
FN = confusion_matrix_var[1, 0]
MLA_compare.loc[row_index,'Classification_error'] = np.round(((FP + FN) / float(TP + TN + FP + FN)), 5)
MLA_compare.loc[row_index,'Recall_Score'] = np.round(metrics.recall_score(y_test, y_pred), 5)
MLA_compare.loc[row_index,'Precision_Score'] = np.round(metrics.precision_score(y_test, y_pred), 5)
MLA_compare.loc[row_index,'F1_Score'] = np.round(f1_score(y_test,y_pred), 5)
MLA_compare.loc[row_index, 'mean_test_score'] = grid_search.cv_results_['mean_test_score'].mean()
MLA_compare.loc[row_index, 'mean_fit_time'] = grid_search.cv_results_['mean_fit_time'].mean()
Model_Scores.loc[row_index,'MLA Name'] = MLA_name
Model_Scores.loc[row_index,'ML Score'] = np.round(metrics.roc_auc_score(y_test, y_pred),3)
#Collect Mean Test scores for statistical significance test
test_scores.append(grid_search.cv_results_['mean_test_score'])
row_index+=1
Results table from the Model evaluation with metrics.
Figure 7: scikit-learn
ROC AUC Score
The AUC-ROC curve is a performance measurement for classification problems at various threshold settings. ROC is a probability curve and AUC represents the degree or measure of separability. It tells how capable the model is of distinguishing between classes. The higher the AUC, the better the model is at predicting 0s as 0s and 1s as 1s.
Figure 8: scikit-learn
Figure 9: scikit-learn
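A minimal sketch of computing the ROC AUC from predicted probabilities (the data and classifier are illustrative; with the pipelines above you would use the fitted grid_search and its predict_proba where the model supports it):
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, roc_curve
from sklearn.model_selection import train_test_split

X_syn, y_syn = make_classification(n_samples=1000, n_features=10, random_state=5)
X_tr, X_te, y_tr, y_te = train_test_split(X_syn, y_syn, test_size=0.3, random_state=5)

clf = LogisticRegression(max_iter=1000).fit(X_tr, y_tr)
proba = clf.predict_proba(X_te)[:, 1]          # probability of the positive class

print('AUC =', roc_auc_score(y_te, proba))
fpr, tpr, thresholds = roc_curve(y_te, proba)  # points for plotting the ROC curve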
K-means Algorithm
In this section we demonstrate how simple it is to use k-means in scikit learn.
Import
from time import time
import numpy as np
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.cluster import KMeans
from sklearn.datasets import load_digits
from sklearn.decomposition import PCA
from sklearn.preprocessing import scale
Create samples
np.random.seed(42)
digits = load_digits()
data = scale(digits.data)
n_samples, n_features = data.shape
n_digits = len(np.unique(digits.target))
labels = digits.target
sample_size = 300
print("n_digits: %d, \t n_samples %d, \t n_features %d" % (n_digits, n_samples, n_features))
print(79 * '_')
print('% 9s' % 'init' ' time inertia homo compl v-meas ARI AMI silhouette')
print("n_digits: %d, \t n_samples %d, \t n_features %d"
% (n_digits, n_samples, n_features))
print(79 * '_')
print('% 9s' % 'init'
' time inertia homo compl v-meas ARI AMI silhouette')
def bench_k_means(estimator, name, data):
t0 = time()
estimator.fit(data)
print('% 9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f'
% (name, (time() - t0), estimator.inertia_,
metrics.homogeneity_score(labels, estimator.labels_),
metrics.completeness_score(labels, estimator.labels_),
metrics.v_measure_score(labels, estimator.labels_),
metrics.adjusted_rand_score(labels, estimator.labels_),
metrics.adjusted_mutual_info_score(labels, estimator.labels_),
metrics.silhouette_score(data, estimator.labels_,metric='euclidean',sample_size=sample_size)))
bench_k_means(KMeans(init='k-means++', n_clusters=n_digits, n_init=10), name="k-means++", data=data)
bench_k_means(KMeans(init='random', n_clusters=n_digits, n_init=10), name="random", data=data)
# in this case the seeding of the centers is deterministic, hence we run the
# kmeans algorithm only once with n_init=1
pca = PCA(n_components=n_digits).fit(data)
bench_k_means(KMeans(init=pca.components_,n_clusters=n_digits, n_init=1),name="PCA-based", data=data)
print(79 * '_')
Visualize
See Figure 10
reduced_data = PCA(n_components=2).fit_transform(data)
kmeans = KMeans(init='k-means++', n_clusters=n_digits, n_init=10)
kmeans.fit(reduced_data)
# Step size of the mesh. Decrease to increase the quality of the VQ.
h = .02 # point in the mesh [x_min, x_max]x[y_min, y_max].
# Plot the decision boundary. For that, we will assign a color to each point in the mesh.
x_min, x_max = reduced_data[:, 0].min() - 1, reduced_data[:, 0].max() + 1
y_min, y_max = reduced_data[:, 1].min() - 1, reduced_data[:, 1].max() + 1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))
# Obtain labels for each point in mesh. Use last trained model.
Z = kmeans.predict(np.c_[xx.ravel(), yy.ravel()])
# Put the result into a color plot
Z = Z.reshape(xx.shape)
plt.figure(1)
plt.clf()
plt.imshow(Z, interpolation='nearest',
extent=(xx.min(), xx.max(), yy.min(), yy.max()),
cmap=plt.cm.Paired,
aspect='auto', origin='lower')
plt.plot(reduced_data[:, 0], reduced_data[:, 1], 'k.', markersize=2)
# Plot the centroids as a white X
centroids = kmeans.cluster_centers_
plt.scatter(centroids[:, 0], centroids[:, 1],
marker='x', s=169, linewidths=3,
color='w', zorder=10)
plt.title('K-means clustering on the digits dataset (PCA-reduced data)\n'
'Centroids are marked with white cross')
plt.xlim(x_min, x_max)
plt.ylim(y_min, y_max)
plt.xticks(())
plt.yticks(())
plt.show()
Figure 10: Result
5 - Dask - Random Forest Feature Detection
Gregor von Laszewski (laszewski@gmail.com)
Setup
First we need our tools. pandas gives us the DataFrame, very similar to R’s DataFrames. The DataFrame is a structure that allows us to work with our data more easily. It has nice features for slicing and transformation of data, and easy ways to do basic statistics.
numpy has some very handy functions that work on DataFrames.
Dataset
We are using the wine quality dataset, archived at UCI's Machine Learning Repository (http://archive.ics.uci.edu/ml/index.php).
import pandas as pd
import numpy as np
Now we will load our data. pandas makes it easy!
# red wine quality data, packed in a DataFrame
red_df = pd.read_csv('winequality-red.csv',sep=';',header=0, index_col=False)
# white wine quality data, packed in a DataFrame
white_df = pd.read_csv('winequality-white.csv',sep=';',header=0,index_col=False)
# rose? other fruit wines? plum wine? :(
Like in R, there is a .describe() method that gives basic statistics for every column in the dataset.
# for red wines
red_df.describe()
                      count        mean        std       min        25%        50%        75%         max
fixed acidity          1599    8.319637   1.741096  4.600000   7.100000   7.900000   9.200000   15.900000
volatile acidity       1599    0.527821   0.179060  0.120000   0.390000   0.520000   0.640000    1.580000
citric acid            1599    0.270976   0.194801  0.000000   0.090000   0.260000   0.420000    1.000000
residual sugar         1599    2.538806   1.409928  0.900000   1.900000   2.200000   2.600000   15.500000
chlorides              1599    0.087467   0.047065  0.012000   0.070000   0.079000   0.090000    0.611000
free sulfur dioxide    1599   15.874922  10.460157  1.000000   7.000000  14.000000  21.000000   72.000000
total sulfur dioxide   1599   46.467792  32.895324  6.000000  22.000000  38.000000  62.000000  289.000000
density                1599    0.996747   0.001887  0.990070   0.995600   0.996750   0.997835    1.003690
pH                     1599    3.311113   0.154386  2.740000   3.210000   3.310000   3.400000    4.010000
sulphates              1599    0.658149   0.169507  0.330000   0.550000   0.620000   0.730000    2.000000
alcohol                1599   10.422983   1.065668  8.400000   9.500000  10.200000  11.100000   14.900000
quality                1599    5.636023   0.807569  3.000000   5.000000   6.000000   6.000000    8.000000
# for white wines
white_df.describe()
                      count        mean        std       min         25%         50%         75%         max
fixed acidity          4898    6.854788   0.843868  3.800000    6.300000    6.800000    7.300000   14.200000
volatile acidity       4898    0.278241   0.100795  0.080000    0.210000    0.260000    0.320000    1.100000
citric acid            4898    0.334192   0.121020  0.000000    0.270000    0.320000    0.390000    1.660000
residual sugar         4898    6.391415   5.072058  0.600000    1.700000    5.200000    9.900000   65.800000
chlorides              4898    0.045772   0.021848  0.009000    0.036000    0.043000    0.050000    0.346000
free sulfur dioxide    4898   35.308085  17.007137  2.000000   23.000000   34.000000   46.000000  289.000000
total sulfur dioxide   4898  138.360657  42.498065  9.000000  108.000000  134.000000  167.000000  440.000000
density                4898    0.994027   0.002991  0.987110    0.991723    0.993740    0.996100    1.038980
pH                     4898    3.188267   0.151001  2.720000    3.090000    3.180000    3.280000    3.820000
sulphates              4898    0.489847   0.114126  0.220000    0.410000    0.470000    0.550000    1.080000
alcohol                4898   10.514267   1.230621  8.000000    9.500000   10.400000   11.400000   14.200000
quality                4898    5.877909   0.885639  3.000000    5.000000    6.000000    6.000000    9.000000
Sometimes it is easier to understand the data visually. A histogram of the citric acid samples from the white wine quality data is shown next. You can of course visualize data from other columns or other datasets; just replace the DataFrame and column name (see Figure 1).
import matplotlib.pyplot as plt
def extract_col(df,col_name):
return list(df[col_name])
col = extract_col(white_df,'citric acid') # can replace with another dataframe or column
plt.hist(col)
#TODO: add axes and such to set a good example
plt.show()
Figure 1: Histogram
Detecting Features
Let us try out some elementary machine learning models. These models are not only useful for prediction; they also help identify which features are most predictive of a variable of interest. Depending on the classifier you use, you may need to transform the data pertaining to that variable.
Data Preparation
Let us assume we want to study which features are most correlated with pH. pH of course is real-valued and continuous. The classifiers we want to use usually need labeled or integer data. Hence, we will transform the pH data, labeling wines with a pH higher than average as hi (more basic or alkaline) and wines with a pH lower than average as lo (more acidic).
# refresh to make Jupyter happy
red_df = pd.read_csv('winequality-red.csv', sep=';', header=0, index_col=False)
white_df = pd.read_csv('winequality-white.csv', sep=';', header=0, index_col=False)
#TODO: data cleansing functions here, e.g. replacement of NaN
# if the variable you want to predict is continuous, you can map ranges of values
# to integer/binary/string labels
# for example, map the pH data to 'hi' and 'lo' if a pH value is more than or
# less than the mean pH, respectively
M = np.mean(list(red_df['pH']))  # expect inelegant code in these mappings
Lf = lambda p: int(p < M)*'lo' + int(p >= M)*'hi'  # some C-style hackery
# create the new classifiable variable (wrap map in list() so it works in Python 3)
red_df['pH-hi-lo'] = list(map(Lf, red_df['pH']))
# and remove the predecessor
del red_df['pH']
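As a side note, the same label column could also be built in a more pandas-idiomatic way with np.where; the following is just an equivalent sketch of the mapping above and would have to run before the pH column is deleted:
# alternative to the lambda/map approach shown above;
# note: this must run before `del red_df['pH']`
red_df['pH-hi-lo'] = np.where(red_df['pH'] >= M, 'hi', 'lo')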
Now we specify which dataset and variable we want to predict by assigning values to SELECTED_DF and TARGET_VAR, respectively. We like to keep a parameter file where we specify data sources and such. This lets us create generic analytics code that is easy to reuse.
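Such a parameter file can be as simple as a small Python module. The following is only a hypothetical sketch (the file name and variable names are illustrative, not part of the original notebook):
# params.py -- hypothetical parameter file for this analysis
RED_CSV = 'winequality-red.csv'
WHITE_CSV = 'winequality-white.csv'
CSV_SEP = ';'
TARGET_VAR = 'pH-hi-lo'
# the analysis code would then start with something like:
# from params import RED_CSV, WHITE_CSV, CSV_SEP, TARGET_VAR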
After we have specified which dataset we want to study, we split it into training and test datasets. We then scale (normalize) the data, which helps most classifiers perform better.
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn import metrics
# make selections here without digging in code
SELECTED_DF = red_df # selected dataset
TARGET_VAR = 'pH-hi-lo' # the predicted variable
# generate nameless data structures
df = SELECTED_DF
target = np.array(df[TARGET_VAR]).ravel()
del df[TARGET_VAR] # no cheating
#TODO: data cleansing function calls here
# split datasets for training and testing
X_train, X_test, y_train, y_test = train_test_split(df,target,test_size=0.2)
# set up the scaler
scaler = StandardScaler()
scaler.fit(X_train)
# apply the scaler
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
Now we pick a classifier. As you can see, there are many to try out, and scikit-learn's documentation lists even more, along with many examples and tutorials. Random Forests are data science workhorses and the go-to method for many data scientists. Be careful relying on them, though: they tend to overfit. We try to avoid overfitting by separating the training and test datasets.
Random Forest
# pick a classifier
from sklearn.tree import DecisionTreeClassifier,DecisionTreeRegressor,ExtraTreeClassifier,ExtraTreeRegressor
from sklearn.ensemble import RandomForestClassifier,ExtraTreesClassifier
clf = RandomForestClassifier()
Now we will test it out with the default parameters.
Note that this code is boilerplate. You can use it interchangeably for most scikit-learn models.
# test it out
model = clf.fit(X_train,y_train)
pred = clf.predict(X_test)
conf_matrix = metrics.confusion_matrix(y_test,pred)
var_score = clf.score(X_test,y_test)
# the results
importances = clf.feature_importances_
indices = np.argsort(importances)[::-1]
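Because these fit/predict/score calls are shared by most scikit-learn estimators, swapping in another model is a one-line change. As a quick sketch (reusing the training data from above and the ExtraTreesClassifier already imported), you could compare:
# same boilerplate, different ensemble model
clf2 = ExtraTreesClassifier()
clf2.fit(X_train, y_train)
print(clf2.score(X_test, y_test))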
Now we output the results. For Random Forests, we get a feature ranking. Relative importances usually decay exponentially, so the first few highly ranked features are usually the most important.
# for the sake of clarity
num_features = X_train.shape[1]
features = [df.columns[i] for i in indices]
feature_importances = [importances[i] for i in indices]
print('Feature ranking:\n')
for i in range(num_features):
    feature_name = features[i]
    feature_importance = feature_importances[i]
    print('%s%f' % (feature_name.ljust(30), feature_importance))
Feature ranking:
fixed acidity 0.269778
citric acid 0.171337
density 0.089660
volatile acidity 0.088965
chlorides 0.082945
alcohol 0.080437
total sulfur dioxide 0.067832
sulphates 0.047786
free sulfur dioxide 0.042727
residual sugar 0.037459
quality 0.021075
Sometimes it is easier to visualize the ranking. We will use a bar chart (see Figure 2).
plt.clf()
plt.bar(range(num_features),feature_importances)
plt.xticks(range(num_features),features,rotation=90)
plt.ylabel('relative importance (a.u.)')
plt.title('Relative importances of most predictive features')
plt.show()
Figure 2: Result
The same CSV files can also be loaded as Dask dataframes, which behave like pandas dataframes but evaluate lazily and can handle larger-than-memory data:
import dask.dataframe as dd
red_df = dd.read_csv('winequality-red.csv', sep=';', header=0)
white_df = dd.read_csv('winequality-white.csv', sep=';', header=0)
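As a minimal sketch, the summary statistics computed earlier with pandas can be reproduced on these lazy Dask dataframes by triggering the computation explicitly (assuming the same CSV files are present):
# Dask builds a task graph; .compute() executes it and returns a pandas object
print(white_df.describe().compute())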
Acknowledgement
This notebook was developed by Juliette Zerick and Gregor von Laszewski
6 - Parallel Computing in Python
Gregor von Laszewski (laszewski@gmail.com)
In this module, we will review the available Python modules that can be used for parallel computing. Parallel computing can take the form of either multi-threading or multi-processing. In the multi-threading approach, the threads run in the same shared memory heap, whereas in multi-processing the memory heaps of the processes are separate and independent; communication between processes is therefore a little more complex.
Multi-threading in Python
Threading in Python is well suited for I/O-bound operations where the process is expected to be idle regularly, e.g. web scraping. This is a very useful feature because many applications and scripts spend the majority of their runtime waiting for network or data I/O. In several cases, e.g. web scraping, the resources, i.e. downloads from different websites, are independent of each other most of the time. Therefore the processor can download them in parallel and join the results at the end.
Thread vs Threading
There are two built-in modules in Python that are related to threading, namely thread and threading. The former has been deprecated for some time in Python 2, and in Python 3 it is renamed to _thread to discourage its direct use. The _thread module provides a low-level threading API for multi-threading in Python, whereas the threading module builds a high-level threading interface on top of it.
Thread() is the main class of the threading module; its two important arguments are target, for specifying the callable object, and args, to pass the arguments for the target callable. We illustrate these in the following example:
import threading
def hello_thread(thread_num):
    print("Hello from Thread ", thread_num)
if __name__ == '__main__':
    for thread_num in range(5):
        t = threading.Thread(target=hello_thread, args=(thread_num,))
        t.start()
This is the output of the previous example:
In [1]: %run threading.py
Hello from Thread 0
Hello from Thread 1
Hello from Thread 2
Hello from Thread 3
Hello from Thread 4
In case you are not familiar with the if __name__ == '__main__': statement, it makes sure that the code nested under this condition is run only when you run your module as a program; it will not run if your module is imported into another file.
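As a minimal illustration (the worker function here is hypothetical, not part of the earlier example), the following script runs its threads only when executed directly and uses join() so the main program waits for all threads to finish:
import threading
def worker(n):
    print("Hello from worker", n)
if __name__ == '__main__':
    # this block is skipped when the module is imported by another file
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for every thread to finish before exiting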
Locks
As mentioned earlier, the memory space is shared between the threads. This is at the same time beneficial and problematic: it is beneficial in the sense that communication between the threads becomes easy; however, you might experience strange outcomes if you let several threads change the same variable without caution, e.g. thread 2 changes variable x while thread 1 is working with it. This is where a lock comes into play. Using a lock, you can allow only one thread to work with a variable at a time. In other words, only a single thread can hold the lock. If the other threads need to work with that variable, they have to wait until the first thread is done and the variable is "unlocked."
We illustrate this with a simple example:
import threading
global counter
counter = 0
def incrementer1():
global counter
for j in range(2):
for i in range(3):
counter += 1
print("Greeter 1 incremented the counter by 1")
print ("Counter is %d"%counter)
def incrementer2():
global counter
for j in range(2):
for i in range(3):
counter += 1
print("Greeter 2 incremented the counter by 1")
print ("Counter is now %d"%counter)
if __name__ == '__main__':
t1 = threading.Thread(target = incrementer1)
t2 = threading.Thread(target = incrementer2)
t1.start()
t2.start()
Suppose we want to print the multiples of 3 between 1 and 12, i.e. 3, 6, 9, and 12. For the sake of argument, we try to do this using two threads and a nested for loop. We create a global variable called counter and initialize it with 0. Whenever either of the incrementer1 or incrementer2 functions is called, the counter is incremented by 3 twice (the counter is incremented by 6 in each function call). If you run the previous code, you would have to be really lucky to get the following as your output:
Counter is now 3
Counter is now 6
Counter is now 9
Counter is now 12
The reason is the race condition between the threads while they increment the counter in the nested for loop. As you probably noticed, each iteration of the outer for loop is supposed to add 3 to the counter, and the conflict does not show up at that level but inside the nested for loop, where the two threads interleave their updates. Accordingly, the output of the previous code is different in every run. This is an example output:
$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 4
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 8
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 10
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12
We can fix this issue using a lock: whenever one of the functions is about to increment the value by 3, it will acquire() the lock, and when it is done it will release() the lock. This mechanism is illustrated in the following code:
import threading
increment_by_3_lock = threading.Lock()
global counter
counter = 0
def incrementer1():
global counter
for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter += 1
print("Greeter 1 incremented the counter by 1")
print ("Counter is %d"%counter)
increment_by_3_lock.release()
def incrementer2():
global counter
for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter += 1
print("Greeter 2 incremented the counter by 1")
print ("Counter is %d"%counter)
increment_by_3_lock.release()
if __name__ == '__main__':
t1 = threading.Thread(target = incrementer1)
t2 = threading.Thread(target = incrementer2)
t1.start()
t2.start()
No matter how many times you run this code, the output will always be in the correct order:
$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 3
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 6
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 9
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12
Using the threading module increases both the overhead associated with thread management and the complexity of the program, which is why in many situations the multiprocessing module might be a better approach.
Multi-processing in Python
We already mentioned that multi-threading might not be sufficient in many applications and that we might need to use multiprocessing sometimes, or rather most of the time. That is why we dedicate this subsection to this particular module. This module provides an API for spawning processes the way you spawn threads with the threading module. Moreover, some functionality is not even available in the threading module, e.g. the Pool class, which allows you to run a batch of jobs using a pool of worker processes.
Process
Similar to the threading module, which employs thread (aka _thread) under the hood, multiprocessing employs the Process class. Consider the following example:
from multiprocessing import Process
import os
def greeter (name):
proc_idx = os.getpid()
print ("Process {0}: Hello {1}!".format(proc_idx,name))
if __name__ == '__main__':
name_list = ['Harry', 'George', 'Dirk', 'David']
process_list = []
for name_idx, name in enumerate(name_list):
current_process = Process(target=greeter, args=(name,))
process_list.append(current_process)
current_process.start()
for process in process_list:
process.join()
In this example, after importing the Process class we create a greeter() function that takes a name and greets that person. It also prints the pid (process identifier) of the process that is running it. Note that we used the os module to get the pid. At the bottom of the code, after checking the __name__ == '__main__' condition, we create a series of Processes and start them. Finally, in the last for loop, we use the join method to tell Python to wait for the processes to terminate. This is one of the possible outputs of the code:
$ python3 process_example.py
Process 23451: Hello Harry!
Process 23452: Hello George!
Process 23453: Hello Dirk!
Process 23454: Hello David!
Pool
Consider the Pool class as a pool of worker processes. There are several ways of assigning jobs to the Pool class, and we will introduce the most important ones in this section. These methods are categorized as blocking or non-blocking. Blocking means that after calling the API, the call blocks the thread/process until the result is ready, and control returns only when the call completes. In the non-blocking case, on the other hand, control returns immediately.
Synchronous Pool.map()
We illustrate the Pool.map
method by re-implementing our previous
greeter example using Pool.map
:
from multiprocessing import Pool
import os
def greeter(name):
pid = os.getpid()
print("Process {0}: Hello {1}!".format(pid,name))
if __name__ == '__main__':
names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
pool = Pool(processes=3)
sync_map = pool.map(greeter,names)
print("Done!")
As you can see, we have seven names here, but we do not want to dedicate a separate process to each greeting. Instead, we do the whole job of "greeting seven people" using three processes. We create a pool of 3 processes with the Pool(processes=3) syntax and then we map an iterable called names to the greeter function using pool.map(greeter, names). As we expected, the greetings in the output are printed from three different processes:
$ python poolmap_example.py
Process 30585: Hello Jenna!
Process 30586: Hello David!
Process 30587: Hello Marry!
Process 30585: Hello Ted!
Process 30585: Hello Jerry!
Process 30587: Hello Tom!
Process 30585: Hello Justin!
Done!
Note that Pool.map() is in the blocking category and does not return control to your script until it has finished calculating the results. That is why Done! is printed after all of the greetings are over.
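A common idiom, not used in the example above, is to manage the pool with a with statement so that it is closed and its workers are released automatically; a minimal sketch:
from multiprocessing import Pool
def square(x):
    return x * x
if __name__ == '__main__':
    # the pool is closed automatically when the with block ends
    with Pool(processes=3) as pool:
        results = pool.map(square, range(10))
    print(results)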
Asynchronous Pool.map_async()
As the name implies, you can use the map_async method when you want to assign many function calls to a pool of worker processes asynchronously. Note that, unlike map, the order of the results is not guaranteed and control returns immediately. We now implement the previous example using map_async:
from multiprocessing import Pool
import os
def greeter(name):
pid = os.getpid()
print("Process {0}: Hello {1}!".format(pid,name))
if __name__ == '__main__':
names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
pool = Pool(processes=3)
async_map = pool.map_async(greeter,names)
print("Done!")
async_map.wait()
As you probably noticed, the only difference (apart from the map_async method name) is calling the wait() method in the last line. The wait() method tells your script to wait for the result of map_async before terminating:
$ python poolmap_example.py
Done!
Process 30740: Hello Jenna!
Process 30741: Hello David!
Process 30740: Hello Ted!
Process 30742: Hello Marry!
Process 30740: Hello Jerry!
Process 30741: Hello Tom!
Process 30742: Hello Justin!
Note that the order of the results is not preserved. Moreover, Done! is printed before any of the results, meaning that if we did not use the wait() method, we probably would not see the results at all.
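If you also need the return values of the calls, the object returned by map_async provides a get() method that blocks until all results are ready and then returns them; a minimal sketch with a function that actually returns something:
from multiprocessing import Pool
def square(x):
    return x * x
if __name__ == '__main__':
    with Pool(processes=3) as pool:
        async_result = pool.map_async(square, range(8))
        # get() waits for completion and returns the list of results
        print(async_result.get())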
Locks
The way the multiprocessing module implements locks is almost identical to the way the threading module does. After importing Lock from multiprocessing, all you need to do is acquire it, do some computation, and then release the lock. We will clarify the use of Lock with an example in the next section about process communication.
Process Communication
Process communication in multiprocessing is one of the most important, yet complicated, features of this module. As opposed to threading, Process objects do not have access to any shared variables by default, i.e. there is no shared memory space between the processes by default. This effect is illustrated in the following example:
from multiprocessing import Process, Lock, Value
import time
global counter
counter = 0
def incrementer1():
global counter
for j in range(2):
for i in range(3):
counter += 1
print ("Greeter1: Counter is %d"%counter)
def incrementer2():
global counter
for j in range(2):
for i in range(3):
counter += 1
print ("Greeter2: Counter is %d"%counter)
if __name__ == '__main__':
t1 = Process(target = incrementer1 )
t2 = Process(target = incrementer2 )
t1.start()
t2.start()
You probably already noticed that this is almost identical to our example in the threading section. Now, take a look at the strange output:
$ python communication_example.py
Greeter1: Counter is 3
Greeter1: Counter is 6
Greeter2: Counter is 3
Greeter2: Counter is 6
As you can see, it is as if the processes do not see each other. Instead of having two processes, one counting to 6 and the other counting from 6 to 12, we have two processes that each count to 6.
Nevertheless, there are several ways that Processes from multiprocessing can communicate with each other, including Pipe, Queue, Value, Array and Manager. Pipe and Queue are appropriate for inter-process message passing. To be more specific, Pipe is useful for process-to-process scenarios while Queue is more appropriate for processes-to-processes ones. Value and Array are both used to provide synchronized access to shared data (very much like shared memory), and Managers can be used for different data types. In the following subsections, we cover both Value and Array since they are both lightweight, yet useful, approaches.
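Although we do not cover Queue further in this section, a minimal sketch shows the flavor of message passing between processes:
from multiprocessing import Process, Queue
def worker(q):
    # send a message back to the parent process
    q.put('hello from the child process')
if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # blocks until the child puts a message on the queue
    p.join()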
Value
The following example re-implements the broken example from the previous section. We fix the strange output by using both Lock and Value:
from multiprocessing import Process, Lock, Value
import time
increment_by_3_lock = Lock()
def incrementer1(counter):
    for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter.value += 1
time.sleep(0.1)
print ("Greeter1: Counter is %d"%counter.value)
increment_by_3_lock.release()
def incrementer2(counter):
    for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter.value += 1
time.sleep(0.05)
print ("Greeter2: Counter is %d"%counter.value)
increment_by_3_lock.release()
if __name__ == '__main__':
counter = Value('i',0)
t1 = Process(target = incrementer1, args=(counter,))
t2 = Process(target = incrementer2 , args=(counter,))
t2.start()
t1.start()
The usage of the Lock object in this example is identical to the example in the threading section. The usage of counter is, on the other hand, the novel part. First, note that counter is not a global variable anymore; instead it is a Value, which returns a ctypes object allocated from shared memory between the processes. The first argument 'i' indicates a signed integer, and the second argument defines the initialization value. In this case we are assigning a signed integer in shared memory, initialized to 0, to the counter variable. We then modified our two functions to take this shared variable as an argument. Finally, we changed the way we increment the counter, since the counter is not a Python integer anymore but a ctypes signed integer whose value we access using the value attribute. The output of the code is now as we expected:
$ python mp_lock_example.py
Greeter2: Counter is 3
Greeter2: Counter is 6
Greeter1: Counter is 9
Greeter1: Counter is 12
The last example related to parallel processing illustrates the use of both Value and Array, as well as a technique for passing multiple arguments to a function by packing them into a single tuple. Strictly speaking, Process accepts multiple arguments through its args parameter, but Pool.map and map_async call the target function with a single argument, so packing several values into one object is a common way to pass multiple arguments there; we use the same packing technique here for illustration:
from multiprocessing import Process, Lock, Value, Array
import time
from ctypes import c_char_p
increment_by_3_lock = Lock()
def incrementer1(counter_and_names):
counter= counter_and_names[0]
names = counter_and_names[1]
for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter.value += 1
time.sleep(0.1)
name_idx = counter.value//3 -1
print ("Greeter1: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
increment_by_3_lock.release()
def incrementer2(counter_and_names):
counter= counter_and_names[0]
names = counter_and_names[1]
for j in range(2):
increment_by_3_lock.acquire(True)
for i in range(3):
counter.value += 1
time.sleep(0.05)
name_idx = counter.value//3 -1
print ("Greeter2: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
increment_by_3_lock.release()
if __name__ == '__main__':
counter = Value('i',0)
names = Array (c_char_p,4)
names.value = ['James','Tom','Sam', 'Larry']
t1 = Process(target = incrementer1, args=((counter,names),))
t2 = Process(target = incrementer2 , args=((counter,names),))
t2.start()
t1.start()
In this example, we created a multiprocessing.Array() object and assigned it to a variable called names. As we mentioned before, the first argument is the ctypes data type, and since we want to create an array of strings with a length of 4 (the second argument), we imported c_char_p and passed it as the first argument.
Instead of passing the arguments separately, we merged both the Value and Array objects into a tuple and passed the tuple to the functions. We then modified the functions to unpack the objects in their first two lines. Finally, we changed the print statement so that each process greets a particular name. The output of the example is:
$ python3 mp_lock_example.py
Greeter2: Greeting James! Counter is 3
Greeter2: Greeting Tom! Counter is 6
Greeter1: Greeting Sam! Counter is 9
Greeter1: Greeting Larry! Counter is 12
7 - Dask
Gregor von Laszewski (laszewski@gmail.com)
Dask is a Python-based parallel computing library for analytics. Parallel computing is a type of computation in which many calculations or processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved concurrently.
Dask is composed of two components:
- Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
- Big Data collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.
Dask emphasizes the following virtues:
- Familiar: Provides parallelized NumPy array and Pandas DataFrame objects.
- Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
- Native: Enables distributed computing in Pure Python with access to the PyData stack.
- Fast: Operates with low overhead, low latency, and the minimal serialization necessary for fast numerical algorithms.
- Scales up: Runs resiliently on clusters with thousands of cores.
- Scales down: Trivial to set up and run on a laptop in a single process.
- Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans.
The section is structured in a number of subsections addressing the following topics:
- Foundations: an explanation of what Dask is, how it works, and how to use lower-level primitives to set up computations. Casual users may wish to skip this section, although we consider it useful knowledge for all users.
- Distributed Features: information on running Dask on the distributed scheduler, which enables scale-up to distributed settings and enhanced monitoring of task operations. The distributed scheduler is now generally the recommended engine for executing task work, even on single workstations or laptops.
- Collections: convenient abstractions giving a familiar feel to big data.
- Bags: Python iterators with a functional paradigm, such as found in func/iter-tools and toolz; they generalize lists/generators to big data and will seem very familiar to users of PySpark's RDD.
- Array: massive multi-dimensional numerical data, with NumPy functionality.
- Dataframe: massive tabular data, with Pandas functionality.
How Dask Works
Dask is a computation tool for larger-than-memory datasets, parallel execution or delayed/background execution.
We can summarize the basics of Dask as follows:
- process data that does not fit into memory by breaking it into blocks and specifying task chains
- parallelize execution of tasks across cores and even nodes of a cluster
- move computation to the data rather than the other way around, to minimize communication overheads
We use for-loops to build basic tasks, Python iterators, and the Numpy (array) and Pandas (dataframe) functions for multi-dimensional or tabular data, respectively.
Dask allows us to construct a prescription for the calculation we want to carry out. A module named dask.delayed lets us parallelize custom code. It is useful whenever our problem doesn't quite fit a high-level parallel object like dask.array or dask.dataframe but could still benefit from parallelism. dask.delayed works by delaying our function evaluations and putting them into a dask graph. Here is a small example:
from dask import delayed
@delayed
def inc(x):
return x + 1
@delayed
def add(x, y):
return x + y
Here we have used the delayed annotation to show that we want these functions to operate lazily - to save the set of inputs and execute only on demand.
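For instance, calling these decorated functions only builds a small task graph; compute() executes it (a minimal sketch):
# nothing is executed yet; x, y and z are lazy Delayed objects
x = inc(1)
y = inc(2)
z = add(x, y)
# executing the graph returns 5
print(z.compute())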
Dask Bag
Dask-bag excels in processing data that can be represented as a sequence of arbitrary inputs. We’ll refer to this as “messy” data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The functional programming style fits very nicely with standard Python iteration, such as can be found in the itertools module.
Messy data is often encountered at the beginning of data processing pipelines when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes. For this reason, the initial data massaging and processing is often done with Python lists, dicts, and sets.
These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators/generator expressions or libraries like itertools or toolz let us process large volumes in a small space. If we combine this with parallel processing then we can churn through a fair amount of data.
dask.bag is a high-level Dask collection that automates common workloads of this form. In a nutshell:
dask.bag = map, filter, toolz + parallel execution
You can create a Bag from a Python sequence, from files, from data on S3, etc.
# each element is an integer
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# each element is a text file of JSON lines
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))
# Requires `s3fs` library
# each element is a remote CSV text file
b = db.read_text('s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv')
Bag objects hold the standard functional API found in projects like the Python standard library, toolz, or pyspark, including map, filter, groupby, etc.
As with Array and DataFrame objects, operations on Bag objects create new bags. Call the .compute() method to trigger execution.
def is_even(n):
return n % 2 == 0
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c
# blocking form: wait for completion (which is very fast in this case)
c.compute()
For more details on Dask Bag check https://dask.pydata.org/en/latest/bag.html
Concurrency Features
Dask supports a real-time task framework that extends Python’s concurrent.futures interface. This interface is good for arbitrary task scheduling, like dask.delayed, but is immediate rather than lazy, which provides some more flexibility in situations where the computations may evolve. These features depend on the second-generation task scheduler found in dask.distributed (which, despite its name, runs very well on a single machine).
Dask allows us to simply construct graphs of tasks with dependencies. Such graphs can also be created automatically for us using functional, NumPy, or Pandas syntax on data collections. None of this would be very useful if there were not also a way to execute these graphs in a parallel and memory-aware way. Dask comes with four available schedulers:
- dask.threaded.get: a scheduler backed by a thread pool
- dask.multiprocessing.get: a scheduler backed by a process pool
- dask.async.get_sync: a synchronous scheduler, good for debugging
- distributed.Client.get: a distributed scheduler for executing graphs on multiple machines
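With reasonably recent Dask versions you can also select a scheduler by name when triggering a computation; the following is a small sketch rather than a definitive recipe:
import dask.bag as db
b = db.from_sequence(range(10))
squares = b.map(lambda x: x ** 2)
# 'threads', 'processes', and 'synchronous' select the thread-pool,
# process-pool, and single-threaded schedulers, respectively
print(squares.compute(scheduler='synchronous'))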
Here is a simple sketch that uses the dask.distributed library; load and summarize stand for user-defined functions, filenames is a list of input files, and 'scheduler:port' is the address of a running scheduler:
from dask.distributed import Client
client = Client('scheduler:port')
futures = []
for fn in filenames:
future = client.submit(load, fn)
futures.append(future)
summary = client.submit(summarize, futures)
summary.result()
For more details on Concurrent Features by Dask check https://dask.pydata.org/en/latest/futures.html
Dask Array
Dask arrays implement a subset of the NumPy interface on large arrays using blocked algorithms and task scheduling. These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading but you can also use multiprocessing or distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.
import h5py
import dask.array as da
f = h5py.File('myfile.hdf5')
x = da.from_array(f['/big-data'], chunks=(1000, 1000))
# subtract the row means and execute the whole (lazy) expression
result = (x - x.mean(axis=1, keepdims=True)).compute()
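If you do not have an HDF5 file at hand, a self-contained sketch with a random array illustrates the same idea of chunked, lazy computation:
import dask.array as da
# a 10000 x 10000 random array, split into 1000 x 1000 chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# build a lazy expression and execute it explicitly
result = (x - x.mean(axis=0)).std()
print(result.compute())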
For more details on Dask Array check https://dask.pydata.org/en/latest/array.html
Dask DataFrame
A Dask DataFrame is a large parallel dataframe composed of many smaller Pandas dataframes, split along the index. These pandas dataframes may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. Dask.dataframe implements a commonly used subset of the Pandas interface, including elementwise operations, reductions, grouping operations, joins, timeseries algorithms, and more. It copies the Pandas interface for these operations exactly and so should be very familiar to Pandas users. Because Dask.dataframe operations merely coordinate Pandas operations, they usually exhibit performance characteristics similar to Pandas. To run the following code, save the 'student.csv' file on your machine.
import pandas as pd
df = pd.read_csv('student.csv')
d = df.groupby(df.HID).Serial_No.mean()
print(d)
ID
101 1
102 2
104 3
105 4
106 5
107 6
109 7
111 8
201 9
202 10
Name: Serial_No, dtype: int64
import dask.dataframe as dd
df = dd.read_csv('student.csv')
dt = df.groupby(df.HID).Serial_No.mean().compute()
print (dt)
ID
101 1.0
102 2.0
104 3.0
105 4.0
106 5.0
107 6.0
109 7.0
111 8.0
201 9.0
202 10.0
Name: Serial_No, dtype: float64
For more details on Dask DataFrame check https://dask.pydata.org/en/latest/dataframe.html
Dask DataFrame Storage
Efficient storage can dramatically improve performance, particularly when operating repeatedly from disk.
Decompressing text and parsing CSV files is expensive. One of the most effective strategies with medium data is to use a binary storage format like HDF5.
# be sure to shut down other kernels running distributed clients
from dask.distributed import Client
client = Client()
Create data if we don’t have any
from prep import accounts_csvs
accounts_csvs(3, 1000000, 500)
First we read our csv data as before.
CSV and other text-based file formats are the most common storage for data from many sources, because they require minimal pre-processing, can be written line-by-line, and are human-readable. Since Pandas' read_csv is well-optimized, CSVs are a reasonable input, but they are far from optimal, since reading requires extensive text parsing.
import os
filename = os.path.join('data', 'accounts.*.csv')
filename
import dask.dataframe as dd
df_csv = dd.read_csv(filename)
df_csv.head()
HDF5 and netCDF are binary array formats very commonly used in the scientific realm.
Pandas contains a specialized HDF5 format, HDFStore. The dd.DataFrame.to_hdf method works exactly like the pd.DataFrame.to_hdf method.
target = os.path.join('data', 'accounts.h5')
target
%time df_csv.to_hdf(target, '/data')
df_hdf = dd.read_hdf(target, '/data')
df_hdf.head()
For more information on Dask DataFrame Storage, click http://dask.pydata.org/en/latest/dataframe-create.html
Links
- https://dask.pydata.org/en/latest/
- http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1
- http://people.duke.edu/~ccc14/sta-663-2017/18A_Dask.html
- https://www.kdnuggets.com/2016/09/introducing-dask-parallel-programming.html
- https://pypi.python.org/pypi/dask/
- https://www.hdfgroup.org/2015/03/hdf5-as-a-zero-configuration-ad-hoc-scientific-database-for-python/
- https://github.com/dask/dask-tutorial