This post got a little long on me so let me go ahead and start with the punchline…that way you can decide for yourself whether to invest the next 4.5 minutes of your life reading the rest: In some past work I used R’s lapply() function with the packages “snow” and “snowfall” to implement a split/apply/combine strategy to calculate total distance for a set of 216,000 fishing trips. Even using parallel programming methods with a local cluster this took a long time…as in hours. I recently redid this same task using Oracle R Enterprise (warning: not a free open source product) in 9 minutes. I thought that time savings was cool enough to write a blog post about.

Here are some resources for the Oracle R Enterprise function that this post focuses on:

In this post:

https://thesamuelsoncondition.com/2014/03/05/r-rodbc-and-big-data-analytics/

I provided some code to use the ‘RODBC’ and ‘snow’ packages in R to query a database then process the resulting data in a parallel environment. In that example I used the split/apply/combine protocol to:

TRIP_ID | DEPART_PORT | RETURN_PORT | DEPART_DAY | RETURN_DAY |
---|---|---|---|---|

1 | Astoria | Astoria | 2011-10-11 | 2011-10-14 |

2 | San Francisco | Bodega Bay | 1984-4-20 | 1984-4-21 |

3 | Eureka | Crescent City | 1999-3-21 | 1999-3-23 |

TRIP_ID | TOW_NUM | SET_LAT | SET_LONG |
---|---|---|---|

1 | 1 | 37.124 | -124.124 |

1 | 2 | 37.234 | -124.345 |

1 | 3 | 37.667 | -124.345 |

Finally, there is a .csv file that I created which contains the lat/long coordinates of all the ports in my data. The data look something like this:

PORT_ID | PORT_NAME | LAT | LONG |
---|---|---|---|

AST | Astoria | 46.187 | -123.831 |

BDG | Bodega Bay | 38.333 | -123.048 |

The distance between two pairs of lat/long coordinates can be calculated using the Haversine distance formula, which can be operationalized in R as a function like this (it can also be operationalized in more compact form but I like to be explicit with my shit):

#Inputs: # pos.1 = c(latitude 1, longitude 1) # pos.2 = c(latitude 2, longitude 2) distance <- function(pos1,pos2){ p1 <- (pos.1*pi)/180 p2 <- (pos.2*pi)/180 a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)* cos(p1[1])*cos(p2[1]) c <- 2*atan2(sqrt(a),sqrt(1-a)) d <- c*6371 return(d) }

By calling this function for each successive pair of points on a fishing trip (i.e. departure port to first tow, first tow to second tow,…, last tow to return port) then summing all of those distances, we can get a decent approximation to the distance traveled on a fishing trip.

**The Split/Apply/Combine approach**

One approach to approximating this total distance is to write another function that will call the distance function for each segment of each fishing trip:

trip.distance <- function(trip.id){ data <- point.data[point.data$TRIP_ID==trip.id,] td <- list() for(i in 1:max(data$TOWNUM)){ if(i==1){ pos.1 <- c(data$DLAT,data$DLONG) pos.2 <- c(data$SET_LAT[i], data$SET_LONG[i]) d <- distance(pos.1,pos.2) pos.1 <- c(data$SET_LAT[i],data$SET_LONG[i]) pos.2 <- c(data$SET_LAT[i+1], data$SET_LONG[i+1]) d <- d+ distance(pos.1,pos.2) } if(i>1 & i<max(data$TOWNUM)){ pos.1 <- c(data$SET_LAT[i],data$SET_LONG[i]) pos.2 <- c(data$SET_LAT[i+1], data$SET_LONG[i+1]) d <- distance(pos.1,pos.2) } if(i==max(data$TOWNUM)){ pos.1 <- c(data$SET_LAT[i],data$SET_LONG[1]) pos.2 <- c(data$RLAT[i], data$RLONG[i]) d <- distance(pos.1,pos.2) } td[[i]] <- d } return(data.frame(TRIP_ID=trip.id,D=sum(unlist(td)))) }

Provided that the data frame “point.data” contains the departure port, return port and lat/long coordinates for each fishing event for each fishing trip, the function above (trip.distance) can be applied to each TRIP_ID in the data frame “point.data”,

require(data.table) trip.distances <- data.frame(rbindlist(lapply(unique(point.data$TRIP_ID),trip.distance)))

This, in a nutshell, is the split/apply/combine protocol. The function trip.distance() selects a small chunk from the larger data frame (the chunk is all data corresponding to a particular trip identifier), then lapply() tells R to apply the trip.distance() function to each unique TRIP_ID in the dataframe. The issue at hand is that the data I’m working with have ~200,000 unique fishing trips. Since each trip has between 5 – 10 fishing events, the georeferenced point data I’m working with has ~2 million rows. For context, I timed a few runs through the function above:

t <- Sys.time() x.tmp <- lapply(unique(point.data$TRIP_ID)[1:10],trip.distance) Sys.time() - t > Time difference of 3.283689 secs t <- Sys.time() x.tmp <- lapply(unique(point.data$TRIP_ID)[1:100],trip.distance) Sys.time() - t > Time difference of 32.3096 secs t <- Sys.time() x.tmp <- lapply(unique(point.data$TRIP_ID)[1:1000],trip.distance) Sys.time() - t > Time difference of 5.409794 mins

So it’s probably pretty clear that I won’t be able to run this calculation for all 216,000 unique trip identifiers in the data in any reasonable amount of time….at this current rate it’s looking like around 16 hours of run time should do the job. I could (and have) parse this job out to a cluster using R’s “snow” and “snowfall” packages for easy distributed computing. With a local cluster of 8 CPUs I got the job down to a couple hours. Using the ore.groupApply() function available in Oracle R Enterprise, I was able to get total trip distance for all 216,000 unique fishing trips in my data in under 10 minutes. Here is the code for my solution:

First, I setup the data which basically consists of merging departure and return ports for each fishing trip with the point by point data on locations of each fishing event (tow).

#load the necessary Oracle R Enterprise packages library(OREbase) library(ORE) library(DBI) library(ROracle) #connect to my schema on the database ore.connect(user=username, sid="sidname", host="hostlocation", password=password, port=1521, all=TRUE) #Am I connected? ore.is.connected() #note, I've had some issue with Oracle saving a lot of temporary # objects to a recycle bin that seems to fill up my schema. I like # to start fresh so I purge the trash bin before and after ore.exec ("purge tablespace users user samuelsoncondition") #turn off the option to hold over temporary objects ore.exec("ALTER SESSION SET recyclebin = OFF") #get the external .csv file that contains latitude and longitude # coordinates for each West Coast port port.codes <- read.csv("port_codes.csv",stringsAsFactors=F) #Sync with the correct schema to get access to data ore.sync("SCHEMA1",table=c("TRIP","TOW")) ore.attach('SCHEMA1') #get starting and ending points for each fishing trip start.end <- TRIP[,c('DPORT','RPORT','TRIP_ID')] #merge the starting and ending points (departure and return ports) with #the location data on individual fishing events tows <- TOW[,c('TRIP_ID','TOWNUM','SET_LAT','SET_LONG','UP_LAT','UP_LONG')] tows <- merge(tows,start.end,by="TRIP_ID") #get the lat/longs for departure and return port port.codes <- ore.push(port.codes) tows <- merge(tows,port.codes,by.x="DPORT",by.y="LBK_PORT") names(tows)[names(tows)=="LAT"] <- "DLAT" names(tows)[names(tows)=="LONG"] <- "DLONG" tows <- merge(tows,port.codes,by.x="RPORT",by.y="LBK_PORT") names(tows)[names(tows)=="LAT"] <- "RLAT" names(tows)[names(tows)=="LONG"] <- "RLONG

At this point I have a temporary table (~2 million rows with each row corresponding to the location of one fishing event of a particular trip identifier) residing in my schema on the database which looks something like this:

TRIP_ID | TOW_NUM | SET_LAT | SET_LONG | DLAT | DLONG | RLAT | RLONG |
---|---|---|---|---|---|---|---|

1 | 1 | 47.9 | -125.56 | 46.89 | -124.104 | 46.89 | -124.104 |

1 | 2 | 47.966 | -125.65 | 46.89 | -124.104 | 46.89 | -124.104 |

2 | 1 | 47.81 | -125.133 | 46.89 | -124.104 | 46.89 | -124.104 |

The code to implement the ore.groupApply() function over these data is:

trip.distance <- ore.groupApply(TOWS,INDEX=TOWS$TRIP_ID, function(dat){ #if there are 0's or NA's in the lat/long data we skip the current trip if(length(dat$TOWNUM[dat$SET_LAT==0|dat$SET_LONG==0|is.na(dat$SET_LAT)==T|is.na(dat$SET_LONG)==T])>0){ dist <- list(NA) }else{ dist <- list() for(i in 1:max(dat$TOWNUM)){ d <- 0 if(i==1){ #first the distance from departure port to first tow set x <- c(dat$DLAT[i],dat$DLONG[i]) y <- c(dat$SET_LAT[i],dat$SET_LONG[i]) p1 <- (x*pi)/180 p2 <- (y*pi)/180 a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1]) c <- 2*atan2(sqrt(a),sqrt(1-a)) d <- c*6371 #next the distance from first two set to 2nd tow set x <- c(dat$SET_LAT[i],dat$SET_LONG[i]) y <- c(dat$SET_LAT[i+1],dat$SET_LONG[i+1]) p1 <- (x*pi)/180 p2 <- (y*pi)/180 a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1]) c <- 2*atan2(sqrt(a),sqrt(1-a)) d <- d + (c*6371) } if(i!=1 & i!=max(dat$TOWNUM)){ x <- c(dat$SET_LAT[i],dat$SET_LONG[i]) y <- c(dat$SET_LAT[i+1],dat$SET_LONG[i+1]) p1 <- (x*pi)/180 p2 <- (y*pi)/180 a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1]) c <- 2*atan2(sqrt(a),sqrt(1-a)) d <- c*6371 } if(i==max(dat$TOWNUM)){ x <- c(dat$SET_LAT[i],dat$SET_LONG[i]) y <- c(dat$RLAT[i],dat$RLONG[i]) p1 <- (x*pi)/180 p2 <- (y*pi)/180 a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1]) c <- 2*atan2(sqrt(a),sqrt(1-a)) d <- c*6371 } dist[[i]] <- d } } return(data.frame(TRIP_ID=unique(dat$TRIP_ID),D=sum(unlist(dist)))) })

The function inside of ore.groupApply() that calculates total trip distance is exactly like the trip.distance() function we defined above with one rather simple exception: it accepts the input “dat” rather than the input “trip.id.” This is because the argument INDEX=TOWS$TRIP_ID in the ore.groupApply() function is telling R to split the data into chunks based on the values in the column TRIP_ID of the TOWS table. It will then apply the operations in function(){} to each chunk of the data (“dat”) and bring the results back together as a list.

As I said before this is blindingly fast. I was able to make the trip distance calculation for all 216,000 unique trips and ~2 million pairs of lat/long coordinates in 9.5 minutes. I admit I don’t really know why doing this in Oracle R Enterprise is so fast. I suspect that, because the operations are running on the R engine living on the database server and not on my local R instance, that it simply has more workers to parse this parallel job out to…maybe if I tested my local R + “snow” solution with a couple hundred workers instead of just 8 it would be just as fast?

So far I’ve uncovered an important logistical detail in using ore.groupApply(): my database administrator had to explicitly grant me “create produce” privileges in order to use ore.groupApply().

I wish you had the re-blog button, I would re-blog a lot of your posts!