Monday, November 26, 2012

Detecting Communities in Social Graph

In analyzing social network, one common problem is how to detecting communities, such as groups of people who knows or interacting frequently with each other.  Community is a subgraph of a graph where the connectivity are unusually dense.

In this blog, I will enumerate some common algorithms on finding communities.

First of all, community detection can be think of graph partitioning problem.  In this case, a single node will belong to no more than one community.  In other words, community does not overlap with each other.

High Betweenness Edge Removal

The intuition is that members within a community are densely connected and have many paths to reach each other.  On the other hand, nodes from different communities requires inter-community links to reach each other, and these inter-community links tends to have high betweenness score.

Therefore, by removing these high-betweenness links, the graph will be segregated into communities.

  1. For each edge, compute the edge-betweenness score
  2. Remove the edge with the highest betweenness score
  3. Until there are enough segregation
However, while this method achieve good result, it is very slow and not work effectively when there are more than couple thousand nodes with dense edges.

Hierarchical Clustering

This is a very general approach of detecting communities.  Some measure of distance (or similarities) is defined and computed between every pair of nodes first.  Then classical hierarchical cluster technique can be applied.  The distance should be chosen such that it is small between members within a community and big between members of different community.

Random Walk

Random walk can be used to compute the distance between every pair of nodes node-B and node-C.  Lets focus on undirected graph for now.  A random walker starts at node-B, throw a dice and has beta probability that it randomly pick a neighbor to visit based on the weight of links, and with 1 - beta probability that it will teleport back to the original node-v.  At an infinite number of steps, the probability distribution of landing on node-w will be high if node-B and node-C belongs to the same community.  The intuition here is that the random walker tends to be trapped within the community so all nodes that has high probability distribution tends to be within the same community as node-B (where the random walker is started).

Notice that the pick of beta is important.  If it is too big (close to 1), then the probability after converging is independent of the starting node (ie: the probability distribution only reflect the centrality of each node but not the community of the starting node).  If beta is too small (close to zero), then the walker will die down too quick before it fully explore the community's connectivity.

There is an analytical solution to this problem.

Lets M be the transition matrix before every pair of nodes. V represents the probability distribution of where the random walkers is.

 The "distance" between node-B and every other nodes is the eigenvector of M.  We can repeat the same to find out distance of all pairs of nodes, and then feed the result to a hierarchical clustering algorithm.

Label Propagation

The basic idea is that nodes observe its neighbors and set its own label to be the majority of its neighbors.
  1. Nodes are initially assigned with a unique label.
  2. In each round, each node examine the label of all its neighbors are set its own label to be the majority of its neighbors, when there is a tie, the winner is picked randomly.
  3. Until there is no more change of label assignments

Modularity Optimization

Within a community, the probability of 2 nodes having a link should be higher than if the link is just formed randomly within the whole graph.

probability of random link = deg(node-B) * deg(node-C) / (N * (N-1))
The actual link = Adjacency matrix[B, C]

Define com(B) to be community of node-B, com(C) to be community of node-C

So a utility function "Modularity" is created as follows ...
sum_over_v_w((actual - random) * delta(com(B), com(C)))

Now we examine communities that can be overlapping.  ie: A single node can belong to more than one community.

Finding Clique

Simple community detection usually starts with cliques.  Clique is a subgraph whether every node is connected to any other node.  In a K-Clique, there are K nodes and K^2 links between them.

However, communities has a looser definition, we don't require everyone to know every other people within the community, but we need them to know "enough" (maybe a certain percentage) of other people in the community.  K-core is more relaxed definition, it requires the nodes of the K-core to have connectivity to at least K other members.  There are some less popular relaxation, K-Clan requires every node to connect with every other members within K steps (path length less than K).  K-plex requires the node to connect to (N - K) members in the node where N total number of members within the K-plex.

The community is defined as the found K-core, or K-clan, or K-plex.

K-Clique Percolation

Another popular way of finding community is by rolling across adjacent K-Clique.  Two K-Clique is adjacent if they share K-1 nodes.  K is a parameter that we need to pick based on how dense we expect the community should have.

The algorithm is illustrated in following diagram.

K-Clique percolation is a popular way to identify communities which can potentially be overlapping with each other.

Thursday, October 4, 2012

Machine Learning in Gradient Descent

In Machine Learning, gradient descent is a very popular learning mechanism that is based on a greedy, hill-climbing approach.

Gradient Descent

The basic idea of Gradient Descent is to use a feedback loop to adjust the model based on the error it observes (between its predicted output and the actual output).  The adjustment (notice that there are multiple model parameters and therefore should be considered as a vector) is pointing to a direction where the error is decreasing in the steepest sense (hence the term "gradient").

Notice that we intentionally leave the following items vaguely defined so this approach can be applicable in a wide range of machine learning scenarios.
  • The Model
  • The loss function
  • The learning rate
Gradient Descent is very popular method because of the following reasons ...
  • Intuitive and easy to understand
  • Easy to run in parallel processing architecture
  • Easy to run incrementally with additional data
On the other hand, the greedy approach in Gradient Descent can be trapped in local optimum.  This can be mitigated by choosing a convex LOSS function (which has a single optimum), or multiple starting points can be picked randomly (in the case, we hope the best local optimum is close to the global optimum).

Batch vs Online Learning

While some other Machine Learning model (e.g. decision tree) requires a batch of data points before the learning can start, Gradient Descent is able to learn each data point independently and hence can support both batch learning and online learning easily.  The difference lies in how the training data is fed into the model and how the loss function computes its error.

In batch learning, all training will be fed to the model, who estimates the output for all data points.  Error will then be summed to compute the loss and then update the model.  Model in this case will be updated after predicting the whole batch of data points.

In online learning mode (also called stochastic gradient descent), data is fed to the model one at a time while the adjustment of the model is immediately made after evaluating the error of this single data point.  Notice that the final result of incremental learning can be different from batch learning, but it can be proved that the difference is bound and inversely proportional to the square root of the number of data points.

The learning rate can be adjusted as well to achieve a better stability in convergence.  In general, the learning rate is higher initially and decrease over the iteration of training (in batch learning it decreases in next round, in online learning it decreases at every data point).  This is quite intuitive as you paid less attention to the error as you have learn more and more.  Because of that online learning is sensitive to the arrival order of data.

One way to adjust the learning rate is to have a constant divide by the square root of N (where N is the number of data point seen so far).

 ɳ = ɳ_initial / (t ^ 0.5).

By using different decay factor, we can control how much attention we should pay for the late coming data.  In online learning, as data comes in time of occurrence, we can play around with this decay factor to guide how much attention the learning mechanism should be paying to latest arrival data.  Online learning automatically adapt to change of trends over time.

Most real world machine learning scenario relies on stationality of the model.  By the way, learning is about "learning from past experience".  If the environment changes too rapidly that the past experience is invalid, there is little value to learn.  Because of this reason, most machine learning project are satisfied by using batch learning (daily or weekly) and the demand of online learning is not very high.  A very common batch learning model is described in my previous blog here.

Parallel Learning

Because of no dependency in data processing, Gradient Descent is very easy to put into a parallel processing environment such as Map/Reduce.  Here we illustrate how to parallelize the execution of batch learning.

Notice that there are multiple rounds of Map/Reduce until the model converges.  On the other hand, online learning is not possible for Hadoop Map/Reduce which doesn't support real-time at this moment.

In summary, gradient descent is a very powerful approach of machine learning and works well in a wide spectrum of scenarios.

Sunday, September 23, 2012

Location Sensitive Hashing in Map Reduce

Inspired by Dr. Gautam Shroff who teaches the class: Web Intelligence and Big data in, there are many scenarios where we want to compute similarity between large amount of items (e.g. photos, products, persons, resumes ... etc).  I want to add another algorithm to my Map/Reduce algorithm catalog.

For the background of Map/Reduce implementation on Hadoop.  I have a previous post that covers the details.

Large Scale Similarity Computation

Lets say there are N items (N is billions) and we want to find all those that are similar to each other.  (similarity is defined by a distance function).  The goal is to output a similarity matrix.  (Notice that this matrix is very sparse and most of the cells are infinite)

One naive way is to compute the similarity of each possible pairs of items, hence an O(N^2) problem which is huge.  Can we reduce the order of complexity ?

Location Sensitive Hashing

First idea: Find a hashing function such that similar items (say distance is less than some predefined threshold) will be hashed to the same bucket.

Lets say if we pick the hash function such that Probability(H(a) == H(b)) is proportional to similarity between a and b.  And then we only perform detail comparison on items that falls into the same bucket.

Here is some R code that plots the relationship between similarity and the chance of performing a detail comparison.

x <- seq(0, 1, 0.01)
y <- x
plot(x, y, xlab="similarity", ylab="prob of detail compare")

Lets say we are interested in comparing all pairs of items whose similarity is above 0.3, we have a problem here because we have probability 0.7 = 1 - 0.3 of missing them (as they are not landing on the same bucket).  We want a mechanism that is highly selective; probability of performing a detail comparison should be close to one when similarity is above 0.3 and close to zero when similarity is below 0.3.

Second idea: Lets use 100 hash functions and 2 items that has 30 or more matches of such hash functions will be selected for detail comparison.

Here is some R code that plots the relationship between similarity and the chance of performing a detail comparison.

# Probability of having more than "threshold" matches out 
# of "no_of_hash" with a range of varying similarities

prob_select <- function(threshold, similarity, no_of_hash) {
  sum <- rep(0, length(similarity))
  for (k in 0:floor(no_of_hash * threshold)) {
    sum <- sum + dbinom(k, no_of_hash, similarity)
  return(1 - sum)

x <- seq(0, 1, 0.01)
y <- prob_select(0.3, x, 100)
plot(x, y, main="black: 100 hashes, Red: 1000 hashes", 
xlab="similarity", ylab="prob of detail compare")
lines(x, y)
y <- prob_select(0.3, x, 1000)
lines(x, y, col="red")

The graph looks much better this time, the chance of being selected for detail comparison jumps from zero to one sharply when the similarity crosses 0.3

To compare the items that are similar, we first compute 100 hashes (based on 100 different hash functions) for each item and output all combination 30 hashes as a key.  Then we perform pairwise comparison for all items that has same key.

But look at the combination of 30 out of 100, it is 100!/(30! * 70!) = 2.93 * 10^25, which is impractically huge.  Even the graph is a nice, we cannot use this mechanism in practice.

Third idea: Lets use 100 hash function and break them into b groups of r each (ie: b*r = 100).  Further let assume b = 20, and r = 5.  In other words, we have 20 groups and Group1 has hash1 to hash5, Group2 has hash6 to hash10 ... etc.  Now, we call itemA's group1 matches itemB's group1 if all their hash1 to hash5 are equal to each other.  Now, we'll perform a detail comparison of itemA and itemB if any of the groups are equal to each other.

Probability of being selected is  1 - (1-s^r)^b

The idea can be visualized as follows

Notice that in this model, finding r and b based on s is a bit trial and error.  Here we try 20 by 5, 33 by 3, 10 by 10.

prob_select2 <- function(similarity, row_per_grp, no_of_grp) {
  return(1 - (1 - similarity^row_per_grp)^no_of_grp)

x <- seq(0, 1, 0.01)
y <- prob_select2(x, 5, 20)

plot(x, y, 
main="black:20 by 5, red:10 by 10, blue:33 by 3", 
xlab="similarity", ylab="prob of detail compare")

lines(x, y)
y <- prob_select2(x, 10, 10)
lines(x, y, col="red")
y <- prob_select2(x, 3, 33)
lines(x, y, col="blue")

From the graph, we see the blue curve fits better to select the similarity at 0.3.  So lets use 33 by 3.

Notice that the ideal graph should be a step function where probability jumps from 0 to 1 when similarity cross over the similarity threshold that we are interested to capture (ie: we want to put all pairs whose similarity bigger than this threshold to be in a same bucket and all pairs whose similarity less that this threshold to be in a different bucket).  Unfortunately, our curve is a S-curve, not a step function.  This means there will be false positive and false negative.  False position lies on the left side of the similarity threshold where we have a small chance to put them into the same bucket, which will cost up some extra work to compare them later and throw them away.  On the other hand, false negative lies on the right side where we have a small chance of putting items that are very similar into different buckets and not considering them in the detail comparison.  Depends on whether we need to catch all the similar items above the threshold, we may need shift the S curve left or right by tuning the r and b parameters. 

To perform the detail comparison, we can use a parallel Map/Reduce implementation

Map Reduce Implementation

Here we have two round of Map/Reduce.  In the first round, map function will compute all the groupKeys for each item and emit the groupKey with the item.  All the items that has the groupKey matches will land on the same reducer, which creates all the possible pairs of items (these are candidates for pairwise comparison).

However, we don't want to perform the detail comparison in the first round as there may be many duplicates for item pairs that matches more than one group.  Therefore we want to perform another round of Map/reduce to remove the duplicates.

The first round proceeds as follows ...

After that, the second round proceeds as follows ...

By combining Location Sensitive Hashing and Map/Reduce,  we can perform large scale similarity calculation in an effective manner.

Monday, August 13, 2012

BIG Data Analytics Pipeline

"Big Data Analytics" has recently been one of the hottest buzzwords.  It is a combination of "Big Data" and "Deep Analysis".  The former is a phenomenon of Web2.0 where a lot of transaction and user activity data has been collected which can be mined for extracting useful information.  The later is about using advanced mathematical/statistical technique to build models from the data.  In reality I've found these two areas are quite different and disjoint and people working in each area have a pretty different background.

Big Data Camp

People working in this camp typically come from Hadoop, PIG/Hive background.  They usually have implemented some domain-specific logic to process large amount of raw data.  Often the logic is relatively straight-forward based on domain-specific business rules.

From my personal experience, most of the people working in big data come from a computer science and distributed parallel processing system background but not from the statistical or mathematical discipline.

Deep Analysis Camp

On the other hand, people working in this camp usually come from statistical and mathematical background which the first thing being taught is how to use sampling to understand a large population's characteristic.  Notice the magic of "sampling" is that the accuracy of estimating the large population depends only in the size of sample but not the actual size of the population.  In their world, there is never a need to process all the data in the population in the first place.  Therefore, Big Data Analytics is unnecessary under this philosophy.

Typical Data Processing Pipeline

Learning from my previous projects, I observe most data processing pipeline fall into the following pattern.

In this model, data is created from the OLTP (On Line Transaction Processing) system, flowing into the BIG Data Analytics system which produced various output; including data mart/cubes for OLAP (On Line Analytic Processing), reports for the consumption of business executives, and predictive models that feedback decision support for OLTP.

Big Data + Deep Analysis

The BIG data analytics box is usually done in a batch fashion (e.g. once a day), usually we see big data processing and deep data analysis happens at different stages of this batch process.

The big data processing part (colored in orange) is usually done using Hadoop/PIG/Hive technology with classical ETL logic implementation.  By leveraging the Map/Reduce model that Hadoop provides, we can linearly scale up the processing by adding more machines into the Hadoop cluster.  Drawing cloud computing resources (e.g. Amazon EMR) is a very common approach to perform this kind of tasks.

The deep analysis part (colored in green) is usually done in R, SPSS, SAS using a much smaller amount of carefully sampled data that fits into a single machine's capacity (usually less than couple hundred thousands data records).  The deep analysis part usually involve data visualization, data preparation, model learning (e.g. Linear regression and regularization, K-nearest-neighbour/Support vector machine/Bayesian network/Neural network, Decision Tree and Ensemble methods), model evaluation.  For those who are interested, please read up my earlier posts on these topics.

Implementation Architecture

There are many possible ways to implement the data pipeline described above.  Here is one common implementation that works well in many projects.

In this architecture, "Flume" is used to move data from OLTP system to Hadoop File System HDFS.  A workflow scheduler (typically a cron-tab entry calling a script) will periodically run to process the data using Map/Reduce.  The data has two portions:  a) Raw transaction data from HDFS  b) Previous model hosted in some NOSQL server.  Finally the "reducer" will update the previous model which will be available to the OLTP system.

For most the big data analytic projects that I got involved, the above architecture works pretty well.  I believe projects requiring real-time feedback loop may see some limitation in this architecture.  Real-time big data analytics is an interesting topic which I am planning to discuss in future posts.

Wednesday, August 8, 2012

Measuring similarity and distance function

Measuring similarity or distance between two data points is very fundamental to many Machine Learning algorithms such as K-Nearest-Neighbor, Clustering ... etc.  Depends on the nature of the data point, various measurement can be used.


Distance between numeric data points

When the dimension of data point is numeric, the general form is called Minkowski distance

( (x1 - x2)p + (y1 - y2)p )1/p

When p = 2, this is equivalent to Euclidean distance.  When p = 1, this is equivalent to Manhattan distance.

This measure is independent of the underlying data distribution.  But what if the value along the x-dimension is much bigger than that from the y-dimension.  So we need to bring all of them into the same scale first.  A common way is to perform a z-transform where each data point first subtract the mean value, and then divide the standard deviation.

(x1, y1) becomes ( (x1μx)/σx , (y1μy)/σy )

This measure, although taking into consideration of the distribution of each dimension, it assumes the dimension are independent of each other.  But what if x-dimension and y-dimension has some correlation.  To consider correlation between different dimensions, we use ...

Mahalanobis distance = (v 1 -  v 2)T.CovMatrix.(v 1 -  v 2)  where v 1  = (x1, y1)

If we care about the direction of the data rather than the magnitude, then cosine distance is a common approach.  It computes the dot product of the two data points divided by the product of their magnitude.  Cosine distance, together with term/document matrix, is commonly used to measure the similarity between documents.


Distance between categorical data points

Since there is no ordering between categorical value, we can only measure whether the categorical value is the same or not.  Basically we are measuring the degree of overlapping of attribute values.  Hamming distance can be used to measure how many attributes need to changed in order to match each other.  We can calculate the ratio to determine how similar (or difference) between two data points using simple matching coefficient:
noOfMatchAttributes / noOfAttributes

However, when the data point contains asymmetric binary data attributes, equality of certain value doesn't mean anything.  For example, lets say the data point represents a user with attributes represent each movie.  The data point contains a high dimensional binary value representing whether the user has seen the movie.  (1 represent yes and 0 represent no).  Given that most users only see a very small portion of all movies, if both user hasn't seen a particular movie (both value is zero), it doesn't indicate any similarity between the user.  On the other hand, if both user saw the same movie (both value is one), it implies a lot of similarity between the user.  In this case, equality of one should carry a much higher weight than equality of zero.  This lead to Jaccard similarity :
noOfOnesInBoth / (noOfOnesInA + noOfOnesInB - noOfOnesInAandB)

Besides matching or not, if category is structured as a Tree hierarchy, then the distance of two category can be quantified by path length of their common parent.  For example, "/product/spot/ballgame/basketball" is closer to "/product/spot/ballgame/soccer/shoes" than "/product/luxury/handbags" because the common parent has a longer path.


Similarity between instances containing mixed types of attributes

When the data point contain a mixed of attributes, we can calculate the similarity of each attribute (or group the attributes of the same type), and then combine them together using some weighted average.

But we have to be careful when treating asymmetric attributes where its presence doesn't mean anything.

combined_similarity(x, y) = Σover_k[wk * δk * similarity(xk, yk)] / Σover_kk)

where Σover_k(wk) = 1

Distance between sequence (String, TimeSeries)

In case each attribute represent an element of a sequence, we need a different way to measure the distance.  For example, lets say each data point is a string (which contains a sequence of characters), then edit distance is a common measurement.  Basically, edit distance is how many "modifications" (which can be insert, modify, delete) is needed to change stringA into stringB.  This is usually calculated by using dynamic programming technique.

Time Series is another example of sequence data.  Similar to the concept of edit distance, Dynamic Time Warp is about distorting the time dimension by adding more data points in both time series such that their square error between corresponding pairs is minimized.  Where to add these data points are solved using a similar dynamic programming technique.  Here is a very good paper that describe the details.


 Distance between nodes in a network

In a homogenous undirected graph (nodes are of the same type), distance between nodes can be measured by the shortest path.

In a bi-partite graph, there are two types of nodes in which each node only connects to the other type.  (e.g. People joining Communities).  Similarity between nodes (of same type) can be measured by analyzing how similar their connected communities are.

SimRank is an iterative algorithm that compute the similarity of each type of nodes by summing the similarity between all pairs of other type of nodes that it has connected, while other type of nodes' similarity is computed in the same way.

We can also use a probabilistic approach such as RandomWalk to determine the similarity.  Each people node will pass a token (label with the people's name) along a randomly picked community node which it is connected to (weighted by the strength of connectivity).  Each community node will propagated back the received token back to a randomly picked people.  Now the people who received the propagated token may drop the token (with a chance beta) or propagated to a randomly chosen community again.  This process continues under all the tokens are die out (since they have a chance of being dropped).  After that, we obtain the trace Matrix and compute the similarity based on the dot product of the tokens it receives.


Distance between population distribution

Instead of measuring distance between individual data points, we can also compare a collection of data points (ie: population) and measure the distance between them.  In fact, one important part of statistics is to measure the distance between two groups of samples and see if the "difference" is significant enough to conclude they are from different populations.

Lets say the population contains members that belongs to different categories and we want to measure if population A and population B have same or different proportions of members across these categories, then we can use Chi-Square or KL-Divergence to measure their distance.

In case every member of the population has two different numeric attributes (e.g. weight and height), and we want to infer one attribute from the other if they are correlated, correlation coefficient is a measure that quantify their degree of correlation; whether these two attributes are moving along the same direction (heavier people are taller), different direction (heavier people are shorter), or independent.  The correlation coefficient ranges from -1 (negatively correlated) to 0 (no correlation) to 1 (positively correlated).

If the two attributes are categorical (rather than numeric), then mutual information is a common way to measure their dependencies and give a good sense of whether knowing the value of one attribute can help inferring the other attribute.

Now if there are two judges who rank a collection of items and we are interested in the degree of agreement of their ranking order.  We can use Spearman's rank coefficient to measure their degree of consensus in the ranking order.

Thursday, July 5, 2012

Couchbase Architecture

After receiving a lot of good feedback and comment on my last blog on MongoDb, I was encouraged to do another deep dive on another popular document oriented db; Couchbase.

I have been a long-time fan CouchDb and has wrote a blog on it many years ago.  After it merges with Membase, I am very excited to take a deep look into it again.

Couchbase is the merge of two popular NOSQL technologies: 
  • Membase, which provides persistence, replication, sharding to the high performance memcached technology
  • CouchDB, which pioneers the document oriented model based on JSON
Like other NOSQL technologies, both Membase and CouchDB are built from the ground up on a highly distributed architecture, with data shard across machines in a cluster.  Built around the Memcached protocol, Membase provides an easy migration to existing Memcached users who want to add persistence, sharding and fault resilience on their familiar Memcached model.  On the other hand, CouchDB provides first class support for storing JSON documents as well as a simple RESTful API to access them.  Underneath, CouchDB also has a highly tuned storage engine that is optimized for both update transaction as well as query processing.  Taking the best of both technologies, Membase is well-positioned in the NOSQL marketplace.

Programming model

Couchbase provides client libraries for different programming languages such as Java / .NET / PHP / Ruby / C / Python / Node.js

For read, Couchbase provides a key-based lookup mechanism where the client is expected to provide the key, and only the server hosting the data (with that key) will be contacted.

Couchbase also provides a query mechanism to retrieve data where the client provides a query (for example, range based on some  secondary key) as well as the view (basically the index).  The query will be broadcasted to all servers in the cluster and the result will be merged and sent back to the client.

For write, Couchbase provides a key-based update mechanism where the client sends in an updated document with the key (as doc id).  When handling write request, the server will return to client’s write request as soon as the data is stored in RAM on the active server, which offers the lowest latency for write requests.

Following is the core API that Couchbase offers.  (in an abstract sense)

# Get a document by key

doc = get(key)

# Modify a document, notice the whole document 
#   need to be passed in

set(key, doc)

# Modify a document when no one has modified it 
#  since my last read

casVersion = doc.getCas()
cas(key, casVersion, changedDoc)

# Create a new document, with an expiration time 
#   after which the document will be deleted

addIfNotExist(key, doc, timeToLive)

# Delete a document


# When the value is an integer, increment the integer


# When the value is an integer, decrement the integer


# When the value is an opaque byte array, append more 
#  data into existing value 

append(key, newData)

# Query the data 

results = query(viewName, queryParameters)

In Couchbase, document is the unit of manipulation.  Currently Couchbase doesn't support server-side execution of custom logic.  Couchbase server is basically a passive store and unlike other document oriented DB, Couchbase doesn't support field-level modification.  In case of modifying documents, client need to retrieve documents by its key, do the modification locally and then send back the whole (modified) document back to the server.  This design tradeoff network bandwidth (since more data will be transferred across the network) for CPU (now CPU load shift to client).

Couchbase currently doesn't support bulk modification based on a condition matching.  Modification happens only in a per document basis.  (client will save the modified document one at a time).

Transaction Model

Similar to many NOSQL databases, Couchbase’s transaction model is primitive as compared to RDBMS.  Atomicity is guaranteed at a single document and transactions that span update of multiple documents are unsupported.  To provide necessary isolation for concurrent access, Couchbase provides a CAS (compare and swap) mechanism which works as follows …
  • When the client retrieves a document, a CAS ID (equivalent to a revision number) is attached to it.
  • While the client is manipulating the retrieved document locally, another client may modify this document.  When this happens, the CAS ID of the document at the server will be incremented.
  • Now, when the original client submits its modification to the server, it can attach the original  CAS ID in its request.  The server will verify this ID with the actual ID in the server.  If they differ, the document has been updated in between and the server will not apply the update.
  • The original client will re-read the document (which now has a newer ID) and re-submit its modification. 
Couchbase also provides a locking mechanism for clients to coordinate their access to documents.  Clients can request a LOCK on the document it intends to modify, update the documents and then releases the LOCK.  To prevent a deadlock situation, each LOCK grant has a timeout so it will automatically be released after a period of time.

Deployment Architecture

 In a typical setting, a Couchbase DB resides in a server clusters involving multiple machines.  Client library will connect to the appropriate servers to access the data.  Each machine contains a number of daemon processes which provides data access as well as management functions.

The data server, written in C/C++, is responsible to handle get/set/delete request from client.  The Management server, written in Erlang, is responsible to handle the query traffic from client, as well as manage the configuration and communicate with other member nodes in the cluster.

Virtual Buckets

The basic unit of data storage in Couchbase DB is a JSON document (or primitive data type such as int and byte array) which is associated with a key.  The overall key space is partitioned into 1024 logical storage unit called "virtual buckets" (or vBucket).  vBucket are distributed across machines within the cluster via a map that is shared among servers in the cluster as well as the client library.

High availability is achieved through data replication at the vBucket level.  Currently Couchbase supports one active vBucket zero or more standby replicas hosted in other machines.  Curremtly the standby server are idle and not serving any client request.  In future version of Couchbase, the standby replica will be able to serve read request.

Load balancing in Couchbase is achieved as follows:
  • Keys are uniformly distributed based on the hash function
  • When machines are added and removed in the cluster.  The administrator can request a redistribution of vBucket so that data are evenly spread across physical machines.

Management Server

Management server performs the management function and co-ordinate the other nodes within the cluster.  It includes the following monitoring and administration functions

Heartbeat: A watchdog process periodically communicates with all member nodes within the same cluster to provide Couchbase Server health updates.

Process monitor: This subsystem monitors execution of the local data manager, restarting failed processes as required and provide status information to the heartbeat module.

Configuration manager: Each Couchbase Server node shares a cluster-wide configuration which contains the member nodes within the cluster, a vBucket map.  The configuration manager pull this config from other member nodes at bootup time.

Within a cluster, one node’s Management Server will be elected as the leader which performs the following cluster-wide management function
  • Controls the distribution of vBuckets among other nodes and initiate vBucket migration
  • Orchestrates the failover and update the configuration manager of member nodes
If the leader node crashes, a new leader will be elected from surviving members in the cluster.

When a machine in the cluster has crashed, the leader will detect that and notify member machines in the cluster that all vBuckets hosted in the crashed machine is dead.  After getting this signal, machines hosting the corresponding vBucket replica will set the vBucket status as “active”.  The vBucket/server map is updated and eventually propagated to the client lib.  Notice that at this moment, the replication level of the vBucket will be reduced.  Couchbase doesn’t automatically re-create new replicas which will cause data copying traffic.  Administrator can issue a command to explicitly initiate a data rebalancing.  The crashed machine, after reboot can rejoin the cluster.  At this moment, all the data it stores previously will be completely discard and the machine will be treated as a brand new empty machine.

As more machines are put into the cluster (for scaling out), vBucket should be redistributed to achieve a load balance.  This is currently triggered by an explicit command from the administrator.  Once receive the “rebalance” command, the leader will compute the new provisional map which has the balanced distribution of vBuckets and send this provisional map to all members of the cluster.

To compute the vBucket map and migration plan, the leader attempts the following objectives:
  • Evenly distribute the number of active vBuckets and replica vBuckets among member nodes.
  • Place the active copy and each replicas in physically separated nodes.
  • Spread the replica vBucket as wide as possible among other member nodes.
  • Minimize the amount of data migration
  • Orchestrate the steps of replica redistribution so no node or network will be overwhelmed by the replica migration.
Once the vBucket maps is determined, the leader will pass the redistribution map to each member in the cluster and coordinate the steps of vBucket migration.  The actual data transfer happens directly between the origination node to the destination node.

Notice that since we have generally more vBuckets than machines.  The workload of migration will be evenly distributed automatically.  For example, when new machines are added into the clusters, all existing machines will migrate some portion of its vBucket to the new machines.  There is no single bottleneck in the cluster.

Throughput the migration and redistribution of vBucket among servers, the life cycle of a vBucket in a server will be in one of the following states
  • “Active”:  means the server is hosting the vBucket is ready to handle both read and write request
  • “Replica”: means the server is hosting the a copy of the vBucket that may be slightly out of date but can take read request that can tolerate some degree of outdate.
  • “Pending”: means the server is hosting a copy that is in a critical transitional state.  The server cannot take either read or write request at this moment.
  • “Dead”: means the server is no longer responsible for the vBucket and will not take either read or write request anymore.

Data Server

Data server implements the memcached APIs such as get, set, delete, append, prepend, etc. It contains the following key datastructure:
  • One in-memory hashtable (key by doc id) for the corresponding vBucket hosted.  The hashtable acts as both a metadata for all documents as well as a cache for the document content.  Maintain the entry gives a quick way to detect whether the document exists on disk.
  • To support async write, there is a checkpoint linkedlist per vBucket holding the doc id of modified documents that hasn't been flushed to disk or replicated to the replica.

To handle a "GET" request
  • Data server routes the request to the corresponding ep-engine responsible for the vBucket.
  • The ep-engine will lookup the document id from the in-memory hastable.  If the document content is found in cache (stored in the value of the hashtable), it will be returned.  Otherwise, a background disk fetch task will be created and queued into the RO dispatcher queue.
  • The RO dispatcher then reads the value from the underlying storage engine and populates the corresponding entry in the vbucket hash table.
  • Finally, the notification thread notifies the disk fetch completion to the memcached pending connection, so that the memcached worker thread can revisit the engine to process a get request.
To handle a "SET" request,  a success response will be returned to the calling client once the updated document has been put into the in-memory hashtable with a write request put into the checkpoint buffer.  Later on the Flusher thread will pickup the outstanding write request from each checkpoint buffer, lookup the corresponding document content from the hashtable and write it out to the storage engine.

Of course, data can be lost if the server crashes before the data has been replicated to another server and/or persisted.  If the client requires a high data availability across different crashes, it can issue a subsequent observe() call which blocks on the condition  that the server persist data on disk, or the server has replicated the data to another server (and get its ACK).  Overall speaking, the client has various options to tradeoff data integrity with throughput.

Hashtable Management

To synchronize accesses to a vbucket hash table, each incoming thread needs to acquire a lock before accessing a key region of the hash table. There are multiple locks per vbucket hash table, each of which is responsible for controlling exclusive accesses to a certain ket region on that hash table. The number of regions of a hash table can grow dynamically as more documents are inserted into the hash table.

To control the memory size of the hashtable, Item pager thread will monitor the memory utilization of the hashtable.  Once a high watermark is reached, it will initiate an eviction process to remove certain document content from the hashtable.  Only entries that is not referenced by entries in the checkpoint buffer can be evicted because otherwise the outstanding update (which only exists in hashtable but not persisted) will be lost.

After eviction, the entry of the document still remains in the hashtable; only the document content of the document will be removed from memory but the metadata is still there.  The eviction process stops after reaching the low watermark.  The high / low water mark is determined by the bucket memory quota. By default, the high water mark is set to 75% of bucket quota, while the low water mark is set to 60% of bucket quota. These water marks can be configurable at runtime.

In CouchDb, every document is associated with an expiration time and will be deleted once it is expired.  Expiry pager is responsible for tracking and removing expired document from both the hashtable as well as the storage engine (by scheduling a delete operation).

Checkpoint Manager
Checkpoint manager is responsible to recycle the checkpoint buffer, which holds the outstanding update request, consumed by the two downstream processes, Flusher and TAP replicator.  When all the request in the checkpoint buffer has been  processed, the checkpoint buffer will be deleted and a new one will be created.

TAP Replicator
TAP replicator is responsible to handle vBucket migration as well as vBucket replication from active server to replica server.  It does this by propagating the latest modified document to the corresponding replica server.

At the time a replica vBucket is established, the entire vBucket need to be copied from the active server to the empty destination replica server as follows
  • The in-memory hashtable at the active server will be transferred to the replica server.  Notice that during this period, some data may be updated and therefore the data set transfered to the replica can be inconsistent (some are the latest and some are outdated).
  • Nevertheless, all updates happen after the start of transfer is tracked in the checkpoint buffer.
  • Therefore, after the in-memory hashtable transferred is completed, the TAP replicator can pickup those updates from the checkpoint buffer.  This ensures the latest versioned of changed documents are sent to the replica, and hence fix the inconsistency.
  • However the hashtable cache doesn’t contain all the document content.  Data also need to be read from the vBucket file and send to the replica.  Notice that during this period, update of vBucket will happen in active server.  However, since the file is appended only, subsequent data update won’t interfere the vBucket copying process.
After the replica server has caught up, subsequent update at the active server will be available at its checkpoint buffer which will be pickup by the TAP replicator and send to the replica server.

CouchDB Storage Structure

Data server defines an interface where different storage structure can be plugged-in.  Currently it supports both a SQLite DB as well as CouchDB.  Here we describe the details of CouchDb, which provides a super high performance storage mechanism underneath the Couchbase technology.

Under the CouchDB structure, there will be one file per vBucket.  Data are written to this file in an append-only manner, which enables Couchbase to do mostly sequential writes for update, and provide the most optimized access patterns for disk I/O.  This unique storage structure attributes to Couchbase’s fast on-disk performance for write-intensive applications.

The following diagram illustrate the storage model and how it is modified by 3 batch updates (notice that since updates are asynchronous, it is perform by "Flusher" thread in batches).

The Flusher thread works as follows:

1) Pick up all pending write request from the dirty queue and de-duplicate multiple update request to the same document.

2) Sort each request (by key) into corresponding vBucket and open the corresponding file

3) Append the following into the vBucket file (in the following contiguous sequence)
  • All document contents in such write request batch.  Each document will be written as [length, crc, content] one after one sequentially.
  • The index that stores the mapping from document id to the document’s position on disk (called the BTree by-id)
  • The index that stores the mapping from update sequence number to the document’s position on disk.  (called the BTree by-seq)
The by-id index plays an important role for looking up the document by its id.  It is organized as a B-Tree where each node contains a key range.  To lookup a document by id, we just need to start from the header (which is the end of the file), transfer to the root BTree node of the by-id index, and then further traverse to the leaf BTree node that contains the pointer to the actual document position on disk.

During the write, the similar mechanism is used to trace back to the corresponding BTree node that contains the id of the modified documents.  Notice that in the append-only model, update is not happening in-place, instead we located the existing location and copy it over by appending.  In other words, the modified BTree node will be need to be copied over and modified and finally paste to the end of file, and then its parent need to be modified to point to the new location, which triggers the parents to be copied over and paste to the end of file.  Same happens to its parents’ parent and eventually all the way to the root node of the BTree.  The disk seek can be at the O(logN) complexity.

The by-seq index is used to keep track of the update sequence of lived documents and is used for asynchronous catchup purposes.  When a document is created, modified or deleted, a sequence number is added to the by-seq btree and the previous seq node will be deleted.  Therefore, for cross-site replication, view index update and compaction, we can quickly locate all the lived documents in the order of their update sequence.   When a vBucket replicator asks for the list of update since a particular time, it provides the last sequence number in previous update, the system will then scan through the by-seq BTree node to locate all the document that has sequence number larger than that, which effectively includes all the document that has been modified since the last replication.

As time goes by, certain data becomes garbage (see the grey-out region above) and become unreachable in the file.  Therefore, we need a garbage collection mechanism to clean up the garbage.  To trigger this process, the by-id and by-seq B-Tree node will keep track of the data size of lived documents (those that is not garbage) under its substree.  Therefore, by examining the root BTree node, we can determine the size of all lived documents within the vBucket.  When the ratio of actual size and vBucket file size fall below a certain threshold, a compaction process will be triggered  whose job is to open the vBucket file and copy the survived data to another file.

Technically, the compaction process opens the file and read the by-seq BTree at the end of the file.  It traces the Btree all the way to the leaf node and copy the corresponding document content to the new file.  The compaction process happens while the vBucket is being updated.  However, since the file is appended only, new changes are recorded after the BTree root that the compaction has opened, so subsequent data update won’t interfere with the compaction process.  When the compaction is completed, the system need to copy over the data that was appended since the beginning of the compaction to the new file.

View Index Structure

Unlike most indexing structure which provide a pointer from the search attribute back to the document.  The CouchDb index (called View Index) is better perceived as a denormalized table with arbitrary keys and values loosely associated to the document.

Such denormalized table is defined by a user-provided map() and reduce() function.

map = function(doc) {
   emit(k1, v1)
   emit(k2, v2)

reduce = function(keys, values, isRereduce) {
    if (isRereduce) {
        // Do the re-reduce only on values (keys will be null)
    } else {
        // Do the reduce on keys and values
    // result must be ready for input values to re-reduce

    return result

Whenever a document is created, updated, deleted, the corresponding map(doc) function will be invoked (in an asynchronous manner) to generate a set of key/value pairs.  Such key/value will be stored in a B-Tree structure.  All the key/values pairs of each B-Tree node will be passed into the reduce() function, which compute an aggregated value within that B-Tree node.  Re-reduce also happens in non-leaf B-Tree nodes which further aggregate the aggregated value of child B-Tree nodes.

The management server maintains the view index and persisted it to a separate file.

Create a view index is perform by broadcast the index creation request to all machines in the cluster.  The management process of each machine will read its active vBucket file and feed each surviving document to the Map function.  The key/value pairs emitted by the Map function will be stored in a separated BTree index file.  When writing out the BTree node, the reduce() function will be called with the list of all values in the tree node.  Its return result represent a partially reduced value is attached to the BTree node.

The view index will be updated incrementally as documents are subsequently getting into the system.  Periodically, the management process will open the vBucket file and scan all documents since the last sequence number.  For each changed document since the last sync, it invokes the corresponding map function to determine the corresponding key/value into the BTree node.  The BTree node will be split if appropriate.

Underlying, Couchbase use a back index to keep track of the document with the keys that it previously emitted.  Later when the document is deleted, it can look up the back index to determine what those key are and remove them.  In case the document is updated, the back index can also be examined; semantically a modification is equivalent to a delete followed by an insert.

The following diagram illustrates how the view index file will be incrementally updated via the append-only mechanism.

Query Processing

Query in Couchbase is made against the view index.  A query is composed of the view name, a start key and end key.  If the reduce() function isn’t defined, the query result will be the list of values sorted by the keys within the key range.  In case the reduce() function is defined, the query result will be a single aggregated value of all keys within the key range.

If the view has no reduce() function defined, the query processing proceeds as follows:
  • Client issue a query (with view, start/end key) to the management process of any server (unlike a key based lookup, there is no need to locate a specific server).
  • The management process will broadcast the request to other management process on all servers (include itself) within the cluster.
  • Each management process (after receiving the broadcast request) do a local search for value within the key range by traversing the BTree node of its view file, and start sending back the result (automatically sorted by the key) to the initial server.
  • The initial server will merge the sorted result and stream them back to the client.
 However, if the view has reduce() function defined, the query processing will involve computing a single aggregated value as follows:
  • Client issue a query (with view, start/end key) to the management process of any server (unlike a key based lookup, there is no need to locate a specific server).
  • The management process will broadcast the request to other management process on all servers (include itself) within the cluster.
  • Each management process do a local reduce for value within the key range by traversing the BTree node of its view file to compute the reduce value of the key range.  If the key range span across a BTree node, the pre-computed of the sub-range can be used.  This way, the reduce function can reuse a lot of partially reduced values and doesn’t need to recomputed every value of the key range from scratch.
  • The original server will do a final re-reduce() in all the return value from each other servers, and then passed back the final reduced value to the client.
To illustrate the re-reduce concept, lets say the query has its key range from A to F.

Instead of calling reduce([A,B,C,D,E,F]), the system recognize the BTree node that contains [B,C,D] has been pre-reduced and the result P is stored in the BTree node, so it only need to call reduce(A,P,E,F).

Update View Index as vBucket migrates
Since the view index is synchronized with the vBuckets in the same server, when the vBucket has migrated to a different server, the view index is no longer correct; those key/value that belong to a migrated vBucket should be discarded and the reduce value cannot be used anymore.

To keep track of the vBucket and key in the view index, each bTree node has a 1024-bitmask indicating all the vBuckets that is covered in the subtree (ie: it contains a key emitted from a document belonging to the vBucket).  Such bit-mask is maintained whenever the bTree node is updated.

At the server-level, a global bitmask is used to indicate all the vBuckets that this server is responsible for.

In processing the query of the map-only view, before the key/value pair is returned, an extra check will be perform for each key/value pair to make sure its associated vBucket is what this server is responsible for.

When processing the query of a view that has a reduce() function, we cannot use the pre-computed reduce value if the bTree node contains a vBucket that the server is not responsible for.  In this case, the bTree node’s bit mask is compared with the global bit mask.  In case if they are not aligned, then the reduce value need to be recomputed.

Here is an example to illustrate this process

Couchbase is one of the popular NOSQL technology built on a solid technology foundation designed for high performance.  In this post, we have examined a number of such key features:
  • Load balancing between servers inside a cluster that can grow and shrink according to workload conditions.  Data migration can be used to re-achieve workload balance.
  • Asynchronous write provides lowest possible latency to client as it returns once the data is store in memory.
  • Append-only update model pushes most update transaction into sequential disk access, hence provide extremely high throughput for write intensive applications.
  • Automatic compaction ensures the data lay out on disk are kept optimized all the time.
  • Map function can be used to pre-compute view index to enable query access.  Summary data can be pre-aggregated using the reduce function.  Overall, this cut down the workload of query processing dramatically.

For a review on NOSQL architecture in general and some theoretical foundation, I have wrote a NOSQL design pattern blog, as well as some fundamental difference between SQL and NOSQL.

For other NOSQL technologies, please read my other blog on MongoDb, Cassandra and HBase, Memcached

Special thanks to Damien Katz and Frank Weigel from Couchbase team who provide a lot of implementation details of Couchbase.

Monday, June 11, 2012

Predictive Analytics: Evaluate Model Performance

In previous posts, we have discussed various machine learning techniques including linear regression with regularization, SVM, Neural network, Nearest neighbor, Naive Bayes, Decision Tree and Ensemble models.  How do we pick which model is the best ?  or even whether the model we pick is better than a random guess ?  In this posts, we will cover how we evaluate the performance of the model and what can we do next to improve the performance.

Best guess with no model 

First of all, we need to understand the goal of our evaluation.  Are we trying to pick the best model ?  Are we trying to quantify the improvement of each model ?  Regardless of our goal, I found it is always useful to think about what the baseline should be.  Usually the baseline is what is your best guess if you don't have a model.

For classification problem, one approach is to do a random guess (with uniform probability) but a better approach is to guess the output class that has the largest proportion in the training samples.  For regression problem, the best guess will be the mean of output of training samples.

Prepare data for evaluation

In a typical setting, the set of data is divided into 3 disjoint groups; 20% data is set aside as testing data to evaluate the model we've trained.  The remaining 80% of data is dividing into k partitions.  k-1 partitions will be used as training data to train a model with a particular parameter value setting and 1 partition will be used as cross-validation data to pick the best parameter value that minimize the error of the cross-validation data.

As a concrete example, lets say we have 100 records available.  We'll set aside 20% which is 20 records for testing purposes and use the remaining 80 records to train the model.  Lets say the model has some tunable parameters (e.g. k in KNN, λ in linear model regularization).  For each particular parameter value, we'll conduct 10 rounds of training (ie: k = 10).  Within each round, we randomly select 90% which is 72 records to train a model and compute the error of prediction against the 8 unselected records.  Then we take the average error of these 10 rounds and pick the optimal parameter value that gives the minimal average error.  After picking the optimal tuning parameter,  we retrain the model using the whole 80 records.

To evaluate the predictive performance of the model, we'll test it against the 20 testing records we set aside at the beginning.  The details will be described below.

Measuring Regression Performance

For regression problem, measuring the distance between the estimated output from the actual output is used to quantify the model's performance.  Three measures are commonly used:  Root Mean Square Error, Relative Square Error and Coefficient of Determination.  Typically root mean square is used for measuring the absolute quantity of accuracy.

Mean Square Error MSE = (1/N) * ∑(yest – yactual)2
Root Mean Square Error RMSE = (MSE)1/2

To measure the accuracy with respect to the baseline, we use the ratio of MSE

Relative Square Error RSE = MSE / MSEbaseline
RSE = ∑(yest – yactual)2 / ∑(ymean – yactual)2

Coefficient Of Determination (also called R square) measures the variance that is explained by the model, which is the reduction of variance when using the model.  R square ranges from 0 to 1 while the model has strong predictive power when it is close to 1 and is not explaining anything when it is close to 0.

R2 = (MSEbaseline – MSE) / MSEbaseline
R2 = 1 – RSE

Here are some R code to compute these measures

> Prestige_clean <- Prestige[!$type),]
> model <- lm(prestige~., data=Prestige_clean)
> score <- predict(model, newdata=Prestige_clean)
> actual <- Prestige_clean$prestige
> rmse <- (mean((score - actual)^2))^0.5
> rmse
[1] 6.780719
> mu <- mean(actual)
> rse <- mean((score - actual)^2) / mean((mu - actual)^2) 
> rse
[1] 0.1589543
> rsquare <- 1 - rse
> rsquare
[1] 0.8410457

The Mean Square Error penalize the bigger difference more because of the square effect.  On the other hand, if we want to reduce the penalty of bigger difference, we can log transform the numeric quantity first.

Mean Square Log Error MSLE = (1/N) * ∑(log(y)est – log(y)actual)2
Root Mean Square Log Error RMSLE = (MSLE)1/2

Measuring Classification Performance

For classification problem, there are a couple of measures.
  • TP = Predict +ve when Actual +ve
  • TN = Predict -ve when Actual -ve
  • FP = Predict +ve when Actual -ve
  • FN = Predict -ve when Actual +ve
Precision = TP / Predict +ve = TP / (TP + FP)
Recall or Sensitivity = TP / Actual +ve = TP / (TP + FN)
Specificity = TN / Actual -ve = TN / (FP + TN)
Accuracy = (TP + TN) / (TP + TN + FP + FN)

Accuracy alone is not sufficient to represent the quality of prediction because the cost of making a FP may be different from the cost of making a FN.  F measures provides a tunable assigned weight for computing a final score and is commonly used to measure the quality of a classification model.

1/Fmeasure = α/recall + (1-α)/precision

Notice that most classification model is based on estimating a numeric score for each output class.  By choosing the cutoff point of this score, we can control the tradeoff between precision and recall.  We can plot the relationship between precision and recall at various cutoff points as follows ...

> library(ROCR)
> library(e1071)
> nb_model <- naiveBayes(Species~., data=iristrain)
> nb_prediction <- predict(nb_model,
> score <- nb_prediction[, c("virginica")]
> actual_class <- iristest$Species == 'virginica'
> # Mix some noise to the score
> # Make the score less precise for illustration
> score <- (score + runif(length(score))) / 2
> pred <- prediction(score, actual_class)
> perf <- performance(pred, "prec", "rec")
> plot(perf)

Another common plot is the ROC curve which plot the "sensitivity" (true positive rate) against 1 - "specificity" (false positive rate).  The area under curve "auc" is used to compare the quality between different models with varying cutoff points.  Here is how we produce the ROC curve.

> library(ROCR)
> library(e1071)
> nb_model <- naiveBayes(Species~., 
> nb_prediction <- predict(nb_model, 
> score <- nb_prediction[, c("virginica")]
> actual_class <- iristest$Species == 'virginica'
> # Mix some noise to the score
> # Make the score less precise for illustration
> score <- (score + runif(length(score))) / 2
> pred <- prediction(score, actual_class)
> perf <- performance(pred, "tpr", "fpr")
> auc <- performance(pred, "auc")
> auc <- unlist(slot(auc, "y.values"))
> plot(perf)
> legend(0.6,0.3,c(c(paste('AUC is', auc)),"\n"),
         box.col = "white")

We can also assign the relative cost of making a false +ve and false -ve decision to find the best cutoff threshold.  Here is how we plot the cost curve

> # Plot the cost curve to find the best cutoff
> # Assign the cost for False +ve and False -ve
> perf <- performance(pred, "cost", cost.fp=4, cost.fn=1)
> plot(perf)

From the curve, the best cutoff point 0.6 is where the cost is minimal.

Source of error: Bias and Variance

In model-based machine learning, we are making assumption that the underlying data follows some underlying mathematical model and during the training we try to fit the training data into this assumed model and determine the best model parameters which gives the minimal error.

One source of error is when our assumed model is fundamentally wrong (e.g. if the output has a non-linear relationship with the input but we are assuming a linear model).  This is known as the High Bias problem which we use an over-simplified model to represent the underlying data.

Another source of error is when the model parameters fits too specifically to the training data and not generalizing well to the underlying data pattern.  This is known as the High Variance problem and usually happen when there is insufficient training data compare to the number of model parameters.

High bias problem has the symptom that both training and cross-validation shows a high error rate and both error rate drops as the model complexity increases.  While the training error keep decreasing as the model complexity increases, the cross-validation error will increase after certain model complexity and this indicates the beginning of a high variance problem.  When the size of training data is fixed and the only thing we can choose is the model complexity, then we should choose the model complexity at the point where the cross-validation error is minimal.

Will getting more data help ?

When collecting more data cost both time and money, we need to carefully assess the situation before we spend our effort to do so.  There is a very pragmatic technique suggested by Andrew Ng from Stanford by plotting the error against the size of data.  In this approach, we sample different size of training data to train up different models and plot both the cross-validation error and the training error with respect to the training sample size.
If the problem is a high-bias problem, the error curve will look like the following.

In this case, collecting more data would not help.  We should spend our effort to do the following instead.
  • Add more input features by talking to domain experts to identify more input signals and collect them.
  • Add more input features by combining existing input features in a non-linear way.
  • Try more complex, machine learning models. (e.g. increase the number of hidden layers in Neural Network, or increase the number of Neurons at each layer)
If the problem is a high-variance problem because we over-estimate the model complexity, then we should reduce the model complexity by throwing away attributes that has low influence to the output.  We don't have to gather more data.

The only situation where having more data will be helpful is when the underlying data model is in fact complex.  Therefore we cannot just reduce the complexity as this will immediately results in a high-bias problem.  In this case, the error curve will have the following shape.

And the only way is to collect more training data such that overfitting is less likely to happen.

Evaluate the performance of a model is very important in the overall cycle of predictive analytics.  Hopefully this introductory post gives a basic idea on how this can be done.

Wednesday, June 6, 2012

Predictive Analytics: Decision Tree and Ensembles

Continue from my last post of walking down the list of machine learning technique.  In this post, I will covered Decision Tree and Ensemble methods.  We'll continue using the iris data we prepare in this earlier post.

Decision Tree

Decision Tree model is one of the oldest machine learning model and is usually used to illustrate the very basic idea of machine learning.  Based on a tree of decision nodes, the learning approach is to recursively divide the training data into buckets of homogeneous members through the most discriminative dividing criteria.  The measurement of "homogeneity" is based on the output label; when it is a numeric value, the measurement will be the variance of the bucket; when it is a category, the measurement will be the entropy or gini index of the bucket..

During the training, various dividing criteria based on the input will be tried (using in a greedy manner); when the input is a category (Mon, Tue, Wed ...), it will first be turned into binary (isMon, isTue, isWed ...) and then use the true/false as a decision boundary to evaluate the homogeneity; when the input is a numeric or ordinal value, the lessThan, greaterThan at each training data input value will be used as the decision boundary.

The training process stops when there is no significant gain in homogeneity by further split the Tree. The members of the bucket represented at leaf node will vote for the prediction; majority wins when the output is a category and member’s average is taken when the output is a numeric.

Here is an example in R

> install.packages('rpart')
> library(rpart)
> #Train the decision tree
> treemodel <- rpart(Species~., data=iristrain)
> plot(treemodel)
> text(treemodel, use.n=T)
> #Predict using the decision tree
> prediction <- predict(treemodel, newdata=iristest, type='class')
> #Use contingency table to see how accurate it is
> table(prediction, iristest$Species)
prediction   setosa versicolor virginica
  setosa         10          0         0
  versicolor      0         10         3
  virginica       0          0         7

Here is the decision tree that we have learned.

The good part of Tree is that it can take different data type of input and output variables which can be categorical, binary and numeric value.  It can handle missing attributes and outliers well.  The decision tree is also good in explaining reasoning for its prediction and therefore gives good insight about the underlying data.

The limitation of decision tree is that each decision boundary at each split point is a concrete binary decision. Also the decision criteria only consider one input attributes at a time but not a combination of multiple input variables. Another weakness of Tree is that once learned it cannot be updated incrementally. When new training data arrives, you have to throw away the old tree and retrain every data from scratch.  In practice, standalone decision tree and rarely used in practice as its predictive accuracy and relatively low.  Tree ensembles (described below) are the common way to use decision trees.

Tree Ensembles

Instead of picking a single model, Ensemble Method combines multiple models in a certain way to fit the training data.  There are two primary ways: “bagging” and “boosting”.  In “bagging”, we take a subset of training data (pick n random sample out of N training data with replacement) to train up each model.  After multiple models are trained, we use a voting scheme to predict future data.

“Boosting” is another approach in Ensemble Method.  Instead of sampling the input features, it samples the training data records, but it puts more emphasis on the training data that is wrongly predicted in previous iterations.  Initially each training data is equally weighted.  At each iteration, the data that is wrongly classified will have its weight increased. The final prediction will be voted by each tree learned at each iteration weighted by the accuracy of that tree.

Random Forest

Random Forest is one of the most popular bagging models; in additional to selecting n training data out of N, at each decision node of the tree, it randomly selects m input features from the total M input features (m ~ M^0.5) and learns a decision tree from it.  Finally each tree in the forest vote for the result.

Here is the R code to use Random Forest.

> install.packages(‘randomForest')
> library(randomForest)
> #Train 100 trees, random selected attributes
> model <- randomForest(Species~., data=iristrain, nTree=500)
> #Predict using the forest
> prediction <- predict(model, newdata=iristest, type='class')
> table(prediction, iristest$Species)
> importance(model)
Sepal.Length         7.807602
Sepal.Width          1.677239
Petal.Length        31.145822
Petal.Width         38.617223

Gradient Boosted Trees

Gradient Boosting Method is one of the most popular boosting methods. It is based on incrementally add a function that fits the gradient of a lost function of residuals
  1. Set i = 0 at the beginning, repeat until converge
  2. Learn a function F(X) to predict Y.  Basically find F that minimize expected(L(F(X) – Y)), where L is the lost function of the residual
  3. Learning another function g(X) to predict the gradient of above function
  4. Update F = F + a.g(X), where a is the learning rate
 Gradient Boosted Tree using decision tree as the learning model F.

Here is the sample code in R.

> library(gbm)
> iris2 <- iris
> newcol = data.frame(isVersicolor=(iris2$Species=='versicolor'))
> iris2 <- cbind(iris2, newcol)
> iris2[45:55,]
   Sepal.Length Sepal.Width Petal.Length Petal.Width    Species isVersicolor
45          5.1         3.8          1.9         0.4     setosa        FALSE
46          4.8         3.0          1.4         0.3     setosa        FALSE
47          5.1         3.8          1.6         0.2     setosa        FALSE
48          4.6         3.2          1.4         0.2     setosa        FALSE
49          5.3         3.7          1.5         0.2     setosa        FALSE
50          5.0         3.3          1.4         0.2     setosa        FALSE
51          7.0         3.2          4.7         1.4 versicolor         TRUE
52          6.4         3.2          4.5         1.5 versicolor         TRUE
53          6.9         3.1          4.9         1.5 versicolor         TRUE
54          5.5         2.3          4.0         1.3 versicolor         TRUE
55          6.5         2.8          4.6         1.5 versicolor         TRUE
> formula <- isVersicolor ~ Sepal.Length 
                              + Sepal.Width
                              + Petal.Length
                              + Petal.Width
> model <- gbm(formula, data=iris2, 
Iter   TrainDeviance   ValidDeviance   StepSize   Improve
     1        1.2714         -1.#IND     0.0010    0.0008
     2        1.2705         -1.#IND     0.0010    0.0004
     3        1.2688         -1.#IND     0.0010    0.0007
     4        1.2671         -1.#IND     0.0010    0.0008
     5        1.2655         -1.#IND     0.0010    0.0008
     6        1.2639         -1.#IND     0.0010    0.0007
     7        1.2621         -1.#IND     0.0010    0.0008
     8        1.2614         -1.#IND     0.0010    0.0003
     9        1.2597         -1.#IND     0.0010    0.0008
    10        1.2580         -1.#IND     0.0010    0.0008
   100        1.1295         -1.#IND     0.0010    0.0008
   200        1.0090         -1.#IND     0.0010    0.0005
   300        0.9089         -1.#IND     0.0010    0.0005
   400        0.8241         -1.#IND     0.0010    0.0004
   500        0.7513         -1.#IND     0.0010    0.0004
   600        0.6853         -1.#IND     0.0010    0.0003
   700        0.6266         -1.#IND     0.0010    0.0003
   800        0.5755         -1.#IND     0.0010    0.0002
   900        0.5302         -1.#IND     0.0010    0.0002
  1000        0.4901         -1.#IND     0.0010    0.0002

> prediction <- predict.gbm(model, iris2[45:55,], 
> round(prediction, 3)
 [1] 0.127 0.131 0.127 0.127 0.127 0.127 0.687 0.688 
     0.572 0.734 0.722
> summary(model)
           var       rel.inf
1 Petal.Length 61.4203761582
2  Petal.Width 34.7557511871
3  Sepal.Width  3.5407662531
4 Sepal.Length  0.2831064016

GBM R package also gave the relative importance of the input features.
I have gone through most of the common machine learning techniques.  In this next post, I will covered how to evaluate the performance of these models in their predictive power.