Using the distributed cache
The distributed cache is simply Hadoop's mechanism for giving each node local access to a specific file. In this example, I am mapping IP addresses to geographical locations (country, city, etc.). The heavy lifting is done by MaxMind's GeoIP LookupService. LookupService requires random access to a local file, GeoLiteCity.dat, which defines a mapping from IP ranges to a location object.
First, we need to place the file in question on the Hadoop file system. I used s3distcp to put a copy of GeoLiteCity.dat at hdfs://cache/GeoLiteCity.dat. If your copy is not on S3, you can use the normal distcp command.
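If you would rather stage the file from code than from the command line, a minimal sketch using Hadoop's FileSystem API might look like the following; the local source path is purely illustrative, and this assumes the Hadoop configuration on the client machine points at your cluster:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()   // picks up the cluster settings available on the client
val fs = FileSystem.get(conf)
// Copy the local GeoLiteCity.dat (illustrative path) up to /cache on HDFS.
fs.copyFromLocalFile(new Path("/tmp/GeoLiteCity.dat"), new Path("/cache/GeoLiteCity.dat"))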
Now that GeoLiteCity.dat is available on HDFS, let's turn to the Scalding code.
We will need to import a few things:
import com.maxmind.geoip.LookupService
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.filecache.DistributedCache
Next, we will need access to the Job's configuration. In reality, we are just setting configuration parameters:
def jobConfOption(implicit mode: Mode) = {
  mode match {
    case Hdfs(_, conf) => Option(conf)
    case _ => None
  }
}
Eventually this will be pushed upstream, as it is not specific to this example. Note that it will only behave properly in true distributed mode, since you need to be running on HDFS. Also, since this is a job-configuration action, it should not be called from inside mappers or reducers.
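To make that caveat concrete, here is a minimal sketch (inside the Job, where the implicit Mode is in scope) of how the Option can be consumed without calling .get; the println calls are purely illustrative:
jobConfOption match {
  case Some(conf) =>
    // True Hdfs mode: a live Hadoop Configuration is available, so it is
    // safe to register cache files against it, as done below.
    println("Got Hadoop configuration: " + conf)
  case None =>
    // Local or Test mode: there is no Hadoop Configuration, so any
    // DistributedCache setup should simply be skipped.
    println("Not running on HDFS; skipping DistributedCache setup")
}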
The following function uses jobConfOption and the DistributedCache API. We need to construct a URI for our file on HDFS:
def addGeoIpToDistributiveCache() {
  val conf = jobConfOption.get
  val fs = FileSystem.get(conf)
  val fsUri = fs.getUri
  val mapUriStr = fsUri.toString + "/cache/GeoLiteCity.dat#geoip"
  DistributedCache.createSymlink(conf)
  DistributedCache.addCacheFile(new java.net.URI(mapUriStr), conf)
}
addGeoIpToDistributiveCache()
By calling addGeoIpToDistributiveCache, we add to the configuration a request to have the file placed in the distributed cache, with a symlink created in the working directory of each task. Although we don't know where on the local filesystem GeoLiteCity.dat will end up (that is governed by other parameters), we will have a symlink to it at "./geoip".
We could ask the DistributedCache class for the local path, but that would require the jobConf, which will not be available to the mappers and is not known at configuration time.
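For illustration only, such a query would look roughly like the sketch below, assuming a Configuration named conf were somehow in scope; needing that Configuration is exactly what makes this approach awkward inside a map function:
// Illustrative only: returns the local paths of every file in the cache for
// this task attempt, but it needs the task's Configuration.
val localPaths: Array[Path] =
  Option(DistributedCache.getLocalCacheFiles(conf)).getOrElse(Array.empty[Path])
localPaths.foreach(p => println("cached locally at: " + p))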
Next, we need to be very careful about how we instantiate LookupService. LookupService is not serializable, so we cannot be holding a reference to it when the job gets serialized to the nodes. We also don't want to initialize it more than once per task attempt.
@transient private lazy val lookupService =
  new LookupService("./geoip", LookupService.GEOIP_MEMORY_CACHE)
By making lookupService transient, we avoid serializing it, but that means it will be null when the job is deserialized on the nodes. Making it lazy ensures that each node will initialize it when it is first needed. We use the symlink at "./geoip" for ease and portability; it does not require the jobConf, which will not be available when lookupService is lazily evaluated.
Lastly, we write a function to be used by our mappers to do the lookup. This is included for completeness:
def getLocationInfo(ip: String): (String, String, String, String, String, Int) = {
  val unknown = ("", "", "", "", "", 0)
  if (ip == null || ip.isEmpty)
    return unknown
  val location = lookupService.getLocation(ip)
  if (location == null)
    return unknown
  (location.countryName,
   location.countryCode,
   location.region,
   location.city,
   location.postalCode,
   location.metro_code)
}
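For context, here is a hedged sketch of how getLocationInfo might be used from a fields-based pipe inside the Job; the source, field names, and args keys are illustrative assumptions, not part of the original example:
// Hypothetical pipe: read IPs from a TSV and enrich each row with the
// location fields looked up via the cache-backed LookupService.
Tsv(args("input"), ('ip, 'bytes))
  .map('ip -> ('country, 'countryCode, 'region, 'city, 'postalCode, 'metro)) {
    ip: String => getLocationInfo(ip)
  }
  .write(Tsv(args("output")))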
This should be generalizable, but it would require extensive error handling, especially around what the behaviour should be in local or pseudo-distributed mode, which may be how you are testing. I advise testing this behaviour in a small but truly distributed environment.
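As one possible starting point for that error handling, a thin wrapper like the sketch below (the name safeLocationInfo is my own, not part of the original) lets a missing symlink or a failed lookup degrade to the "unknown" tuple instead of failing the task:
// Fall back to the unknown tuple if the symlink is missing (e.g. in local
// mode) or the lookup throws for any other reason.
def safeLocationInfo(ip: String): (String, String, String, String, String, Int) =
  try { getLocationInfo(ip) } catch { case _: Exception => ("", "", "", "", "", 0) }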