This post got a little long on me so let me go ahead and start with the punchline…that way you can decide for yourself whether to invest the next 4.5 minutes of your life reading the rest: In some past work I used R’s lapply() function with the packages “snow” and “snowfall” to implement a split/apply/combine strategy to calculate total distance for a set of 216,000 fishing trips. Even using parallel programming methods with a local cluster this took a long time…as in hours. I recently redid this same task using Oracle R Enterprise (warning: not a free open source product) in 9 minutes. I thought that time savings was cool enough to write a blog post about.

Here are some resources for the Oracle R Enterprise function that this post focuses on:

  • https://blogs.oracle.com/R/entry/invoking_r_scripts_via_oracle1
  • https://blogs.oracle.com/R/entry/solving_big_problems_with_oracle1
  •  

    In this post:
    https://thesamuelsoncondition.com/2014/03/05/r-rodbc-and-big-data-analytics/

    I provided some code to use the ‘RODBC’ and ‘snow’ packages in R to query a database then process the resulting data in a parallel environment.  In that example I used the split/apply/combine protocol to:

    1. split the data into manageable chunks
    2. send the chunks to different nodes on a cluster of workers
    3. apply a function which creates a path from a collection of georeferenced points and calculates the total distance traveled along that path.

     

    I’ve recently had occassion to do something similar using Oracle R Enterprise and, although I’m still not a huge fan of this product for a variety of reason, I was able to operate with a very large dataset amazingly fast with the split/apply/combine protocol….this made me happy. Unfortunately, I don’t really have the gumption for a full introductory “How to get up and running with Oracle R Enterprise” post…maybe that will come another time. Right now I’d like to focus on one specific use case where having access to this costly product paid off.

     

    Here’s the high-level summary of what I was trying to accomplish:
    I work with a database containing a table called TRIPS.  The table contains information on commercial fishing trip taken in Washington, Oregon and California.  The table looks something like this (the data are confidential so these field names and values are totally made-up):

     

    TRIP_ID DEPART_PORT RETURN_PORT DEPART_DAY RETURN_DAY
    1 Astoria Astoria 2011-10-11 2011-10-14
    2 San Francisco Bodega Bay 1984-4-20 1984-4-21
    3 Eureka Crescent City 1999-3-21 1999-3-23

     

    The TRIPS table is linked to another table called TOWS through the field TRIP_ID.  TOWS is a table containing (among other things) the latitude and longitude coordinates of each fishing event on each fishing trip.  It looks something like this:

     

    TRIP_ID TOW_NUM SET_LAT SET_LONG
    1 1 37.124 -124.124
    1 2 37.234 -124.345
    1 3 37.667 -124.345

     

    Finally, there is a .csv file that I created which contains the lat/long coordinates of all the ports in my data. The data look something like this:

     

    PORT_ID PORT_NAME LAT LONG
    AST Astoria 46.187 -123.831
    BDG Bodega Bay 38.333 -123.048

     

    Fishing trips generally contain between 5 and 10 fishing events.  I need to use the information on the starting/ending points for each trip, along with the lat/long coordinates of fishing events, to calculate a total distance traveled on each fishing trip.

     

    The distance between two pairs of lat/long coordinates can be calculated using the Haversine distance formula, which can be operationalized in R as a function like this (it can also be operationalized in more compact form but I like to be explicit with my shit):

     

    #Inputs:
    # pos.1 = c(latitude 1, longitude 1)
    # pos.2 = c(latitude 2, longitude 2)
    distance <- function(pos1,pos2){
      p1 <- (pos.1*pi)/180
      p2 <- (pos.2*pi)/180
      a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + 
        sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*
        cos(p1[1])*cos(p2[1])
      c <- 2*atan2(sqrt(a),sqrt(1-a))
      d <- c*6371
    return(d)
    }
    

     

    By calling this function for each successive pair of points on a fishing trip (i.e. departure port to first tow, first tow to second tow,…, last tow to return port) then summing all of those distances, we can get a decent approximation to the distance traveled on a fishing trip.

    The Split/Apply/Combine approach
    One approach to approximating this total distance is to write another function that will call the distance function for each segment of each fishing trip:

    trip.distance <- function(trip.id){
      data <- point.data[point.data$TRIP_ID==trip.id,]
      td <- list()
      for(i in 1:max(data$TOWNUM)){
        if(i==1){
           pos.1 <- c(data$DLAT,data$DLONG)
           pos.2 <- c(data$SET_LAT[i], data$SET_LONG[i])
           d <- distance(pos.1,pos.2)
       
           pos.1 <- c(data$SET_LAT[i],data$SET_LONG[i])
           pos.2 <- c(data$SET_LAT[i+1], data$SET_LONG[i+1])
           d <- d+ distance(pos.1,pos.2)
        }
        if(i>1 & i<max(data$TOWNUM)){
           pos.1 <- c(data$SET_LAT[i],data$SET_LONG[i])
           pos.2 <- c(data$SET_LAT[i+1], data$SET_LONG[i+1])
           d <- distance(pos.1,pos.2)      
        }
        if(i==max(data$TOWNUM)){
           pos.1 <- c(data$SET_LAT[i],data$SET_LONG[1])
           pos.2 <- c(data$RLAT[i], data$RLONG[i])
           d <- distance(pos.1,pos.2)
        }
      td[[i]] <- d
      }
    return(data.frame(TRIP_ID=trip.id,D=sum(unlist(td))))
    }
    

     

    Provided that the data frame “point.data” contains the departure port, return port and lat/long coordinates for each fishing event for each fishing trip, the function above (trip.distance) can be applied to each TRIP_ID in the data frame “point.data”,

    require(data.table)
    trip.distances <- data.frame(rbindlist(lapply(unique(point.data$TRIP_ID),trip.distance)))
    

    This, in a nutshell, is the split/apply/combine protocol. The function trip.distance() selects a small chunk from the larger data frame (the chunk is all data corresponding to a particular trip identifier), then lapply() tells R to apply the trip.distance() function to each unique TRIP_ID in the dataframe. The issue at hand is that the data I’m working with have ~200,000 unique fishing trips. Since each trip has between 5 – 10 fishing events, the georeferenced point data I’m working with has ~2 million rows. For context, I timed a few runs through the function above:

    t <- Sys.time()
    x.tmp <- lapply(unique(point.data$TRIP_ID)[1:10],trip.distance)
    Sys.time() - t
    > Time difference of 3.283689 secs
    
    t <- Sys.time()
    x.tmp <- lapply(unique(point.data$TRIP_ID)[1:100],trip.distance)
    Sys.time() - t
    > Time difference of 32.3096 secs
    
    t <- Sys.time()
    x.tmp <- lapply(unique(point.data$TRIP_ID)[1:1000],trip.distance)
    Sys.time() - t
    > Time difference of 5.409794 mins
    
    

    So it’s probably pretty clear that I won’t be able to run this calculation for all 216,000 unique trip identifiers in the data in any reasonable amount of time….at this current rate it’s looking like around 16 hours of run time should do the job. I could (and have) parse this job out to a cluster using R’s “snow” and “snowfall” packages for easy distributed computing. With a local cluster of 8 CPUs I got the job down to a couple hours. Using the ore.groupApply() function available in Oracle R Enterprise, I was able to get total trip distance for all 216,000 unique fishing trips in my data in under 10 minutes. Here is the code for my solution:

    First, I setup the data which basically consists of merging departure and return ports for each fishing trip with the point by point data on locations of each fishing event (tow).

    #load the necessary Oracle R Enterprise packages
    library(OREbase)
    library(ORE)
    library(DBI)
    library(ROracle)
    
    #connect to my schema on the database
    ore.connect(user=username, sid="sidname", host="hostlocation", password=password, port=1521, all=TRUE)
    
    #Am I connected?
    ore.is.connected()
    
    #note, I've had some issue with Oracle saving a lot of temporary
    # objects to a recycle bin that seems to fill up my schema.  I like
    # to start fresh so I purge the trash bin before and after 
    ore.exec ("purge tablespace users user samuelsoncondition")
    
    #turn off the option to hold over temporary objects
    ore.exec("ALTER SESSION SET recyclebin = OFF")
    
    #get the external .csv file that contains latitude and longitude
    # coordinates for each West Coast port
    port.codes <- read.csv("port_codes.csv",stringsAsFactors=F)
    
    #Sync with the correct schema to get access to data
    ore.sync("SCHEMA1",table=c("TRIP","TOW"))
    ore.attach('SCHEMA1')
    
    #get starting and ending points for each fishing trip
    start.end <- TRIP[,c('DPORT','RPORT','TRIP_ID')]
    
    #merge the starting and ending points (departure and return ports) with 
    #the location data on individual fishing events
    tows <- TOW[,c('TRIP_ID','TOWNUM','SET_LAT','SET_LONG','UP_LAT','UP_LONG')]
    tows <- merge(tows,start.end,by="TRIP_ID")
    
    #get the lat/longs for departure and return port
    port.codes <- ore.push(port.codes)
    tows <- merge(tows,port.codes,by.x="DPORT",by.y="LBK_PORT")
    names(tows)[names(tows)=="LAT"] <- "DLAT"
    names(tows)[names(tows)=="LONG"] <- "DLONG"
    
    tows <- merge(tows,port.codes,by.x="RPORT",by.y="LBK_PORT")
    names(tows)[names(tows)=="LAT"] <- "RLAT"
    names(tows)[names(tows)=="LONG"] <- "RLONG
    

     

    At this point I have a temporary table (~2 million rows with each row corresponding to the location of one fishing event of a particular trip identifier) residing in my schema on the database which looks something like this:

     

    TRIP_ID TOW_NUM SET_LAT SET_LONG DLAT DLONG RLAT RLONG
    1 1 47.9 -125.56 46.89 -124.104 46.89 -124.104
    1 2 47.966 -125.65 46.89 -124.104 46.89 -124.104
    2 1 47.81 -125.133 46.89 -124.104 46.89 -124.104

     

    The code to implement the ore.groupApply() function over these data is:

     

    trip.distance <- ore.groupApply(TOWS,INDEX=TOWS$TRIP_ID,
           function(dat){
                            
           #if there are 0's or NA's in the lat/long data we skip the current trip
           if(length(dat$TOWNUM[dat$SET_LAT==0|dat$SET_LONG==0|is.na(dat$SET_LAT)==T|is.na(dat$SET_LONG)==T])>0){
              dist <- list(NA)
           }else{
           dist <- list()
           for(i in 1:max(dat$TOWNUM)){
             d <- 0
             if(i==1){
                     
              #first the distance from departure port to first tow set
              x <- c(dat$DLAT[i],dat$DLONG[i])
              y <- c(dat$SET_LAT[i],dat$SET_LONG[i])
              p1 <- (x*pi)/180
              p2 <- (y*pi)/180
              a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1])
              c <- 2*atan2(sqrt(a),sqrt(1-a))
              d <- c*6371
                                  
              #next the distance from first two set to 2nd tow set
              x <- c(dat$SET_LAT[i],dat$SET_LONG[i])
              y <- c(dat$SET_LAT[i+1],dat$SET_LONG[i+1])
              p1 <- (x*pi)/180
              p2 <- (y*pi)/180
              a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1])
              c <- 2*atan2(sqrt(a),sqrt(1-a))
              d <- d + (c*6371)
                             
            }
            if(i!=1 & i!=max(dat$TOWNUM)){
            x <- c(dat$SET_LAT[i],dat$SET_LONG[i])
            y <- c(dat$SET_LAT[i+1],dat$SET_LONG[i+1])
            p1 <- (x*pi)/180
            p2 <- (y*pi)/180
            a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1])
            c <- 2*atan2(sqrt(a),sqrt(1-a))
            d <- c*6371
            }
            if(i==max(dat$TOWNUM)){
            x <- c(dat$SET_LAT[i],dat$SET_LONG[i])
            y <- c(dat$RLAT[i],dat$RLONG[i])
            p1 <- (x*pi)/180
            p2 <- (y*pi)/180
            a <- sin((p2[1]-p1[1])*0.5)*sin((p2[1]-p1[1])*0.5) + sin((p2[2]-p1[2])*0.5)*sin((p2[2]-p1[2])*0.5)*cos(p1[1])*cos(p2[1])
            c <- 2*atan2(sqrt(a),sqrt(1-a))
            d <- c*6371
            }
                               
            dist[[i]] <- d
            }
            }  
                           
            return(data.frame(TRIP_ID=unique(dat$TRIP_ID),D=sum(unlist(dist))))
                          
            })
    

     

    The function inside of ore.groupApply() that calculates total trip distance is exactly like the trip.distance() function we defined above with one rather simple exception: it accepts the input “dat” rather than the input “trip.id.” This is because the argument INDEX=TOWS$TRIP_ID in the ore.groupApply() function is telling R to split the data into chunks based on the values in the column TRIP_ID of the TOWS table. It will then apply the operations in function(){} to each chunk of the data (“dat”) and bring the results back together as a list.

    As I said before this is blindingly fast. I was able to make the trip distance calculation for all 216,000 unique trips and ~2 million pairs of lat/long coordinates in 9.5 minutes. I admit I don’t really know why doing this in Oracle R Enterprise is so fast. I suspect that, because the operations are running on the R engine living on the database server and not on my local R instance, that it simply has more workers to parse this parallel job out to…maybe if I tested my local R + “snow” solution with a couple hundred workers instead of just 8 it would be just as fast?

    So far I’ve uncovered an important logistical detail in using ore.groupApply(): my database administrator had to explicitly grant me “create produce” privileges in order to use ore.groupApply().

    Advertisements