1 - Word Count with Parallel Python

Gregor von Laszewski (laszewski@gmail.com)

We will demonstrate Python’s multiprocessing API for parallel computation by writing a program that counts how many times each word in a collection of documents appear.

Generating a Document Collection

Before we begin, let us write a script that will generate document collections by specifying the number of documents and the number of words per document. This will make benchmarking straightforward.

To keep it simple, the vocabulary of the document collection will consist of random numbers rather than the words of an actual language:

'''Usage: generate_nums.py [-h] NUM_LISTS INTS_PER_LIST MIN_INT MAX_INT DEST_DIR

Generate random lists of integers and save them
as 1.txt, 2.txt, etc.

Arguments:
   NUM_LISTS      The number of lists to create.
   INTS_PER_LIST  The number of integers in each list.
   MIN_NUM        Each generated integer will be >= MIN_NUM.
   MAX_NUM        Each generated integer will be <= MAX_NUM.
   DEST_DIR       A directory where the generated numbers will be stored.

Options:
  -h --help
'''

import os, random, logging
from docopt import docopt


def generate_random_lists(num_lists,
                          ints_per_list, min_int, max_int):
    return [[random.randint(min_int, max_int) \
        for i in range(ints_per_list)] for i in range(num_lists)]


if __name__ == '__main__':
   args = docopt(__doc__)
   num_lists, ints_per_list, min_int, max_int, dest_dir = [
      int(args['NUM_LISTS']),
      int(args['INTS_PER_LIST']),
      int(args['MIN_INT']),
      int(args['MAX_INT']),
      args['DEST_DIR']
   ]

   if not os.path.exists(dest_dir):
      os.makedirs(dest_dir)

   lists = generate_random_lists(num_lists,
                                 ints_per_list,
                                 min_int,
                                 max_int)
   curr_list = 1
   for lst in lists:
      with open(os.path.join(dest_dir, '%d.txt' % curr_list), 'w') as f:
     f.write(os.linesep.join(map(str, lst)))
  curr_list += 1
   logging.debug('Numbers written.')

Notice that we are using the docopt module that you should be familiar with from the Section [Python DocOpts](#s-python-docopts} to make the script easy to run from the command line.

You can generate a document collection with this script as follows:

python generate_nums.py 1000 10000 0 100 docs-1000-10000

Serial Implementation

A first serial implementation of wordcount is straightforward:

'''Usage: wordcount.py [-h] DATA_DIR

Read a collection of .txt documents and count how many times each word
appears in the collection.

Arguments:
  DATA_DIR  A directory with documents (.txt files).

Options:
  -h --help
'''

import os, glob, logging
from docopt import docopt

logging.basicConfig(level=logging.DEBUG)


def wordcount(files):
   counts = {}
   for filepath in files:
      with open(filepath, 'r') as f:
     words = [word.strip() for word in f.read().split()]
     for word in words:
        if word not in counts:
           counts[word] = 0
        counts[word] += 1
   return counts


if __name__ == '__main__':
   args = docopt(__doc__)
   if not os.path.exists(args['DATA_DIR']):
      raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])

   counts = wordcount(glob.glob(os.path.join(args['DATA_DIR'], '*.txt')))
   logging.debug(counts)

Serial Implementation Using map and reduce

We can improve the serial implementation in anticipation of parallelizing the program by making use of Python’s map and reduce functions.

In short, you can use map to apply the same function to the members of a collection. For example, to convert a list of numbers to strings, you could do:

import random
nums = [random.randint(1, 2) for _ in range(10)]
print(nums)
[2, 1, 1, 1, 2, 2, 2, 2, 2, 2]
print(map(str, nums))
['2', '1', '1', '1', '2', '2', '2', '2', '2', '2']

We can use reduce to apply the same function cumulatively to the items of a sequence. For example, to find the total of the numbers in our list, we could use reduce as follows:

def add(x, y):
    return x + y

print(reduce(add, nums))
17

We can simplify this even more by using a lambda function:

print(reduce(lambda x, y: x + y, nums))
17

You can read more about Python’s lambda function in the docs.

With this in mind, we can reimplement the wordcount example as follows:

'''Usage: wordcount_mapreduce.py [-h] DATA_DIR

Read a collection of .txt documents and count how
many times each word
appears in the collection.

Arguments:
   DATA_DIR  A directory with documents (.txt files).

Options:
   -h --help
'''

import os, glob, logging
from docopt import docopt

logging.basicConfig(level=logging.DEBUG)

def count_words(filepath):
   counts = {}
   with open(filepath, 'r') as f:
      words = [word.strip() for word in f.read().split()]

  for word in words:
     if word not in counts:
        counts[word] = 0
     counts[word] += 1
  return counts


def merge_counts(counts1, counts2):
   for word, count in counts2.items():
      if word not in counts1:
     counts1[word] = 0
  counts1[word] += counts2[word]
   return counts1


if __name__ == '__main__':
   args = docopt(__doc__)
   if not os.path.exists(args['DATA_DIR']):
      raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])

      per_doc_counts = map(count_words,
                           glob.glob(os.path.join(args['DATA_DIR'],
                           '*.txt')))
   counts = reduce(merge_counts, [{}] + per_doc_counts)
   logging.debug(counts)

Parallel Implementation

Drawing on the previous implementation using map and reduce, we can parallelize the implementation using Python’s multiprocessing API:

'''Usage: wordcount_mapreduce_parallel.py [-h] DATA_DIR NUM_PROCESSES

Read a collection of .txt documents and count, in parallel, how many
times each word appears in the collection.

Arguments:
   DATA_DIR       A directory with documents (.txt files).
   NUM_PROCESSES  The number of parallel processes to use.

Options:
   -h --help
'''

import os, glob, logging
from docopt import docopt
from wordcount_mapreduce import count_words, merge_counts
from multiprocessing import Pool

logging.basicConfig(level=logging.DEBUG)

if __name__ == '__main__':
   args = docopt(__doc__)
   if not os.path.exists(args['DATA_DIR']):
      raise ValueError('Invalid data directory: %s' % args['DATA_DIR'])
   num_processes = int(args['NUM_PROCESSES'])

   pool = Pool(processes=num_processes)

   per_doc_counts = pool.map(count_words,
                             glob.glob(os.path.join(args['DATA_DIR'],
                             '*.txt')))
   counts = reduce(merge_counts, [{}] + per_doc_counts)
   logging.debug(counts)

Benchmarking

To time each of the examples, enter it into its own Python file and use Linux’s time command:

$ time python wordcount.py docs-1000-10000

The output contains the real run time and the user run time. real is wall clock time - time from start to finish of the call. user is the amount of CPU time spent in user-mode code (outside the kernel) within the process, that is, only actual CPU time used in executing the process.

Excersises

E.python.wordcount.1:

Run the three different programs (serial, serial w/ map and reduce, parallel) and answer the following questions:

  1. Is there any performance difference between the different versions of the program?
  2. Does user time significantly differ from real time for any of the versions of the program?
  3. Experiment with different numbers of processes for the parallel example, starting with 1. What is the performance gain when you goal from 1 to 2 processes? From 2 to 3? When do you stop seeing improvement? (this will depend on your machine architecture)

References

2 - NumPy

Gregor von Laszewski (laszewski@gmail.com)

NumPy is a popular library that is used by many other Python packages such as Pandas, SciPy, and scikit-learn. It provides a fast, simple-to-use way of interacting with numerical data organized in vectors and matrices. In this section, we will provide a short introduction to NumPy.

Installing NumPy

The most common way of installing NumPy, if it wasn’t included with your Python installation, is to install it via pip:

$ pip install numpy

If NumPy has already been installed, you can update to the most recent version using:

$ pip install -U numpy

You can verify that NumPy is installed by trying to use it in a Python program:

import numpy as np

Note that, by convention, we import NumPy using the alias ‘np’ - whenever you see ‘np’ sprinkled in example Python code, it’s a good bet that it is using NumPy.

NumPy Basics

At its core, NumPy is a container for n-dimensional data. Typically, 1-dimensional data is called an array and 2-dimensional data is called a matrix. Beyond 2-dimensions would be considered a multidimensional array. Examples, where you’ll encounter these dimensions, may include:

  • 1 Dimensional: time-series data such as audio, stock prices, or a single observation in a dataset.
  • 2 Dimensional: connectivity data between network nodes, user-product recommendations, and database tables.
  • 3+ Dimensional: network latency between nodes over time, video (RGB+time), and version-controlled datasets.

All of these data can be placed into NumPy’s array object, just with varying dimensions.

Data Types: The Basic Building Blocks

Before we delve into arrays and matrices, we will start with the most basic element of those: a single value. NumPy can represent data utilizing many different standard datatypes such as uint8 (an 8-bit usigned integer), float64 (a 64-bit float), or str (a string). An exhaustive listing can be found at:

Before moving on, it is important to know about the tradeoff made when using different datatypes. For example, a uint8 can only contain values between 0 and 255. This, however, contrasts with float64 which can express any value from +/- 1.80e+308. So why wouldn’t we just always use float64s? Though they allow us to be more expressive in terms of numbers, they also consume more memory. If we were working with a 12-megapixel image, for example, storing that image using uint8 values would require 3000 * 4000 * 8 = 96 million bits, or 91.55 MB of memory. If we were to store the same image utilizing float64, our image would consume 8 times as much memory: 768 million bits or 732.42 MB. It is important to use the right data type for the job to avoid consuming unnecessary resources or slowing down processing.

Finally, while NumPy will conveniently convert between datatypes, one must be aware of overflows when using smaller data types. For example:

a = np.array([6], dtype=np.uint8)
print(a)
>>>[6]
a = a + np.array([7], dtype=np.uint8)
print(a)
>>>[13]
a = a + np.array([245], dtype=np.uint8)
print(a)
>>>[2]

In this example, it makes sense that 6+7=13. But how does 13+245=2? Put simply, the object type (uint8) simply ran out of space to store the value and wrapped back around to the beginning. An 8-bit number is only capable of storing 2^8, or 256, unique values. An operation that results in a value above that range will ‘overflow’ and cause the value to wrap back around to zero. Likewise, anything below that range will ‘underflow’ and wrap back around to the end. In our example, 13+245 became 258, which was too large to store in 8 bits and wrapped back around to 0 and ended up at 2.

NumPy will, generally, try to avoid this situation by dynamically retyping to whatever datatype will support the result:

a = a + 260
print(test)
>>>[262]

Here, our addition caused our array, ‘a,’ to be upscaled to use uint16 instead of uint8. Finally, NumPy offers convenience functions akin to Python’s range() function to create arrays of sequential numbers:

X = np.arange(0.2,1,.1)
print(X)
>>>array([0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9], dtype=float32)

We can use this function to also generate parameters spaces that can be iterated on:

P = 10.0 ** np.arange(-7,1,1)
print(P)

for x,p in zip(X,P):
    print('%f, %f' % (x, p))

Arrays: Stringing Things Together

With our knowledge of datatypes in hand, we can begin to explore arrays. Simply put, arrays can be thought of as a sequence of values (not neccesarily numbers). Arrays are 1 dimensional and can be created and accessed simply:

a = np.array([1, 2, 3])
print(type(a))
>>><class 'numpy.ndarray'>
print(a)
>>>[1 2 3]
print(a.shape)
>>>(3,)
a[0]
>>>1

Arrays (and, later, matrices) are zero-indexed. This makes it convenient when, for example, using Python’s range() function to iterate through an array:

for i in range(3):
    print(a[i])
>>>1
>>>2
>>>3

Arrays are, also, mutable and can be changed easily:

a[0] = 42
print(a)
>>>array([42, 2, 3])

NumPy also includes incredibly powerful broadcasting features. This makes it very simple to perform mathematical operations on arrays that also makes intuitive sense:

a * 3
>>>array([3, 6, 9])
a**2
>>>array([1, 4, 9], dtype=int32)

Arrays can also interact with other arrays:

b = np.array([2, 3, 4])
print(a * b)
>>>array([ 2,  6, 12])

In this example, the result of multiplying together two arrays is to take the element-wise product while multiplying by a constant will multiply each element in the array by that constant. NumPy supports all of the basic mathematical operations: addition, subtraction, multiplication, division, and powers. It also includes an extensive suite of mathematical functions, such as log() and max(), which are covered later.

Matrices: An Array of Arrays

Matrices can be thought of as an extension of arrays - rather than having one dimension, matrices have 2 (or more). Much like arrays, matrices can be created easily within NumPy:

m = np.array([[1, 2], [3, 4]])
print(m)
>>>[[1 2]
>>> [3 4]]

Accessing individual elements is similar to how we did it for arrays. We simply need to pass in a number of arguments equal to the number of dimensions:

m[1][0]
>>>3

In this example, our first index selected the row and the second selected the column - giving us our result of 3. Matrices can be extending out to any number of dimensions by simply using more indices to access specific elements (though use-cases beyond 4 may be somewhat rare).

Matrices support all of the normal mathematial functions such as +, -, *, and /. A special note: the * operator will result in an element-wise multiplication. Using @ or np.matmul() for matrix multiplication:

print(m-m)
print(m*m)
print(m/m)

More complex mathematical functions can typically be found within the NumPy library itself:

print(np.sin(x))
print(np.sum(x))

A full listing can be found at: https://docs.scipy.org/doc/numpy/reference/routines.math.html

Slicing Arrays and Matrices

As one can imagine, accessing elements one-at-a-time is both slow and can potentially require many lines of code to iterate over every dimension in the matrix. Thankfully, NumPy incorporate a very powerful slicing engine that allows us to access ranges of elements easily:

m[1, :]
>>>array([3, 4])

The ‘:’ value tells NumPy to select all elements in the given dimension. Here, we’ve requested all elements in the first row. We can also use indexing to request elements within a given range:

a = np.arange(0, 10, 1)
print(a)
>>>[0 1 2 3 4 5 6 7 8 9]
a[4:8]
>>>array([4, 5, 6, 7])

Here, we asked NumPy to give us elements 4 through 7 (ranges in Python are inclusive at the start and non-inclusive at the end). We can even go backwards:

a[-5:]
>>>array([5, 6, 7, 8, 9])

In the previous example, the negative value is asking NumPy to return the last 5 elements of the array. Had the argument been ‘:-5,’ NumPy would’ve returned everything BUT the last five elements:

a[:-5]
>>>array([0, 1, 2, 3, 4])

Becoming more familiar with NumPy’s accessor conventions will allow you write more efficient, clearer code as it is easier to read a simple one-line accessor than it is a multi-line, nested loop when extracting values from an array or matrix.

Useful Functions

The NumPy library provides several convenient mathematical functions that users can use. These functions provide several advantages to code written by users:

  • They are open source typically have multiple contributors checking for errors.
  • Many of them utilize a C interface and will run much faster than native Python code.
  • They’re written to very flexible.

NumPy arrays and matrices contain many useful aggregating functions such as max(), min(), mean(), etc These functions are usually able to run an order of magnitude faster than looping through the object, so it’s important to understand what functions are available to avoid ‘reinventing the wheel.’ In addition, many of the functions are able to sum or average across axes, which make them extremely useful if your data has inherent grouping. To return to a previous example:

m = np.array([[1, 2], [3, 4]])
print(m)
>>>[[1 2]
>>> [3 4]]
m.sum()
>>>10
m.sum(axis=1)
>>>[3, 7]
m.sum(axis=0)
>>>[4, 6]

In this example, we created a 2x2 matrix containing the numbers 1 through 4. The sum of the matrix returned the element-wise addition of the entire matrix. Summing across axis 0 (rows) returned a new array with the element-wise addition across each row. Likewise, summing across axis 1 (columns) returned the columnar summation.

Linear Algebra

Perhaps one of the most important uses for NumPy is its robust support for Linear Algebra functions. Like the aggregation functions described in the previous section, these functions are optimized to be much faster than user implementations and can utilize processesor level features to provide very quick computations. These functions can be accessed very easily from the NumPy package:

a = np.array([[1, 2], [3, 4]])
b = np.array([[5, 6], [7, 8]])
print(np.matmul(a, b))
>>>[[19 22]
    [43 50]]

Included in within np.linalg are functions for calculating the Eigendecomposition of square matrices and symmetric matrices. Finally, to give a quick example of how easy it is to implement algorithms in NumPy, we can easily use it to calculate the cost and gradient when using simple Mean-Squared-Error (MSE):

cost = np.power(Y - np.matmul(X, weights)), 2).mean(axis=1)
gradient = np.matmul(X.T, np.matmul(X, weights) - y)

Finally, more advanced functions are easily available to users via the linalg library of NumPy as:

from numpy import linalg

A = np.diag((1,2,3))

w,v = linalg.eig(A)

print ('w =', w)
print ('v =', v)

NumPy Resources

3 - Scipy

Gregor von Laszewski (laszewski@gmail.com)

SciPy is a library built around NumPy and has a number of off-the-shelf algorithms and operations implemented. These include algorithms from calculus (such as integration), statistics, linear algebra, image-processing, signal processing, machine learning.

To achieve this, SciPy bundles a number of useful open-source software for mathematics, science, and engineering. It includes the following packages:

NumPy,

for managing N-dimensional arrays

SciPy library,

to access fundamental scientific computing capabilities

Matplotlib,

to conduct 2D plotting

IPython,

for an Interactive console (see jupyter)

Sympy,

for symbolic mathematics

pandas,

for providing data structures and analysis

Introduction

First, we add the usual scientific computing modules with the typical abbreviations, including sp for scipy. We could invoke scipy’s statistical package as sp.stats, but for the sake of laziness, we abbreviate that too.

import numpy as np # import numpy
import scipy as sp # import scipy
from scipy import stats # refer directly to stats rather than sp.stats
import matplotlib as mpl # for visualization
from matplotlib import pyplot as plt # refer directly to pyplot
                                     # rather than mpl.pyplot

Now we create some random data to play with. We generate 100 samples from a Gaussian distribution centered at zero.

s = sp.randn(100)

How many elements are in the set?

print ('There are',len(s),'elements in the set')

What is the mean (average) of the set?

print ('The mean of the set is',s.mean())

What is the minimum of the set?

print ('The minimum of the set is',s.min())

What is the maximum of the set?

print ('The maximum of the set is',s.max())

We can use the scipy functions too. What’s the median?

print ('The median of the set is',sp.median(s))

What about the standard deviation and variance?

print ('The standard deviation is',sp.std(s),
       'and the variance is',sp.var(s))

Isn’t the variance the square of the standard deviation?

    print ('The square of the standard deviation is',sp.std(s)**2)

How close are the measures? The differences are close as the following calculation shows

    print ('The difference is',abs(sp.std(s)**2 - sp.var(s)))

    print ('And in decimal form, the difference is %0.16f' %
           (abs(sp.std(s)**2 - sp.var(s))))

How does this look as a histogram? See Figure 1, Figure 2, Figure 3

plt.hist(s) # yes, one line of code for a histogram
plt.show()

Figure 1: Histogram 1

Figure 1: Histogram 1

Let us add some titles.

plt.clf() # clear out the previous plot

plt.hist(s)
plt.title("Histogram Example")
plt.xlabel("Value")
plt.ylabel("Frequency")

plt.show()

Figure 2: Histogram 2

Figure 2: Histogram 2

Typically we do not include titles when we prepare images for inclusion in LaTeX. There we use the caption to describe what the figure is about.

plt.clf() # clear out the previous plot

plt.hist(s)
plt.xlabel("Value")
plt.ylabel("Frequency")

plt.show()

Figure 3: Histogram 3

Figure 3: Histogram 3

Let us try out some linear regression or curve fitting. See @#fig:scipy-output_30_0

import random

def F(x):
    return 2*x - 2

def add_noise(x):
    return x + random.uniform(-1,1)

X = range(0,10,1)

Y = []
for i in range(len(X)):
    Y.append(add_noise(X[i]))

plt.clf() # clear out the old figure
plt.plot(X,Y,'.')
plt.show()

Figure 4: Result 1

Figure 4: Result 1

Now let’s try linear regression to fit the curve.

m, b, r, p, est_std_err = stats.linregress(X,Y)

What is the slope and y-intercept of the fitted curve?

print ('The slope is',m,'and the y-intercept is', b)

def Fprime(x): # the fitted curve
    return m*x + b

Now let’s see how well the curve fits the data. We’ll call the fitted curve F'.

X = range(0,10,1)

Yprime = []
for i in range(len(X)):
    Yprime.append(Fprime(X[i]))

plt.clf() # clear out the old figure

# the observed points, blue dots
plt.plot(X, Y, '.', label='observed points')

# the interpolated curve, connected red line
plt.plot(X, Yprime, 'r-', label='estimated points')

plt.title("Linear Regression Example") # title
plt.xlabel("x") # horizontal axis title
plt.ylabel("y") # vertical axis title
# legend labels to plot
plt.legend(['obsered points', 'estimated points'])

# comment out so that you can save the figure
#plt.show()

To save images into a PDF file for inclusion into LaTeX documents you can save the images as follows. Other formats such as png are also possible, but the quality is naturally not sufficient for inclusion in papers and documents. For that, you certainly want to use PDF. The save of the figure has to occur before you use the show() command. See Figure 5

plt.savefig("regression.pdf", bbox_inches='tight')

plt.savefig('regression.png')

plt.show()

Figure 5: Result 2

Figure 5: Result 2

References

For more information about SciPy we recommend that you visit the following link

https://www.scipy.org/getting-started.html#learning-to-work-with-scipy

Additional material and inspiration for this section are from

[![No

  • [No] Prasanth. “Simple statistics with SciPy.” Comfort at 1 AU. February

[![No 28, 2011. https://oneau.wordpress.com/2011/02/28/simple-statistics-with-scipy/.

  • [No] SciPy Cookbook. Lasted updated: 2015.

[![No http://scipy-cookbook.readthedocs.io/.

No create bibtex entries

No

4 - Scikit-learn

Gregor von Laszewski (laszewski@gmail.com)


Learning Objectives

  • Exploratory data analysis
  • Pipeline to prepare data
  • Full learning pipeline
  • Fine tune the model
  • Significance tests

Introduction to Scikit-learn

Scikit learn is a Machine Learning specific library used in Python. Library can be used for data mining and analysis. It is built on top of NumPy, matplotlib and SciPy. Scikit Learn features Dimensionality reduction, clustering, regression and classification algorithms. It also features model selection using grid search, cross validation and metrics.

Scikit learn also enables users to preprocess the data which can then be used for machine learning using modules like preprocessing and feature extraction.

In this section we demonstrate how simple it is to use k-means in scikit learn.

Installation

If you already have a working installation of numpy and scipy, the easiest way to install scikit-learn is using pip

$ pip install numpy
$ pip install scipy -U
$ pip install -U scikit-learn

Supervised Learning

Supervised Learning is used in machine learning when we already know a set of output predictions based on input characteristics and based on that we need to predict the target for a new input. Training data is used to train the model which then can be used to predict the output from a bounded set.

Problems can be of two types

  1. Classification : Training data belongs to three or four classes/categories and based on the label we want to predict the class/category for the unlabeled data.
  2. Regression : Training data consists of vectors without any corresponding target values. Clustering can be used for these type of datasets to determine discover groups of similar examples. Another way is density estimation which determine the distribution of data within the input space. Histogram is the most basic form.

Unsupervised Learning

Unsupervised Learning is used in machine learning when we have the training set available but without any corresponding target. The outcome of the problem is to discover groups within the provided input. It can be done in many ways.

Few of them are listed here

  1. Clustering : Discover groups of similar characteristics.
  2. Density Estimation : Finding the distribution of data within the provided input or changing the data from a high dimensional space to two or three dimension.

Building a end to end pipeline for Supervised machine learning using Scikit-learn

A data pipeline is a set of processing components that are sequenced to produce meaningful data. Pipelines are commonly used in Machine learning, since there is lot of data transformation and manipulation that needs to be applied to make data useful for machine learning. All components are sequenced in a way that the output of one component becomes input for the next and each of the component is self contained. Components interact with each other using data.

Even if a component breaks, the downstream component can run normally using the last output. Sklearn provide the ability to build pipelines that can be transformed and modeled for machine learning.

Steps for developing a machine learning model

  1. Explore the domain space
  2. Extract the problem definition
  3. Get the data that can be used to make the system learn to solve the problem definition.
  4. Discover and Visualize the data to gain insights
  5. Feature engineering and prepare the data
  6. Fine tune your model
  7. Evaluate your solution using metrics
  8. Once proven launch and maintain the model.

Exploratory Data Analysis

Example project = Fraud detection system

First step is to load the data into a dataframe in order for a proper analysis to be done on the attributes.

data = pd.read_csv('dataset/data_file.csv')
data.head()

Perform the basic analysis on the data shape and null value information.

print(data.shape)
print(data.info())
data.isnull().values.any()

Here is the example of few of the visual data analysis methods.

Bar plot

A bar chart or graph is a graph with rectangular bars or bins that are used to plot categorical values. Each bar in the graph represents a categorical variable and the height of the bar is proportional to the value represented by it.

Bar graphs are used:

To make comparisons between variables To visualize any trend in the data, i.e., they show the dependence of one variable on another Estimate values of a variable

plt.ylabel('Transactions')
plt.xlabel('Type')
data.type.value_counts().plot.bar()

Figure 1: Example of scikit-learn barplots

Figure 1: Example of scikit-learn barplots

Correlation between attributes

Attributes in a dataset can be related based on differnt aspects.

Examples include attributes dependent on another or could be loosely or tightly coupled. Also example includes two variables can be associated with a third one.

In order to understand the relationship between attributes, correlation represents the best visual way to get an insight. Positive correlation meaning both attributes moving into the same direction. Negative correlation refers to opposte directions. One attributes values increase results in value decrease for other. Zero correlation is when the attributes are unrelated.

# compute the correlation matrix
corr = data.corr()

# generate a mask for the lower triangle
mask = np.zeros_like(corr, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True

# set up the matplotlib figure
f, ax = plt.subplots(figsize=(18, 18))

# generate a custom diverging color map
cmap = sns.diverging_palette(220, 10, as_cmap=True)

# draw the heatmap with the mask and correct aspect ratio
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3,
            square=True,
            linewidths=.5, cbar_kws={"shrink": .5}, ax=ax);

Figure 2: scikit-learn correlation array

Figure 2: scikit-learn correlation array

Histogram Analysis of dataset attributes

A histogram consists of a set of counts that represent the number of times some event occurred.

%matplotlib inline
data.hist(bins=30, figsize=(20,15))
plt.show()

Figure 3: scikit-learn

Figure 3: scikit-learn

Box plot Analysis

Box plot analysis is useful in detecting whether a distribution is skewed and detect outliers in the data.

fig, axs = plt.subplots(2, 2, figsize=(10, 10))
tmp = data.loc[(data.type == 'TRANSFER'), :]

a = sns.boxplot(x = 'isFlaggedFraud', y = 'amount', data = tmp, ax=axs[0][0])
axs[0][0].set_yscale('log')
b = sns.boxplot(x = 'isFlaggedFraud', y = 'oldbalanceDest', data = tmp, ax=axs[0][1])
axs[0][1].set(ylim=(0, 0.5e8))
c = sns.boxplot(x = 'isFlaggedFraud', y = 'oldbalanceOrg', data=tmp, ax=axs[1][0])
axs[1][0].set(ylim=(0, 3e7))
d = sns.regplot(x = 'oldbalanceOrg', y = 'amount', data=tmp.loc[(tmp.isFlaggedFraud ==1), :], ax=axs[1][1])
plt.show()

Figure 4: scikit-learn

Figure 4: scikit-learn

Scatter plot Analysis

The scatter plot displays values of two numerical variables as Cartesian coordinates.

plt.figure(figsize=(12,8))
sns.pairplot(data[['amount', 'oldbalanceOrg', 'oldbalanceDest', 'isFraud']], hue='isFraud')

Figure 5: scikit-learn scatter plots

Figure 5: scikit-learn scatter plots

Data Cleansing - Removing Outliers

If the transaction amount is lower than 5 percent of the all the transactions AND does not exceed USD 3000, we will exclude it from our analysis to reduce Type 1 costs If the transaction amount is higher than 95 percent of all the transactions AND exceeds USD 500000, we will exclude it from our analysis, and use a blanket review process for such transactions (similar to isFlaggedFraud column in original dataset) to reduce Type 2 costs

low_exclude = np.round(np.minimum(fin_samp_data.amount.quantile(0.05), 3000), 2)
high_exclude = np.round(np.maximum(fin_samp_data.amount.quantile(0.95), 500000), 2)

###Updating Data to exclude records prone to Type 1 and Type 2 costs
low_data = fin_samp_data[fin_samp_data.amount > low_exclude]
data = low_data[low_data.amount < high_exclude]

Pipeline Creation

Machine learning pipeline is used to help automate machine learning workflows. They operate by enabling a sequence of data to be transformed and correlated together in a model that can be tested and evaluated to achieve an outcome, whether positive or negative.

Defining DataFrameSelector to separate Numerical and Categorical attributes

Sample function to seperate out Numerical and categorical attributes.

from sklearn.base import BaseEstimator, TransformerMixin

# Create a class to select numerical or categorical columns
# since Scikit-Learn doesn't handle DataFrames yet
class DataFrameSelector(BaseEstimator, TransformerMixin):
    def __init__(self, attribute_names):
        self.attribute_names = attribute_names
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        return X[self.attribute_names].values

Feature Creation / Additional Feature Engineering

During EDA we identified that there are transactions where the balances do not tally after the transaction is completed.We believe this could potentially be cases where fraud is occurring. To account for this error in the transactions, we define two new features"errorBalanceOrig" and “errorBalanceDest,” calculated by adjusting the amount with the before and after balances for the Originator and Destination accounts.

Below, we create a function that allows us to create these features in a pipeline.

from sklearn.base import BaseEstimator, TransformerMixin

# column index
amount_ix, oldbalanceOrg_ix, newbalanceOrig_ix, oldbalanceDest_ix, newbalanceDest_ix = 0, 1, 2, 3, 4

class CombinedAttributesAdder(BaseEstimator, TransformerMixin):
    def __init__(self): # no *args or **kargs
        pass
    def fit(self, X, y=None):
        return self  # nothing else to do
    def transform(self, X, y=None):
        errorBalanceOrig = X[:,newbalanceOrig_ix] +  X[:,amount_ix] -  X[:,oldbalanceOrg_ix]
        errorBalanceDest = X[:,oldbalanceDest_ix] +  X[:,amount_ix]-  X[:,newbalanceDest_ix]

        return np.c_[X, errorBalanceOrig, errorBalanceDest]

Creating Training and Testing datasets

Training set includes the set of input examples that the model will be fit into or trained on by adjusting the parameters. Testing dataset is critical to test the generalizability of the model . By using this set, we can get the working accuracy of our model.

Testing set should not be exposed to model unless model training has not been completed. This way the results from testing will be more reliable.

from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X,y,test_size=0.30, random_state=42, stratify=y)

Creating pipeline for numerical and categorical attributes

Identifying columns with Numerical and Categorical characteristics.

X_train_num = X_train[["amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest"]]
X_train_cat = X_train[["type"]]
X_model_col = ["amount","oldbalanceOrg", "newbalanceOrig", "oldbalanceDest", "newbalanceDest","type"]
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import Imputer

num_attribs = list(X_train_num)
cat_attribs = list(X_train_cat)

num_pipeline = Pipeline([
        ('selector', DataFrameSelector(num_attribs)),
        ('attribs_adder', CombinedAttributesAdder()),
        ('std_scaler', StandardScaler())
    ])

cat_pipeline = Pipeline([
        ('selector', DataFrameSelector(cat_attribs)),
        ('cat_encoder', CategoricalEncoder(encoding="onehot-dense"))
    ])

Selecting the algorithm to be applied

Algorithim selection primarily depends on the objective you are trying to solve and what kind of dataset is available. There are differnt type of algorithms which can be applied and we will look into few of them here.

Linear Regression

This algorithm can be applied when you want to compute some continuous value. To predict some future value of a process which is currently running, you can go with regression algorithm.

Examples where linear regression can used are :

  1. Predict the time taken to go from one place to another
  2. Predict the sales for a future month
  3. Predict sales data and improve yearly projections.
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
import time
scl= StandardScaler()
X_train_std = scl.fit_transform(X_train)
X_test_std = scl.transform(X_test)
start = time.time()
lin_reg = LinearRegression()
lin_reg.fit(X_train_std, y_train) #SKLearn's linear regression
y_train_pred = lin_reg.predict(X_train_std)
train_time = time.time()-start

Logistic Regression

This algorithm can be used to perform binary classification. It can be used if you want a probabilistic framework. Also in case you expect to receive more training data in the future that you want to be able to quickly incorporate into your model.

  1. Customer churn prediction.
  2. Credit Scoring & Fraud Detection which is our example problem which we are trying to solve in this chapter.
  3. Calculating the effectiveness of marketing campaigns.
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

X_train, _, y_train, _ = train_test_split(X_train, y_train, stratify=y_train, train_size=subsample_rate, random_state=42)
X_test, _, y_test, _ = train_test_split(X_test, y_test, stratify=y_test, train_size=subsample_rate, random_state=42)

model_lr_sklearn = LogisticRegression(multi_class="multinomial", C=1e6, solver="sag", max_iter=15)
model_lr_sklearn.fit(X_train, y_train)

y_pred_test = model_lr_sklearn.predict(X_test)
acc = accuracy_score(y_test, y_pred_test)
results.loc[len(results)] = ["LR Sklearn", np.round(acc, 3)]
results

Decision trees

Decision trees handle feature interactions and they’re non-parametric. Doesnt support online learning and the entire tree needs to be rebuild when new traning dataset comes in. Memory consumption is very high.

Can be used for the following cases

  1. Investment decisions
  2. Customer churn
  3. Banks loan defaulters
  4. Build vs Buy decisions
  5. Sales lead qualifications
from sklearn.tree import DecisionTreeRegressor
dt = DecisionTreeRegressor()
start = time.time()
dt.fit(X_train_std, y_train)
y_train_pred = dt.predict(X_train_std)
train_time = time.time() - start

start = time.time()
y_test_pred = dt.predict(X_test_std)
test_time = time.time() - start

K Means

This algorithm is used when we are not aware of the labels and one needs to be created based on the features of objects. Example will be to divide a group of people into differnt subgroups based on common theme or attribute.

The main disadvantage of K-mean is that you need to know exactly the number of clusters or groups which is required. It takes a lot of iteration to come up with the best K.

from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split, GridSearchCV, PredefinedSplit
from sklearn.metrics import accuracy_score

X_train, _, y_train, _ = train_test_split(X_train, y_train, stratify=y_train, train_size=subsample_rate, random_state=42)
X_test, _, y_test, _ = train_test_split(X_test, y_test, stratify=y_test, train_size=subsample_rate, random_state=42)

model_knn_sklearn = KNeighborsClassifier(n_jobs=-1)
model_knn_sklearn.fit(X_train, y_train)

y_pred_test = model_knn_sklearn.predict(X_test)
acc = accuracy_score(y_test, y_pred_test)

results.loc[len(results)] = ["KNN Arbitary Sklearn", np.round(acc, 3)]
results

Support Vector Machines

SVM is a supervised ML technique and used for pattern recognition and classification problems when your data has exactly two classes. Its popular in text classification problems.

Few cases where SVM can be used is

  1. Detecting persons with common diseases.
  2. Hand-written character recognition
  3. Text categorization
  4. Stock market price prediction

Naive Bayes

Naive Bayes is used for large datasets.This algoritm works well even when we have a limited CPU and memory available. This works by calculating bunch of counts. It requires less training data. The algorthim cant learn interation between features.

Naive Bayes can be used in real-world applications such as:

  1. Sentiment analysis and text classification
  2. Recommendation systems like Netflix, Amazon
  3. To mark an email as spam or not spam
  4. Face recognition

Random Forest

Ranmdon forest is similar to Decision tree. Can be used for both regression and classification problems with large data sets.

Few case where it can be applied.

  1. Predict patients for high risks.
  2. Predict parts failures in manufacturing.
  3. Predict loan defaulters.
from sklearn.ensemble import RandomForestRegressor
forest = RandomForestRegressor(n_estimators = 400, criterion='mse',random_state=1, n_jobs=-1)
start = time.time()
forest.fit(X_train_std, y_train)
y_train_pred = forest.predict(X_train_std)
train_time = time.time() - start

start = time.time()
y_test_pred = forest.predict(X_test_std)
test_time = time.time() - start

Neural networks

Neural network works based on weights of connections between neurons. Weights are trained and based on that the neural network can be utilized to predict the class or a quantity. They are resource and memory intensive.

Few cases where it can be applied.

  1. Applied to unsupervised learning tasks, such as feature extraction.
  2. Extracts features from raw images or speech with much less human intervention

Deep Learning using Keras

Keras is most powerful and easy-to-use Python libraries for developing and evaluating deep learning models. It has the efficient numerical computation libraries Theano and TensorFlow.

XGBoost

XGBoost stands for eXtreme Gradient Boosting. XGBoost is an implementation of gradient boosted decision trees designed for speed and performance. It is engineered for efficiency of compute time and memory resources.

Scikit Cheat Sheet

Scikit learning has put a very indepth and well explained flow chart to help you choose the right algorithm that I find very handy.

Figure 6: scikit-learn

Figure 6: scikit-learn

Parameter Optimization

Machine learning models are parameterized so that their behavior can be tuned for a given problem. These models can have many parameters and finding the best combination of parameters can be treated as a search problem.

A parameter is a configurationthat is part of the model and values can be derived from the given data.

  1. Required by the model when making predictions.
  2. Values define the skill of the model on your problem.
  3. Estimated or learned from data.
  4. Often not set manually by the practitioner.
  5. Often saved as part of the learned model.

Hyperparameter optimization/tuning algorithms

Grid search is an approach to hyperparameter tuning that will methodically build and evaluate a model for each combination of algorithm parameters specified in a grid.

Random search provide a statistical distribution for each hyperparameter from which values may be randomly sampled.

Experiments with Keras (deep learning), XGBoost, and SVM (SVC) compared to Logistic Regression(Baseline)

Creating a parameter grid

grid_param = [
                [{   #LogisticRegression
                   'model__penalty':['l1','l2'],
                   'model__C': [0.01, 1.0, 100]
                }],

                [{#keras
                    'model__optimizer': optimizer,
                    'model__loss': loss
                }],

                [{  #SVM
                   'model__C' :[0.01, 1.0, 100],
                   'model__gamma': [0.5, 1],
                   'model__max_iter':[-1]
                }],

                [{   #XGBClassifier
                    'model__min_child_weight': [1, 3, 5],
                    'model__gamma': [0.5],
                    'model__subsample': [0.6, 0.8],
                    'model__colsample_bytree': [0.6],
                    'model__max_depth': [3]

                }]
            ]

Implementing Grid search with models and also creating metrics from each of the model.

Pipeline(memory=None,
     steps=[('preparation', FeatureUnion(n_jobs=None,
       transformer_list=[('num_pipeline', Pipeline(memory=None,
     steps=[('selector', DataFrameSelector(attribute_names=['amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest'])), ('attribs_adder', CombinedAttributesAdder()...penalty='l2', random_state=None, solver='warn',
          tol=0.0001, verbose=0, warm_start=False))])
from sklearn.metrics import mean_squared_error
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score
from xgboost.sklearn import XGBClassifier
from sklearn.svm import SVC

test_scores = []
#Machine Learning Algorithm (MLA) Selection and Initialization
MLA = [
        linear_model.LogisticRegression(),
        keras_model,
        SVC(),
        XGBClassifier()

      ]

#create table to compare MLA metrics
MLA_columns = ['Name', 'Score', 'Accuracy_Score','ROC_AUC_score','final_rmse','Classification_error','Recall_Score','Precision_Score', 'mean_test_score', 'mean_fit_time', 'F1_Score']
MLA_compare = pd.DataFrame(columns = MLA_columns)
Model_Scores = pd.DataFrame(columns = ['Name','Score'])

row_index = 0
for alg in MLA:

    #set name and parameters
    MLA_name = alg.__class__.__name__
    MLA_compare.loc[row_index, 'Name'] = MLA_name
    #MLA_compare.loc[row_index, 'Parameters'] = str(alg.get_params())


    full_pipeline_with_predictor = Pipeline([
        ("preparation", full_pipeline),  # combination of numerical and categorical pipelines
        ("model", alg)
    ])

    grid_search = GridSearchCV(full_pipeline_with_predictor, grid_param[row_index], cv=4, verbose=2, scoring='f1', return_train_score=True)

    grid_search.fit(X_train[X_model_col], y_train)
    y_pred = grid_search.predict(X_test)

    MLA_compare.loc[row_index, 'Accuracy_Score'] = np.round(accuracy_score(y_pred, y_test), 3)
    MLA_compare.loc[row_index, 'ROC_AUC_score'] = np.round(metrics.roc_auc_score(y_test, y_pred),3)
    MLA_compare.loc[row_index,'Score'] = np.round(grid_search.score(X_test, y_test),3)

    negative_mse = grid_search.best_score_
    scores = np.sqrt(-negative_mse)
    final_mse = mean_squared_error(y_test, y_pred)
    final_rmse = np.sqrt(final_mse)
    MLA_compare.loc[row_index, 'final_rmse'] = final_rmse

    confusion_matrix_var = confusion_matrix(y_test, y_pred)
    TP = confusion_matrix_var[1, 1]
    TN = confusion_matrix_var[0, 0]
    FP = confusion_matrix_var[0, 1]
    FN = confusion_matrix_var[1, 0]
    MLA_compare.loc[row_index,'Classification_error'] = np.round(((FP + FN) / float(TP + TN + FP + FN)), 5)
    MLA_compare.loc[row_index,'Recall_Score'] = np.round(metrics.recall_score(y_test, y_pred), 5)
    MLA_compare.loc[row_index,'Precision_Score'] = np.round(metrics.precision_score(y_test, y_pred), 5)
    MLA_compare.loc[row_index,'F1_Score'] = np.round(f1_score(y_test,y_pred), 5)


    MLA_compare.loc[row_index, 'mean_test_score'] = grid_search.cv_results_['mean_test_score'].mean()
    MLA_compare.loc[row_index, 'mean_fit_time'] = grid_search.cv_results_['mean_fit_time'].mean()

    Model_Scores.loc[row_index,'MLA Name'] = MLA_name
    Model_Scores.loc[row_index,'ML Score'] = np.round(metrics.roc_auc_score(y_test, y_pred),3)

    #Collect Mean Test scores for statistical significance test
    test_scores.append(grid_search.cv_results_['mean_test_score'])
    row_index+=1

Results table from the Model evaluation with metrics.

Figure 7: scikit-learn

Figure 7: scikit-learn

ROC AUC Score

AUC - ROC curve is a performance measurement for classification problem at various thresholds settings. ROC is a probability curve and AUC represents degree or measure of separability. It tells how much model is capable of distinguishing between classes. Higher the AUC, better the model is at predicting 0s as 0s and 1s as 1s.

Figure 8: scikit-learn

Figure 8: scikit-learn

Figure 9: scikit-learn

Figure 9: scikit-learn

K-means in scikit learn.

Import

K-means Algorithm

In this section we demonstrate how simple it is to use k-means in scikit learn.

Import

    from time import time
    import numpy as np
    import matplotlib.pyplot as plt
    from sklearn import metrics
    from sklearn.cluster import KMeans
    from sklearn.datasets import load_digits
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import scale

Create samples

    np.random.seed(42)

    digits = load_digits()
    data = scale(digits.data)

Create samples

    np.random.seed(42)

    digits = load_digits()
    data = scale(digits.data)

    n_samples, n_features = data.shape
    n_digits = len(np.unique(digits.target))
    labels = digits.target

    sample_size = 300

    print("n_digits: %d, \t n_samples %d, \t n_features %d" % (n_digits, n_samples, n_features))
    print(79 * '_')
    print('% 9s' % 'init' '    time  inertia    homo   compl  v-meas     ARI AMI  silhouette')
    print("n_digits: %d, \t n_samples %d, \t n_features %d"
          % (n_digits, n_samples, n_features))


    print(79 * '_')
    print('% 9s' % 'init'
          '    time  inertia    homo   compl  v-meas     ARI AMI  silhouette')


    def bench_k_means(estimator, name, data):
        t0 = time()
        estimator.fit(data)
        print('% 9s   %.2fs    %i   %.3f   %.3f   %.3f   %.3f   %.3f    %.3f'
              % (name, (time() - t0), estimator.inertia_,
                 metrics.homogeneity_score(labels, estimator.labels_),
                 metrics.completeness_score(labels, estimator.labels_),
                 metrics.v_measure_score(labels, estimator.labels_),
                 metrics.adjusted_rand_score(labels, estimator.labels_),
                 metrics.adjusted_mutual_info_score(labels,  estimator.labels_),

                 metrics.silhouette_score(data, estimator.labels_,metric='euclidean',sample_size=sample_size)))

    bench_k_means(KMeans(init='k-means++', n_clusters=n_digits, n_init=10), name="k-means++", data=data)

    bench_k_means(KMeans(init='random', n_clusters=n_digits, n_init=10), name="random", data=data)

                 metrics.silhouette_score(data, estimator.labels_,
                                          metric='euclidean',
                                          sample_size=sample_size)))

    bench_k_means(KMeans(init='k-means++', n_clusters=n_digits, n_init=10),
                  name="k-means++", data=data)

    bench_k_means(KMeans(init='random', n_clusters=n_digits, n_init=10),
                  name="random", data=data)


    # in this case the seeding of the centers is deterministic, hence we run the
    # kmeans algorithm only once with n_init=1
    pca = PCA(n_components=n_digits).fit(data)

    bench_k_means(KMeans(init=pca.components_,n_clusters=n_digits, n_init=1),name="PCA-based", data=data)
    print(79 * '_')

Visualize

See Figure 10

    bench_k_means(KMeans(init=pca.components_,
                         n_clusters=n_digits, n_init=1),
                  name="PCA-based",
                  data=data)
    print(79 * '_')

Visualize

See Figure 10

    reduced_data = PCA(n_components=2).fit_transform(data)
    kmeans = KMeans(init='k-means++', n_clusters=n_digits, n_init=10)
    kmeans.fit(reduced_data)

    # Step size of the mesh. Decrease to increase the quality of the VQ.
    h = .02     # point in the mesh [x_min, x_max]x[y_min, y_max].

    # Plot the decision boundary. For that, we will assign a color to each
    x_min, x_max = reduced_data[:, 0].min() - 1, reduced_data[:, 0].max() + 1
    y_min, y_max = reduced_data[:, 1].min() - 1, reduced_data[:, 1].max() + 1
    xx, yy = np.meshgrid(np.arange(x_min, x_max, h), np.arange(y_min, y_max, h))

    # Obtain labels for each point in mesh. Use last trained model.
    Z = kmeans.predict(np.c_[xx.ravel(), yy.ravel()])

    # Put the result into a color plot
    Z = Z.reshape(xx.shape)
    plt.figure(1)
    plt.clf()
    plt.imshow(Z, interpolation='nearest',
               extent=(xx.min(), xx.max(), yy.min(), yy.max()),
               cmap=plt.cm.Paired,
               aspect='auto', origin='lower')

    plt.plot(reduced_data[:, 0], reduced_data[:, 1], 'k.', markersize=2)
    # Plot the centroids as a white X
    centroids = kmeans.cluster_centers_
    plt.scatter(centroids[:, 0], centroids[:, 1],
                marker='x', s=169, linewidths=3,
                color='w', zorder=10)
    plt.title('K-means clustering on the digits dataset (PCA-reduced data)\n'
              'Centroids are marked with white cross')
    plt.xlim(x_min, x_max)
    plt.ylim(y_min, y_max)
    plt.xticks(())
    plt.yticks(())
    plt.show()

Figure 10: Result

Figure 10: Result

5 - Dask - Random Forest Feature Detection

Gregor von Laszewski (laszewski@gmail.com)

Setup

First we need our tools. pandas gives us the DataFrame, very similar to R’s DataFrames. The DataFrame is a structure that allows us to work with our data more easily. It has nice features for slicing and transformation of data, and easy ways to do basic statistics.

numpy has some very handy functions that work on DataFrames.

Dataset

We are using a dataset about the wine quality dataset, archived at UCI’s Machine Learning Repository (http://archive.ics.uci.edu/ml/index.php).

import pandas as pd
import numpy as np

Now we will load our data. pandas makes it easy!

# red wine quality data, packed in a DataFrame
red_df = pd.read_csv('winequality-red.csv',sep=';',header=0, index_col=False)

# white wine quality data, packed in a DataFrame
white_df = pd.read_csv('winequality-white.csv',sep=';',header=0,index_col=False)

# rose? other fruit wines? plum wine? :(

Like in R, there is a .describe() method that gives basic statistics for every column in the dataset.

# for red wines
red_df.describe()
<style>
    .dataframe thead tr:only-child th {
        text-align: right;
    }

    .dataframe thead th {
        text-align: left;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }
</style>
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>
</th>
<th>

fixed acidity

</th>
<th>

volatile acidity

</th>
<th>

citric acid

</th>
<th>

residual sugar

</th>
<th>

chlorides

</th>
<th>

free sulfur dioxide

</th>
<th>

total sulfur dioxide

</th>
<th>

density

</th>
<th>

pH

</th>
<th>

sulphates

</th>
<th>

alcohol

</th>
<th>

quality

</th>
</tr>
</thead>
<tbody>
<tr>
<th>

count

</th>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
<td>

1599.000000

</td>
</tr>
<tr>
<th>

mean

</th>
<td>

8.319637

</td>
<td>

0.527821

</td>
<td>

0.270976

</td>
<td>

2.538806

</td>
<td>

0.087467

</td>
<td>

15.874922

</td>
<td>

46.467792

</td>
<td>

0.996747

</td>
<td>

3.311113

</td>
<td>

0.658149

</td>
<td>

10.422983

</td>
<td>

5.636023

</td>
</tr>
<tr>
<th>

std

</th>
<td>

1.741096

</td>
<td>

0.179060

</td>
<td>

0.194801

</td>
<td>

1.409928

</td>
<td>

0.047065

</td>
<td>

10.460157

</td>
<td>

32.895324

</td>
<td>

0.001887

</td>
<td>

0.154386

</td>
<td>

0.169507

</td>
<td>

1.065668

</td>
<td>

0.807569

</td>
</tr>
<tr>
<th>

min

</th>
<td>

4.600000

</td>
<td>

0.120000

</td>
<td>

0.000000

</td>
<td>

0.900000

</td>
<td>

0.012000

</td>
<td>

1.000000

</td>
<td>

6.000000

</td>
<td>

0.990070

</td>
<td>

2.740000

</td>
<td>

0.330000

</td>
<td>

8.400000

</td>
<td>

3.000000

</td>
</tr>
<tr>
<th>

25%

</th>
<td>

7.100000

</td>
<td>

0.390000

</td>
<td>

0.090000

</td>
<td>

1.900000

</td>
<td>

0.070000

</td>
<td>

7.000000

</td>
<td>

22.000000

</td>
<td>

0.995600

</td>
<td>

3.210000

</td>
<td>

0.550000

</td>
<td>

9.500000

</td>
<td>

5.000000

</td>
</tr>
<tr>
<th>

50%

</th>
<td>

7.900000

</td>
<td>

0.520000

</td>
<td>

0.260000

</td>
<td>

2.200000

</td>
<td>

0.079000

</td>
<td>

14.000000

</td>
<td>

38.000000

</td>
<td>

0.996750

</td>
<td>

3.310000

</td>
<td>

0.620000

</td>
<td>

10.200000

</td>
<td>

6.000000

</td>
</tr>
<tr>
<th>

75%

</th>
<td>

9.200000

</td>
<td>

0.640000

</td>
<td>

0.420000

</td>
<td>

2.600000

</td>
<td>

0.090000

</td>
<td>

21.000000

</td>
<td>

62.000000

</td>
<td>

0.997835

</td>
<td>

3.400000

</td>
<td>

0.730000

</td>
<td>

11.100000

</td>
<td>

6.000000

</td>
</tr>
<tr>
<th>

max

</th>
<td>

15.900000

</td>
<td>

1.580000

</td>
<td>

1.000000

</td>
<td>

15.500000

</td>
<td>

0.611000

</td>
<td>

72.000000

</td>
<td>

289.000000

</td>
<td>

1.003690

</td>
<td>

4.010000

</td>
<td>

2.000000

</td>
<td>

14.900000

</td>
<td>

8.000000

</td>
</tr>
</tbody>
</table>
# for white wines
white_df.describe()
<style>
    .dataframe thead tr:only-child th {
        text-align: right;
    }

    .dataframe thead th {
        text-align: left;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }
</style>
<table border="1" class="dataframe">
<thead>
<tr style="text-align: right;">
<th>
</th>
<th>

fixed acidity

</th>
<th>

volatile acidity

</th>
<th>

citric acid

</th>
<th>

residual sugar

</th>
<th>

chlorides

</th>
<th>

free sulfur dioxide

</th>
<th>

total sulfur dioxide

</th>
<th>

density

</th>
<th>

pH

</th>
<th>

sulphates

</th>
<th>

alcohol

</th>
<th>

quality

</th>
</tr>
</thead>
<tbody>
<tr>
<th>

count

</th>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
<td>

4898.000000

</td>
</tr>
<tr>
<th>

mean

</th>
<td>

6.854788

</td>
<td>

0.278241

</td>
<td>

0.334192

</td>
<td>

6.391415

</td>
<td>

0.045772

</td>
<td>

35.308085

</td>
<td>

138.360657

</td>
<td>

0.994027

</td>
<td>

3.188267

</td>
<td>

0.489847

</td>
<td>

10.514267

</td>
<td>

5.877909

</td>
</tr>
<tr>
<th>

std

</th>
<td>

0.843868

</td>
<td>

0.100795

</td>
<td>

0.121020

</td>
<td>

5.072058

</td>
<td>

0.021848

</td>
<td>

17.007137

</td>
<td>

42.498065

</td>
<td>

0.002991

</td>
<td>

0.151001

</td>
<td>

0.114126

</td>
<td>

1.230621

</td>
<td>

0.885639

</td>
</tr>
<tr>
<th>

min

</th>
<td>

3.800000

</td>
<td>

0.080000

</td>
<td>

0.000000

</td>
<td>

0.600000

</td>
<td>

0.009000

</td>
<td>

2.000000

</td>
<td>

9.000000

</td>
<td>

0.987110

</td>
<td>

2.720000

</td>
<td>

0.220000

</td>
<td>

8.000000

</td>
<td>

3.000000

</td>
</tr>
<tr>
<th>

25%

</th>
<td>

6.300000

</td>
<td>

0.210000

</td>
<td>

0.270000

</td>
<td>

1.700000

</td>
<td>

0.036000

</td>
<td>

23.000000

</td>
<td>

108.000000

</td>
<td>

0.991723

</td>
<td>

3.090000

</td>
<td>

0.410000

</td>
<td>

9.500000

</td>
<td>

5.000000

</td>
</tr>
<tr>
<th>

50%

</th>
<td>

6.800000

</td>
<td>

0.260000

</td>
<td>

0.320000

</td>
<td>

5.200000

</td>
<td>

0.043000

</td>
<td>

34.000000

</td>
<td>

134.000000

</td>
<td>

0.993740

</td>
<td>

3.180000

</td>
<td>

0.470000

</td>
<td>

10.400000

</td>
<td>

6.000000

</td>
</tr>
<tr>
<th>

75%

</th>
<td>

7.300000

</td>
<td>

0.320000

</td>
<td>

0.390000

</td>
<td>

9.900000

</td>
<td>

0.050000

</td>
<td>

46.000000

</td>
<td>

167.000000

</td>
<td>

0.996100

</td>
<td>

3.280000

</td>
<td>

0.550000

</td>
<td>

11.400000

</td>
<td>

6.000000

</td>
</tr>
<tr>
<th>

max

</th>
<td>

14.200000

</td>
<td>

1.100000

</td>
<td>

1.660000

</td>
<td>

65.800000

</td>
<td>

0.346000

</td>
<td>

289.000000

</td>
<td>

440.000000

</td>
<td>

1.038980

</td>
<td>

3.820000

</td>
<td>

1.080000

</td>
<td>

14.200000

</td>
<td>

9.000000

</td>
</tr>
</tbody>
</table>

Sometimes it is easier to understand the data visually. A histogram of the white wine quality data citric acid samples is shown next. You can of course visualize other columns' data or other datasets. Just replace the DataFrame and column name (see Figure 1).

import matplotlib.pyplot as plt

def extract_col(df,col_name):
    return list(df[col_name])

col = extract_col(white_df,'citric acid') # can replace with another dataframe or column
plt.hist(col)

#TODO: add axes and such to set a good example

plt.show()

Figure 1: Histogram

Figure 1: Histogram

Detecting Features

Let us try out a some elementary machine learning models. These models are not always for prediction. They are also useful to find what features are most predictive of a variable of interest. Depending on the classifier you use, you may need to transform the data pertaining to that variable.

Data Preparation

Let us assume we want to study what features are most correlated with pH. pH of course is real-valued, and continuous. The classifiers we want to use usually need labeled or integer data. Hence, we will transform the pH data, assigning wines with pH higher than average as hi (more basic or alkaline) and wines with pH lower than average as lo (more acidic).

# refresh to make Jupyter happy
red_df = pd.read_csv('winequality-red.csv',sep=';',header=0, index_col=False)
white_df = pd.read_csv('winequality-white.csv',sep=';',header=0,index_col=False)

#TODO: data cleansing functions here, e.g. replacement of NaN

# if the variable you want to predict is continuous, you can map ranges of values
# to integer/binary/string labels

# for example, map the pH data to 'hi' and 'lo' if a pH value is more than or
# less than the mean pH, respectively
M = np.mean(list(red_df['pH'])) # expect inelegant code in these mappings
Lf = lambda p: int(p < M)*'lo' + int(p >= M)*'hi' # some C-style hackery

# create the new classifiable variable
red_df['pH-hi-lo'] = map(Lf,list(red_df['pH']))

# and remove the predecessor
del red_df['pH']

Now we specify which dataset and variable you want to predict by assigning vlues to SELECTED_DF and TARGET_VAR, respectively.

We like to keep a parameter file where we specify data sources and such. This lets me create generic analytics code that is easy to reuse.

After we have specified what dataset we want to study, we split the training and test datasets. We then scale (normalize) the data, which makes most classifiers run better.

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn import metrics

# make selections here without digging in code
SELECTED_DF = red_df # selected dataset
TARGET_VAR = 'pH-hi-lo' # the predicted variable

# generate nameless data structures
df = SELECTED_DF
target = np.array(df[TARGET_VAR]).ravel()
del df[TARGET_VAR] # no cheating

#TODO: data cleansing function calls here

# split datasets for training and testing
X_train, X_test, y_train, y_test = train_test_split(df,target,test_size=0.2)

# set up the scaler
scaler = StandardScaler()
scaler.fit(X_train)

# apply the scaler
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

Now we pick a classifier. As you can see, there are many to try out, and even more in scikit-learn’s documentation and many examples and tutorials. Random Forests are data science workhorses. They are the go-to method for most data scientists. Be careful relying on them though–they tend to overfit. We try to avoid overfitting by separating the training and test datasets.

Random Forest

# pick a classifier

from sklearn.tree import DecisionTreeClassifier,DecisionTreeRegressor,ExtraTreeClassifier,ExtraTreeRegressor
from sklearn.ensemble import RandomForestClassifier,ExtraTreesClassifier

clf = RandomForestClassifier()

Now we will test it out with the default parameters.

Note that this code is boilerplate. You can use it interchangeably for most scikit-learn models.

# test it out

model = clf.fit(X_train,y_train)
pred = clf.predict(X_test)
conf_matrix = metrics.confusion_matrix(y_test,pred)

var_score = clf.score(X_test,y_test)

# the results
importances = clf.feature_importances_
indices = np.argsort(importances)[::-1]

Now output the results. For Random Forests, we get a feature ranking. Relative importances usually exponentially decay. The first few highly-ranked features are usually the most important.

# for the sake of clarity
num_features = X_train.shape[1]
features = map(lambda x: df.columns[x],indices)
feature_importances = map(lambda x: importances[x],indices)

print 'Feature ranking:\n'

for i in range(num_features):
    feature_name = features[i]
    feature_importance = feature_importances[i]
    print '%s%f' % (feature_name.ljust(30), feature_importance)
Feature ranking:

fixed acidity                 0.269778
citric acid                   0.171337
density                       0.089660
volatile acidity              0.088965
chlorides                     0.082945
alcohol                       0.080437
total sulfur dioxide          0.067832
sulphates                     0.047786
free sulfur dioxide           0.042727
residual sugar                0.037459
quality                       0.021075

Sometimes it’s easier to visualize. We’ll use a bar chart. See Figure 2

plt.clf()
plt.bar(range(num_features),feature_importances)
plt.xticks(range(num_features),features,rotation=90)
plt.ylabel('relative importance (a.u.)')
plt.title('Relative importances of most predictive features')
plt.show()

Figure 2: Result

Figure 2: Result

import dask.dataframe as dd

red_df = dd.read_csv('winequality-red.csv',sep=';',header=0)
white_df = dd.read_csv('winequality-white.csv',sep=';',header=0)

Acknowledgement

This notebook was developed by Juliette Zerick and Gregor von Laszewski

6 - Parallel Computing in Python

Gregor von Laszewski (laszewski@gmail.com)

In this module, we will review the available Python modules that can be used for parallel computing. Parallel computing can be in the form of either multi-threading or multi-processing. In multi-threading approach, the threads run in the same shared memory heap whereas in case of multi-processing, the memory heaps of processes are separate and independent, therefore the communication between the processes are a little bit more complex.

Multi-threading in Python

Threading in Python is perfect for I/O operations where the process is expected to be idle regularly, e.g. web scraping. This is a very useful feature because several applications and scripts might spend the majority of their runtime waiting for network or data I/O. In several cases, e.g. web scraping, the resources, i.e. downloading from different websites, are most of the time-independent. Therefore the processor can download in parallel and join the result at the end.

Thread vs Threading

There are two built-in modules in Python that are related to threading, namely thread and threading. The former module is deprecated for some time in Python 2, and in Python 3 it is renamed to _thread for the sake of backward incompatibilities. The _thread module provides low-level threading API for multi-threading in Python, whereas the module threading builds a high-level threading interface on top of it.

The Thread() is the main method of the threading module, the two important arguments of which are target, for specifying the callable object, and args to pass the arguments for the target callable. We illustrate these in the following example:

import threading

def hello_thread(thread_num):
    print ("Hello from Thread ", thread_num)

if __name__ == '__main__':
    for thread_num in range(5):
        t = threading.Thread(target=hello_thread,arg=(thread_num,))
        t.start()

This is the output of the previous example:

In [1]: %run threading.py
Hello from Thread  0
Hello from Thread  1
Hello from Thread  2
Hello from Thread  3
Hello from Thread  4

In case you are not familiar with the if __name__ == '__main__:' statement, what it does is making sure that the code nested under this condition will be run only if you run your module as a program and it will not run in case your module is imported into another file.

Locks

As mentioned prior, the memory space is shared between the threads. This is at the same time beneficial and problematic: it is beneficial in a sense that the communication between the threads becomes easy, however, you might experience a strange outcome if you let several threads change the same variable without caution, e.g. thread 2 changes variable x while thread 1 is working with it. This is when lock comes into play. Using lock, you can allow only one thread to work with a variable. In other words, only a single thread can hold the lock. If the other threads need to work with that variable, they have to wait until the other thread is done and the variable is “unlocked.”

We illustrate this with a simple example:

import threading

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print ("Counter is %d"%counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print ("Counter is now %d"%counter)


if __name__ == '__main__':
    t1 = threading.Thread(target = incrementer1)
    t2 = threading.Thread(target = incrementer2)

    t1.start()
    t2.start()

Suppose we want to print multiples of 3 between 1 and 12, i.e. 3, 6, 9 and 12. For the sake of argument, we try to do this using 2 threads and a nested for loop. Then we create a global variable called counter and we initialize it with 0. Then whenever each of the incrementer1 or incrementer2 functions are called, the counter is incremented by 3 twice (counter is incremented by 6 in each function call). If you run the previous code, you should be really lucky if you get the following as part of your output:

Counter is now 3
Counter is now 6
Counter is now 9
Counter is now 12

The reason is the conflict that happens between threads while incrementing the counter in the nested for loop. As you probably noticed, the first level for loop is equivalent to adding 3 to the counter and the conflict that might happen is not effective on that level but the nested for loop. Accordingly, the output of the previous code is different in every run. This is an example output:

$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 4
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 8
Greeter 1 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 10
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12

We can fix this issue using a lock: whenever one of the function is going to increment the value by 3, it will acquire() the lock and when it is done the function will release() the lock. This mechanism is illustrated in the following code:

import threading

increment_by_3_lock = threading.Lock()

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 1 incremented the counter by 1")
        print ("Counter is %d"%counter)
        increment_by_3_lock.release()

def incrementer2():
    global counter
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter += 1
            print("Greeter 2 incremented the counter by 1")
        print ("Counter is %d"%counter)
        increment_by_3_lock.release()

if __name__ == '__main__':
    t1 = threading.Thread(target = incrementer1)
    t2 = threading.Thread(target = incrementer2)

    t1.start()
    t2.start()

No matter how many times you run this code, the output would always be in the correct order:

$ python3 lock_example.py
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 3
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Greeter 1 incremented the counter by 1
Counter is 6
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 9
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Greeter 2 incremented the counter by 1
Counter is 12

Using the Threading module increases both the overhead associated with thread management as well as the complexity of the program and that is why in many situations, employing multiprocessing module might be a better approach.

Multi-processing in Python

We already mentioned that multi-threading might not be sufficient in many applications and we might need to use multiprocessing sometimes, or better to say most of the time. That is why we are dedicating this subsection to this particular module. This module provides you with an API for spawning processes the way you spawn threads using threading module. Moreover, some functionalities are not even available in threading module, e.g. the Pool class which allows you to run a batch of jobs using a pool of worker processes.

Process

Similar to threading module which was employing thread (aka _thread) under the hood, multiprocessing employs the Process class. Consider the following example:

from multiprocessing import Process
import os

def greeter (name):
    proc_idx = os.getpid()
    print ("Process {0}: Hello {1}!".format(proc_idx,name))

if __name__ == '__main__':
    name_list = ['Harry', 'George', 'Dirk', 'David']
    process_list = []
    for name_idx, name in enumerate(name_list):
        current_process = Process(target=greeter, args=(name,))
        process_list.append(current_process)
        current_process.start()
    for process in process_list:
        process.join()

In this example, after importing the Process module we created a greeter() function that takes a name and greets that person. It also prints the pid (process identifier) of the process that is running it. Note that we used the os module to get the pid. In the bottom of the code after checking the __name__='__main__' condition, we create a series of Processes and start them. Finally in the last for loop and using the join method, we tell Python to wait for the processes to terminate. This is one of the possible outputs of the code:

$ python3 process_example.py
Process 23451: Hello Harry!
Process 23452: Hello George!
Process 23453: Hello Dirk!
Process 23454: Hello David!

Pool

Consider the Pool class as a pool of worker processes. There are several ways for assigning jobs to the Pool class and we will introduce the most important ones in this section. These methods are categorized as blocking or non-blocking. The former means that after calling the API, it blocks the thread/process until it has the result or answer ready and the control returns only when the call completes. In thenon-blockin` on the other hand, the control returns immediately.

Synchronous Pool.map()

We illustrate the Pool.map method by re-implementing our previous greeter example using Pool.map:

from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid,name))

if __name__ == '__main__':
    names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
    pool = Pool(processes=3)
    sync_map = pool.map(greeter,names)
    print("Done!")

As you can see, we have seven names here but we do not want to dedicate each greeting to a separate process. Instead, we do the whole job of “greeting seven people” using “two processes.” We create a pool of 3 processes with Pool(processes=3) syntax and then we map an iterable called names to the greeter function using pool.map(greeter,names). As we expected, the greetings in the output will be printed from three different processes:

$ python poolmap_example.py
Process 30585: Hello Jenna!
Process 30586: Hello David!
Process 30587: Hello Marry!
Process 30585: Hello Ted!
Process 30585: Hello Jerry!
Process 30587: Hello Tom!
Process 30585: Hello Justin!
Done!

Note that Pool.map() is in blocking category and does not return the control to your script until it is done calculating the results. That is why Done! is printed after all of the greetings are over.

Asynchronous Pool.map_async()

As the name implies, you can use the map_async method, when you want assign many function calls to a pool of worker processes asynchronously. Note that unlike map, the order of the results is not guaranteed (as oppose to map) and the control is returned immediately. We now implement the previous example using map_async:

from multiprocessing import Pool
import os

def greeter(name):
    pid = os.getpid()
    print("Process {0}: Hello {1}!".format(pid,name))

if __name__ == '__main__':
    names = ['Jenna', 'David','Marry', 'Ted','Jerry','Tom','Justin']
    pool = Pool(processes=3)
    async_map = pool.map_async(greeter,names)
    print("Done!")
    async_map.wait()

As you probably noticed, the only difference (clearly apart from the map_async method name) is calling the wait() method in the last line. The wait() method tells your script to wait for the result of map_async before terminating:

$ python poolmap_example.py
Done!
Process 30740: Hello Jenna!
Process 30741: Hello David!
Process 30740: Hello Ted!
Process 30742: Hello Marry!
Process 30740: Hello Jerry!
Process 30741: Hello Tom!
Process 30742: Hello Justin!

Note that the order of the results are not preserved. Moreover, Done! is printer before any of the results, meaning that if we do not use the wait() method, you probably will not see the result at all.

Locks

The way multiprocessing module implements locks is almost identical to the way the threading module does. After importing Lock from multiprocessing all you need to do is to acquire it, do some computation and then release the lock. We will clarify the use of Lock by providing an example in next section about process communication.

Process Communication

Process communication in multiprocessing is one of the most important, yet complicated, features for better use of this module. As oppose to threading, the Process objects will not have access to any shared variable by default, i.e. no shared memory space between the processes by default. This effect is illustrated in the following example:

from multiprocessing import Process, Lock, Value
import time

global counter
counter = 0

def incrementer1():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print ("Greeter1: Counter is %d"%counter)

def incrementer2():
    global counter
    for j in range(2):
        for i in range(3):
            counter += 1
        print ("Greeter2: Counter is %d"%counter)


if __name__ == '__main__':

    t1 = Process(target = incrementer1 )
    t2 = Process(target = incrementer2 )
    t1.start()
    t2.start()

Probably you already noticed that this is almost identical to our example in threading section. Now, take a look at the strange output:

$ python communication_example.py
Greeter1: Counter is 3
Greeter1: Counter is 6
Greeter2: Counter is 3
Greeter2: Counter is 6

As you can see, it is as if the processes does not see each other. Instead of having two processes one counting to 6 and the other counting from 6 to 12, we have two processes counting to 6.

Nevertheless, there are several ways that Processes from multiprocessing can communicate with each other, including Pipe, Queue, Value, Array and Manager. Pipe and Queue are appropriate for inter-process message passing. To be more specific, Pipe is useful for process-to-process scenarios while Queue is more appropriate for processes-toprocesses ones. Value and Array are both used to provide synchronized access to a shared data (very much like shared memory) and Managers can be used on different data types. In the following sub-sections, we cover both Value and Array since they are both lightweight, yet useful, approach.

Value

The following example re-implements the broken example in the previous section. We fix the strange output, by using both Lock and Value:

from multiprocessing import Process, Lock, Value
import time

increment_by_3_lock = Lock()


def incrementer1(counter):
    for j in range(3):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        print ("Greeter1: Counter is %d"%counter.value)
        increment_by_3_lock.release()

def incrementer2(counter):
    for j in range(3):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        print ("Greeter2: Counter is %d"%counter.value)
        increment_by_3_lock.release()


if __name__ == '__main__':

    counter = Value('i',0)
    t1 = Process(target = incrementer1, args=(counter,))
    t2 = Process(target = incrementer2 , args=(counter,))
    t2.start()
    t1.start()

The usage of Lock object in this example is identical to the example in threading section. The usage of counter is on the other hand the novel part. First, note that counter is not a global variable anymore and instead it is a Value which returns a ctypes object allocated from a shared memory between the processes. The first argument 'i' indicates a signed integer, and the second argument defines the initialization value. In this case we are assigning a signed integer in the shared memory initialized to size 0 to the counter variable. We then modified our two functions and pass this shared variable as an argument. Finally, we change the way we increment the counter since the counter is not a Python integer anymore but a ctypes signed integer where we can access its value using the value attribute. The output of the code is now as we expected:

$ python mp_lock_example.py
Greeter2: Counter is 3
Greeter2: Counter is 6
Greeter1: Counter is 9
Greeter1: Counter is 12

The last example related to parallel processing, illustrates the use of both Value and Array, as well as a technique to pass multiple arguments to a function. Note that the Process object does not accept multiple arguments for a function and therefore we need this or similar techniques for passing multiple arguments. Also, this technique can also be used when you want to pass multiple arguments to map or map_async:

from multiprocessing import Process, Lock, Value, Array
import time
from ctypes import c_char_p


increment_by_3_lock = Lock()


def incrementer1(counter_and_names):
    counter=  counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.1)
        name_idx = counter.value//3 -1
        print ("Greeter1: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
        increment_by_3_lock.release()

def incrementer2(counter_and_names):
    counter=  counter_and_names[0]
    names = counter_and_names[1]
    for j in range(2):
        increment_by_3_lock.acquire(True)
        for i in range(3):
            counter.value += 1
            time.sleep(0.05)
        name_idx = counter.value//3 -1
        print ("Greeter2: Greeting {0}! Counter is {1}".format(names.value[name_idx],counter.value))
        increment_by_3_lock.release()


if __name__ == '__main__':
    counter = Value('i',0)
    names = Array (c_char_p,4)
    names.value = ['James','Tom','Sam', 'Larry']
    t1 = Process(target = incrementer1, args=((counter,names),))
    t2 = Process(target = incrementer2 , args=((counter,names),))
    t2.start()
    t1.start()

In this example, we created a multiprocessing.Array() object and assigned it to a variable called names. As we mentioned before, the first argument is the ctype data type and since we want to create an array of strings with a length of 4 (second argument), we imported the c_char_p and passed it as the first argument.

Instead of passing the arguments separately, we merged both the Value and Array objects in a tuple and passed the tuple to the functions. We then modified the functions to unpack the objects in the first two lines in both functions. Finally, we changed the print statement in a way that each process greets a particular name. The output of the example is:

$ python3 mp_lock_example.py
Greeter2: Greeting James! Counter is 3
Greeter2: Greeting Tom! Counter is 6
Greeter1: Greeting Sam! Counter is 9
Greeter1: Greeting Larry! Counter is 12

7 - Dask

Gregor von Laszewski (laszewski@gmail.com)

Dask is a python-based parallel computing library for analytics. Parallel computing is a type of computation in which many calculations or the execution of processes are carried out simultaneously. Large problems can often be divided into smaller ones, which can then be solved concurrently.

Dask is composed of two components:

  1. Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
  2. Big Data collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects.
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Scales up: Runs resiliently on clusters with 1000s of cores
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind it provides rapid feedback and diagnostics to aid humans

The section is structured in a number of subsections addressing the following topics:

Foundations:

an explanation of what Dask is, how it works, and how to use lower level primitives to set up computations. Casual users may wish to skip this section, although we consider it useful knowledge for all users.

Distributed Features:

information on running Dask on the distributed scheduler, which enables scale-up to distributed settings and enhanced monitoring of task operations. The distributed scheduler is now generally the recommended engine for executing task work, even on single workstations or laptops.

Collections:

convenient abstractions giving a familiar feel to big data.

Bags:

Python iterators with a functional paradigm, such as found in func/iter-tools and toolz - generalize lists/generators to big data; this will seem very familiar to users of PySpark’s RDD

Array:

massive multi-dimensional numerical data, with Numpy functionality

Dataframe:

massive tabular data, with Pandas functionality

How Dask Works

Dask is a computation tool for larger-than-memory datasets, parallel execution or delayed/background execution.

We can summarize the basics of Dask as follows:

  • process data that does not fit into memory by breaking it into blocks and specifying task chains
  • parallelize execution of tasks across cores and even nodes of a cluster
  • move computation to the data rather than the other way around, to minimize communication overheads

We use for-loops to build basic tasks, Python iterators, and the Numpy (array) and Pandas (dataframe) functions for multi-dimensional or tabular data, respectively.

Dask allows us to construct a prescription for the calculation we want to carry out. A module named Dask.delayed lets us parallelize custom code. It is useful whenever our problem doesn’t quite fit a high-level parallel object like dask.array or dask.dataframe but could still benefit from parallelism. Dask.delayed works by delaying our function evaluations and putting them into a dask graph. Here is a small example:

from dask import delayed

@delayed
def inc(x):
    return x + 1

@delayed
def add(x, y):
    return x + y

Here we have used the delayed annotation to show that we want these functions to operate lazily - to save the set of inputs and execute only on demand.

Dask Bag

Dask-bag excels in processing data that can be represented as a sequence of arbitrary inputs. We’ll refer to this as “messy” data, because it can contain complex nested structures, missing fields, mixtures of data types, etc. The functional programming style fits very nicely with standard Python iteration, such as can be found in the itertools module.

Messy data is often encountered at the beginning of data processing pipelines when large volumes of raw data are first consumed. The initial set of data might be JSON, CSV, XML, or any other format that does not enforce strict structure and datatypes. For this reason, the initial data massaging and processing is often done with Python lists, dicts, and sets.

These core data structures are optimized for general-purpose storage and processing. Adding streaming computation with iterators/generator expressions or libraries like itertools or toolz let us process large volumes in a small space. If we combine this with parallel processing then we can churn through a fair amount of data.

Dask.bag is a high level Dask collection to automate common workloads of this form. In a nutshell

dask.bag = map, filter, toolz + parallel execution

You can create a Bag from a Python sequence, from files, from data on S3, etc.

# each element is an integer
import dask.bag as db
b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

# each element is a text file of JSON lines
import os
b = db.read_text(os.path.join('data', 'accounts.*.json.gz'))

# Requires `s3fs` library
# each element is a remote CSV text file
b = db.read_text('s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-01.csv')

Bag objects hold the standard functional API found in projects like the Python standard library, toolz, or pyspark, including map, filter, groupby, etc.

As with Array and DataFrame objects, operations on Bag objects create new bags. Call the .compute() method to trigger execution.

def is_even(n):
    return n % 2 == 0

b = db.from_sequence([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
c = b.filter(is_even).map(lambda x: x ** 2)
c

# blocking form: wait for completion (which is very fast in this case)
c.compute()

For more details on Dask Bag check https://dask.pydata.org/en/latest/bag.html

Concurrency Features

Dask supports a real-time task framework that extends Python’s concurrent.futures interface. This interface is good for arbitrary task scheduling, like dask.delayed, but is immediate rather than lazy, which provides some more flexibility in situations where the computations may evolve. These features depend on the second-generation task scheduler found in dask.distributed (which, despite its name, runs very well on a single machine).

Dask allows us to simply construct graphs of tasks with dependencies. We can find that graphs can also be created automatically for us using functional, Numpy, or Pandas syntax on data collections. None of this would be very useful if there weren’t also a way to execute these graphs, in a parallel and memory-aware way. Dask comes with four available schedulers:

  • dask.threaded.get: a scheduler backed by a thread pool
  • dask.multiprocessing.get: a scheduler backed by a process pool
  • dask.async.get_sync: a synchronous scheduler, good for debugging
  • distributed.Client.get: a distributed scheduler for executing graphs on multiple machines.

Here is a simple program for dask.distributed library:

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()

For more details on Concurrent Features by Dask check https://dask.pydata.org/en/latest/futures.html

Dask Array

Dask arrays implement a subset of the NumPy interface on large arrays using blocked algorithms and task scheduling. These behave like numpy arrays, but break a massive job into tasks that are then executed by a scheduler. The default scheduler uses threading but you can also use multiprocessing or distributed or even serial processing (mainly for debugging). You can tell the dask array how to break the data into chunks for processing.

import dask.array as da
f = h5py.File('myfile.hdf5')
x = da.from_array(f['/big-data'], chunks=(1000, 1000))
x - x.mean(axis=1).compute()

For more details on Dask Array check https://dask.pydata.org/en/latest/array.html

Dask DataFrame

A Dask DataFrame is a large parallel dataframe composed of many smaller Pandas dataframes, split along the index. These pandas dataframes may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. Dask.dataframe implements a commonly used subset of the Pandas interface including elementwise operations, reductions, grouping operations, joins, timeseries algorithms, and more. It copies the Pandas interface for these operations exactly and so should be very familiar to Pandas users. Because Dask.dataframe operations merely coordinate Pandas operations they usually exhibit similar performance characteristics as are found in Pandas. To run the following code, save ‘student.csv’ file in your machine.

import pandas as pd
df = pd.read_csv('student.csv')
d = df.groupby(df.HID).Serial_No.mean()
print(d)

ID
101     1
102     2
104     3
105     4
106     5
107     6
109     7
111     8
201     9
202    10
Name: Serial_No, dtype: int64

import dask.dataframe as dd
df = dd.read_csv('student.csv')
dt = df.groupby(df.HID).Serial_No.mean().compute()
print (dt)

ID
101     1.0
102     2.0
104     3.0
105     4.0
106     5.0
107     6.0
109     7.0
111     8.0
201     9.0
202    10.0
Name: Serial_No, dtype: float64

For more details on Dask DataFrame check https://dask.pydata.org/en/latest/dataframe.html

Dask DataFrame Storage

Efficient storage can dramatically improve performance, particularly when operating repeatedly from disk.

Decompressing text and parsing CSV files is expensive. One of the most effective strategies with medium data is to use a binary storage format like HDF5.

# be sure to shut down other kernels running distributed clients
from dask.distributed import Client
client = Client()

Create data if we don’t have any

from prep import accounts_csvs
accounts_csvs(3, 1000000, 500)

First we read our csv data as before.

CSV and other text-based file formats are the most common storage for data from many sources, because they require minimal pre-processing, can be written line-by-line and are human-readable. Since Pandas' read_csv is well-optimized, CSVs are a reasonable input, but far from optimized, since reading required extensive text parsing.

import os
filename = os.path.join('data', 'accounts.*.csv')
filename

import dask.dataframe as dd
df_csv = dd.read_csv(filename)
df_csv.head()

HDF5 and netCDF are binary array formats very commonly used in the scientific realm.

Pandas contains a specialized HDF5 format, HDFStore. The dd.DataFrame.to_hdf method works exactly like the pd.DataFrame.to_hdf method.

target = os.path.join('data', 'accounts.h5')
target

%time df_csv.to_hdf(target, '/data')

df_hdf = dd.read_hdf(target, '/data')
df_hdf.head()

For more information on Dask DataFrame Storage, click http://dask.pydata.org/en/latest/dataframe-create.html