Using the distributed cache

dmlayton edited this page Apr 29, 2013 · 4 revisions

The distributed cache is simply Hadoop's mechanism for giving each node local access to a specific file. In this example, I am mapping IP addresses to geographical locations (country, city, etc.). The heavy lifting is done by MaxMind's GeoIP LookupService. LookupService requires random access to a local file, GeoLiteCity.dat, which defines a mapping from IP ranges to a location object.

First, we need to place the file in question on the Hadoop file system. I used s3distcp to put a copy of GeoLiteCity.dat at hdfs://cache/GeoLiteCity.dat. If your copy is not on S3, you can use the normal distcp command.

GeoLiteCity.dat is now distributed across the data nodes, so let's move on to the Scalding code.

We will need to import a few things:

import com.maxmind.geoip.LookupService
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.filecache.DistributedCache

Next, we will need access to the Job's configuration. In reality, we are just setting configuration parameters:

def jobConfOption(implicit mode : Mode) = {
    mode match {
      case Hdfs(_, conf) => Option(conf)
      case _ => None
    }
}

Eventually this will be pushed upstream, as it is not specific to this example. Note that it only behaves properly in true distributed mode, because you need to be using HDFS. Also, since this is a job configuration action, it should not be called from inside mappers or reducers.

The following function uses jobConfOption and the DistributedCache. We need to construct a URI for our file on HDFS.

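def addGeoIpToDistributiveCache(): Unit = {
  // jobConfOption is only defined in Hdfs mode; .get throws in any other mode.
  val conf = jobConfOption.get
  val fs = FileSystem.get(conf)
  val fsUri = fs.getUri
  // The "#geoip" fragment is the name of the symlink that will point at the cached file.
  val mapUriStr = fsUri.toString() + "/cache/GeoLiteCity.dat#geoip"
  DistributedCache.createSymlink(conf)
  DistributedCache.addCacheFile(new java.net.URI(mapUriStr), conf)
}

// Call this while the job is being configured, not from inside a mapper or reducer.
addGeoIpToDistributiveCache()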

By calling addGeoIpToDistributiveCache, we add to the config a request to have the file added to the cache, with a symlink created in the working directory of each task. Although we don't know where on the local filesystem GeoLiteCity.dat will end up (this is governed by other parameters), we will have a symlink at "./geoip".

We could ask the DistributedCache class for the local path, but that would require the jobConf, which is not available to the mappers, and the local path is not known at configuration time.

Next, we need to be very careful about how we instantiate LookupService. LookupService is not serializable, so we cannot be holding a reference to it when the job gets serialized to the nodes. We also don't want to initialize it more than once per task attempt.
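To make the role of the "#geoip" suffix concrete, here is a minimal sketch using only java.net.URI. It shows how the string built in addGeoIpToDistributiveCache splits into the HDFS path and the symlink name. The host and port `namenode:8020` are placeholders assumed for illustration; in the real job that prefix comes from fs.getUri.

```scala
import java.net.URI

object CacheUriSketch {
  // Hypothetical file system URI; in the job this value comes from FileSystem.get(conf).getUri.
  val fsUri = "hdfs://namenode:8020"

  // Same string concatenation as in addGeoIpToDistributiveCache.
  val cacheUri = new URI(fsUri + "/cache/GeoLiteCity.dat#geoip")

  def main(args: Array[String]): Unit = {
    println(cacheUri.getPath)     // prints /cache/GeoLiteCity.dat (the file on HDFS)
    println(cacheUri.getFragment) // prints geoip (the symlink name)
  }
}
```

Changing the fragment changes only the name of the symlink, so the name passed to LookupService has to match it.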

 @transient private lazy val lookupService = new LookupService("./geoip",
                                                                LookupService.GEOIP_MEMORY_CACHE)

By making lookupService transient, we avoid serialization, but it means that it will be null when received by the nodes. Making it lazy ensures that the nodes will initialize it when needed. We use the symlink at "./geoip" for ease and portability. This does not require the jobConf, which will not be available when lookupService is lazily evaluated.
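The effect of combining @transient with lazy can be tried outside Hadoop. The sketch below uses plain Java serialization as a stand-in for what happens when the job is shipped to the nodes. FakeLookup and GeoWorker are hypothetical names invented for this illustration; FakeLookup plays the role of the non-serializable LookupService.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for LookupService: deliberately NOT Serializable.
class FakeLookup(val path: String) {
  def lookup(ip: String): String = s"$path:$ip"
}

// Hypothetical stand-in for the job class that gets serialized to the nodes.
class GeoWorker(path: String) extends Serializable {
  // Skipped during serialization, rebuilt on first use after deserialization.
  @transient private lazy val service = new FakeLookup(path)

  def locate(ip: String): String = service.lookup(ip)
}

object TransientLazySketch {
  // Serializes a value to bytes and reads it back, roughly like shipping it to another JVM.
  def roundTrip[A <: AnyRef](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new GeoWorker("./geoip")
    original.locate("1.2.3.4")     // forces the lazy val on the "submitting" side
    val copy = roundTrip(original) // without @transient this would fail on FakeLookup
    println(copy.locate("8.8.8.8")) // the copy builds its own FakeLookup on first use
  }
}
```

Dropping @transient from the sketch should make writeObject throw a NotSerializableException once the lazy val has been forced, which is the failure this page's pattern avoids.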

Lastly, we write a function to be used by our mappers to do the lookup. This is included for completeness:

def getLocationInfo(ip: String): (String, String, String, String, String, Int) = {
  val unknown = ("", "", "", "", "", 0)
  if (ip == null || ip == "") unknown
  else {
    // getLocation may return null, so wrap the result in an Option.
    Option(lookupService.getLocation(ip)) match {
      case Some(location) =>
        (location.countryName,
         location.countryCode,
         location.region,
         location.city,
         location.postalCode,
         location.metro_code)
      case None => unknown
    }
  }
}

This should be generalizable, but would require extensive error handling, especially around what the behaviour should be in local or pseudo-distributed mode, which may be how you are testing. I advise that you test this behaviour in a small but true distributed environment.
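One possible starting point for that error handling, which the code on this page does not do, is to check whether the symlink exists before building the LookupService and to fall back to a local copy when testing. The sketch below uses only java.io.File. The helper firstReadable and the fallback path /tmp/GeoLiteCity.dat are assumptions made for illustration.

```scala
import java.io.File

object GeoIpPathSketch {
  // Returns the first candidate that exists as a readable file (symlinks are followed), if any.
  def firstReadable(candidates: Seq[String]): Option[String] =
    candidates.find { p =>
      val f = new File(p)
      f.isFile && f.canRead
    }

  def main(args: Array[String]): Unit = {
    // "./geoip" is the symlink from this page; the second path is a hypothetical local copy for testing.
    val candidates = Seq("./geoip", "/tmp/GeoLiteCity.dat")
    firstReadable(candidates) match {
      case Some(path) => println(s"Using GeoIP database at $path")
      case None       => println("No GeoIP database found")
    }
  }
}
```

In local mode the None case could map to returning the unknown tuple from getLocationInfo. Whether that or a hard failure is the right behaviour depends on the job.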
