In this example I’m going to outline a ghetto way to use R’s distributed computing packages (‘snow’ and ‘snowfall’) and RODBC to process large amounts of data. This specific case is motivated by the fact that it’s only possible to bring so much data into a local R workspace at once.
There are plenty of solid tutorials out there on working with big data in R. I plan to get to some more big data applications with R in the future…for now, here’s a quick-and-dirty way I’ve used to run some exploratory stats on a large data set.
The basic set-up is this: I’m working with satellite tracking data of large marine predators. There are ~6000 units whose positions (latitude and longitude coordinates) are logged several times an hour and pinged back to a receiver. The result is several million observations each month. One rather elementary thing I’d like to do is figure out how far each unit travels each day.
First, we’ll set up the Haversine distance function for later use:
require(RODBC)

# establish the haversine distance function
# function inputs:
#   pos1: [1 x 2] -- (lat1, long1)
#   pos2: [1 x 2] -- (lat2, long2)
# returns the great-circle distance in km
total.distance <- function(pos1, pos2){
  p1 <- (pos1*pi)/180   # convert degrees to radians
  p2 <- (pos2*pi)/180
  a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) +
       sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1])
  c <- 2*atan2(sqrt(a), sqrt(1-a))
  d <- c*6371           # Earth's radius in km
  return(d)
}
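As a quick sanity check, one degree of longitude at the equator should come out to about 111 km (6371*pi/180):

# sanity check: 1 degree of longitude at the equator = 6371*pi/180 km
total.distance(c(0, 0), c(0, 1))
# [1] 111.1949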
Next, we set up the requirements for distributed computing. These basically consist of:
- the required R packages (‘snow’ and ‘snowfall’)
- the command sfInit, which tells R how many processors I want to use (‘cpus = 4’)
- the command sfLibrary, which sends the required packages out to the cluster nodes
- and the command sfExport, which exports required objects out to the cluster nodes
# Set up the requirements for distributed computing

# required packages
require(snowfall)
require(snow)

# initialize the cluster
sfInit(parallel=TRUE, cpus=4)

# send needed packages out to the cluster nodes
sfLibrary(snowfall)
sfLibrary(RODBC)

# send necessary objects out to cluster nodes
sfExport(list=c("total.distance"))
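If you want to make sure the cluster actually came up before launching anything big, snowfall’s query functions are handy (this check isn’t strictly necessary, and isn’t part of the routine below):

# optional: confirm parallel mode and the number of cpus in use
sfParallel()  # TRUE once sfInit(parallel=TRUE, ...) has run
sfCpus()      # 4 in this case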
Here’s where shit gets real. With > 17 million records every month, I can’t bring an entire year’s worth of tracking data into my local R workspace. So instead of pinging the server for one manageable chunk of data at a time and working through the chunks sequentially, I’m going to bring monthly chunks of data into R and process them in parallel. This is done using the ‘sfLapply’ function. The routine below proceeds in a few parts:
1. the function inside ‘sfLapply’ sets up a database query to bring one month’s worth of data into R. The critical piece here is the SQL code embedded in ‘sqlcmd’, which filters the database table and brings in data for year 2009 and month = ‘i’:
"WHERE DATEPART(yyyy,tracking_data.utc_date)=2009", "AND",
"DATEPART(mm,tracking_data.utc_date)=", i)
2. With that chunk of data we’re going to do some basic coercing of data types, then calculate the total distance traveled by each unit in the data (since our data chunk is a month, this will return the total distance traveled by each unit during that month).
Here it is assumed that the data we want are in a database table called “tracking_data”. That table contains a single position variable called ‘geom’ (of data type Geometry), from which we extract the lat/long using the SQL syntax geom.Lat/geom.Long. The tracking_data table also contains a unit identifier (“object_id”) and date and time variables (“utc_date” and “utc_time”, respectively).
t <- Sys.time()
result.list <- sfLapply(1:12, function(i){
  # open a database connection on this cluster node
  channel <- odbcConnect("server1", uid="samuelsoncondition", pwd="pUblikG00dz")

  # build the query: pull one month (month = i) of 2009 tracking data
  sqlcmd <- paste("SELECT",
                  "tracking_data.geom.Lat as lat", ",",
                  "tracking_data.geom.Long as long", ",",
                  "tracking_data.utc_time", ",",
                  "tracking_data.utc_date", ",",
                  "tracking_data.object_id",
                  "FROM tracking_data",
                  "WHERE DATEPART(yyyy,tracking_data.utc_date)=2009",
                  "AND", "DATEPART(mm,tracking_data.utc_date)=", i)
  df.tracks <- sqlQuery(channel, sqlcmd)
  close(channel)

  # coerce the time stamp into something sortable
  df.tracks$utc_time <- substr(as.character(df.tracks$utc_time), 1,
                               nchar(as.character(df.tracks$utc_time)) - 3)
  df.tracks$utc_time <- as.POSIXct(df.tracks$utc_time, format="%H:%M")

  id.list <- unique(df.tracks$object_id)

  # apply the distance calculation function to the set of unique
  # object id's for the current data chunk. The distance function
  # will calculate the distance between two consecutive points
  # and sum these.
  lapply(id.list, function(id, data){
    df.tmp <- data[which(data$object_id == id), ]
    df.tmp <- df.tmp[order(df.tmp$utc_date, df.tmp$utc_time), ]
    D <- 0
    if(nrow(df.tmp) > 1){
      for(irow in 2:nrow(df.tmp)){
        d <- total.distance(c(df.tmp$lat[irow-1], df.tmp$long[irow-1]),
                            c(df.tmp$lat[irow], df.tmp$long[irow]))
        D <- D + d
      }
    }
    return(c(id, D))
  }, data=df.tracks)
})
Sys.time() - t

sfStop()
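The object that comes back from ‘sfLapply’ is a list with one element per month, each of which is itself a list of (id, distance) pairs. A minimal sketch of flattening that into a data frame (the column names here are my own choice, and I’m assuming object_id is numeric, as in the c(id, D) return above):

# flatten the list-of-lists into one matrix, then a data frame
# (note: this drops the month index; add it back if you need rows labeled by month)
dist.mat <- do.call(rbind, lapply(result.list, function(month.res) do.call(rbind, month.res)))
dist.df  <- data.frame(object_id = dist.mat[, 1], distance_km = dist.mat[, 2])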
To recap: the basic problem was that I wanted a quick way to summarize the distances traveled by units in my satellite tracking data set. However, the data are such that it isn’t feasible to bring an entire year’s worth of data into R from the server and manipulate it there. I have no doubt that there are straightforward ways to do the data manipulation above directly on the SQL server. For me, it was pretty quick to split the data into monthly chunks and manipulate each chunk. Using R’s ‘snow’ and ‘snowfall’ and a computational server with ~200 virtual machines, I was able to send 12 separate data chunks (1 for each month) out to 12 different cluster nodes. This cut my computation time down to about 30 minutes, far less than it would have taken for this routine to run sequentially.
A final note: the real bottleneck here is R’s communication with the SQL server. The actual calculation and aggregation of distances by unit runs super fast. For this reason, it makes sense to find out how much data you can bring into a local workspace. If, for example, you can bring 3 months at a time instead of 1 month at a time, you’re looking at A BIG time savings.
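For example, a 3-months-per-node version of the query might look something like the sketch below. The BETWEEN clause and the quarter boundaries are my own; everything else mirrors the monthly routine above, and the per-unit distance aggregation would be dropped in where indicated.

# pull 3-month chunks per node instead of 1-month chunks (a sketch only)
month.starts <- seq(1, 12, by = 3)  # Jan, Apr, Jul, Oct
result.list <- sfLapply(month.starts, function(m){
  channel <- odbcConnect("server1", uid="samuelsoncondition", pwd="pUblikG00dz")
  sqlcmd <- paste("SELECT",
                  "tracking_data.geom.Lat as lat", ",",
                  "tracking_data.geom.Long as long", ",",
                  "tracking_data.utc_time", ",",
                  "tracking_data.utc_date", ",",
                  "tracking_data.object_id",
                  "FROM tracking_data",
                  "WHERE DATEPART(yyyy,tracking_data.utc_date)=2009",
                  "AND DATEPART(mm,tracking_data.utc_date) BETWEEN", m, "AND", m + 2)
  df.tracks <- sqlQuery(channel, sqlcmd)
  close(channel)
  # the same coercion and per-unit distance aggregation as the monthly version
  # would go here; returning the raw chunk keeps this sketch short
  df.tracks
})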