Iris example in SPMD

This is a SPMD version for the iris example. This intends to perform parallel computing on distributed data. It should be run by four processors by Rscript as

SHELL> mpiexec -np 4 Rscript iris_spmd.r

A file Rplot.pdf or a similar file will be created. The plot contains visualization of clustering results on the first two principal components.


SPMD Code: (iris_spmd.r)

# File name: iris_spmd.r
# Run: mpiexec -np 4 Rscript iris_spmd.r

rm(list = ls())                                       # Clean environment
library(pbdMPI, quiet = TRUE)                         # Load library
if(comm.size() != 4)
  comm.stop("4 processors are required.")

### Load data
X <- as.matrix(iris[, -5])                            # Dimension 150 by 4
X.cid <- as.numeric(iris[, 5])                        # True id

### Distribute data
jid <- get.jid(nrow(X))
X.spmd <- X[jid,]                                     # SPMD row-major format

### Standardized
N <- allreduce(nrow(X.spmd))                          # 150
p <- ncol(X.spmd)                                     # 4
mu <- allreduce(colSums(X.spmd / N))
X.std <- sweep(X.spmd, 2, mu, FUN = "-")              # Substract mean
std <- sqrt(allreduce(colSums(X.std^2 / (N - 1))))
X.std <- sweep(X.std, 2, std, FUN = "/")              # Divide standard error

### SVD manually in serial
X.tmp <- crossprod(X.std)                             # X'X (local)
X.tmp <- allreduce(X.tmp)
dim(X.tmp) <- c(p, p)
ret <- eigen(X.tmp)                                   # X'X = V D^2 V'
d <- sqrt(ret$values)
v <- ret$vectors
u <- X.std %*% v %*% diag(1/d)                        # Why X V D^(-1)) = U?

### Validation 
tmp <- svd(scale(X))
comm.print(c(sum(abs(tmp$u[jid,] - u)),
             sum(abs(tmp$v - v)),
             sum(abs(tmp$d - d))))

### Project on column space of singular vectors
A <- u %*% diag(d)
B <- X.std %*% v                                      # A ~ B 
X.prj <- A[, 1:2]                                     # Only useful for plot
X.prj <- do.call("rbind", allgather(X.prj))

### Clustering
library(pmclust, quiet = TRUE)
comm.set.seed(123, diff = TRUE)

X.spmd <- X.std
PARAM.org <- set.global(K = 3)                        # Preset storage
.pmclustEnv$CONTROL$debug <- 0                        # Disable debug messages
PARAM.org <- initial.center(PARAM.org)                # Initial parameters
PARAM.kms <- kmeans.step(PARAM.org)                   # K-means
X.kms.cid <- allgather(.pmclustEnv$CLASS.spmd,
                       unlist = TRUE)

PARAM.org <- set.global(K = 3)                        # Preset storage
.pmclustEnv$CONTROL$debug <- 0                        # Disable debug messages
PARAM.org <- initial.em(PARAM.org,
                        MU = PARAM.kms$MU)            # Initial by K-means
PARAM.mbc1 <- em.step(PARAM.org)                      # Model-based clustering
X.mbc1.cid <- allgather(.pmclustEnv$CLASS.spmd,
                        unlist = TRUE)

PARAM.org <- set.global(K = 3, RndEM.iter = 1000)     # Preset storage
.pmclustEnv$CONTROL$debug <- 0                        # Disable debug messages
PARAM.org <- initial.RndEM(PARAM.org)                 # Initial by Rand-EM
PARAM.mbc2 <- em.step(PARAM.org)                      # Model-based clustering
X.mbc2.cid <- allgather(.pmclustEnv$CLASS.spmd,
                        unlist = TRUE)

### Validation
X.kms.adjR <- EMCluster::RRand(X.cid, X.kms.cid)$adjRand
X.mbc1.adjR <- EMCluster::RRand(X.cid, X.mbc1.cid)$adjRand
X.mbc2.adjR <- EMCluster::RRand(X.cid, X.mbc2.cid)$adjRand
comm.print(c(X.kms.adjR, X.mbc1.adjR, X.mbc2.adjR))

### Swap classification id
tmp <- X.kms.cid
X.kms.cid[tmp == 1] <- 2
X.kms.cid[tmp == 2] <- 1
tmp <- X.mbc1.cid
X.mbc1.cid[tmp == 1] <- 2
X.mbc1.cid[tmp == 2] <- 1

### Display on first 2 components
if(comm.rank() == 0){
  par(mfrow = c(2, 2))
  plot(X.prj, col = X.cid + 1, pch = X.cid,
       main = "iris (true)", xlab = "PC1", ylab = "PC2")
  plot(X.prj, col = X.kms.cid + 1, pch = X.kms.cid,
       main = paste("iris (kmeans)", sprintf("%.4f", X.kms.adjR)),
       xlab = "PC1", ylab = "PC2")
  plot(X.prj, col = X.mbc1.cid + 1, pch = X.mbc1.cid,
       main = paste("iris (model-based 1)", sprintf("%.4f", X.mbc1.adjR)),
       xlab = "PC1", ylab = "PC2")
  plot(X.prj, col = X.mbc2.cid + 1, pch = X.mbc2.cid,
       main = paste("iris (model-based 2)", sprintf("%.4f", X.mbc2.adjR)),
       xlab = "PC1", ylab = "PC2")
}

### Finish
finalize()