Friday, June 27, 2014

Helpful findings on Scalding

Many of us are excited by bold DSLs like Scalding/Cascading. They let you develop clear and understandable Hadoop applications in just a few lines, which is really awesome. A truly stream-driven approach.

As with any DSL, if you stick to the straightforward features, everything goes well. However, once you need something out of the ordinary [somehow I need this quite often], the DSL becomes your enemy and you end up wasting hours fighting the framework and looking for workarounds. I believe many of us have faced this in the past.

Scalding suffers from the same sickness. Here is a list of my findings, born in those throes.

1. Accessing the filename of the data chunk being processed from Scalding functions
Sometimes you need to access the filename of the data chunk being processed from inside a map function in order to retrieve additional useful information. My personal example was this: I had to read huge Hive data directly from the filesystem, avoiding the Hive and HCatalog interfaces. In the map function I needed the values of the partition keys, which usually are not stored inside the data itself. Fortunately, Hive stores data hierarchically, keeping each partition key in a separate directory, so my idea was to retrieve all partition keys from the file path. That was my scenario. More generally, it is often quite useful to have direct access to Cascading's FlowProcess.

Unfortunately, there is no simple way to access that object; at least I have not found any official reference to it in the API.
The good news is that Scalding has a kind of tear in its abstraction through which we can pull out a reference to the FlowProcess object, and, I must say, quite legally :). Here is an example:


//imports needed for this snippet (Cascading 2.x / Hadoop mapred API)
import com.twitter.scalding._
import cascading.flow.hadoop.HadoopFlowProcess
import cascading.tap.hadoop.io.MultiInputSplit
import org.apache.hadoop.mapred.FileSplit

//configuring the source and a simple map
TextLine(files).read
  .map('line -> ('cookieId, 'segName, 'year, 'month, 'day, 'hour)) { line: String =>

    //create a fake Stat object; it does not matter which values it is created with
    val hfp = Stat("123", "123").flowProcess.asInstanceOf[HadoopFlowProcess]
    val mis = hfp.getReporter().getInputSplit.asInstanceOf[MultiInputSplit]
    val fs  = mis.getWrappedInputSplit.asInstanceOf[FileSplit]

    // my case requires the filename, but FlowProcess exposes dozens of other useful methods
    val fileName = fs.getPath.toString
    ...


An important note: this approach is applicable only to file-based sources!
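
As an illustration of my original scenario, here is a minimal sketch (the helper name and the path layout are my own, not part of Scalding) of extracting the partition key values from such a Hive-style path:


// Sketch: pull Hive-style partition keys (key=value directories) out of a path such as
// hdfs://nn/warehouse/events/year=2014/month=06/day=27/part-00000
// The path layout and the helper name are illustrative assumptions.
def partitionKeys(fileName: String): Map[String, String] =
  fileName.split("/")
    .filter(_.contains("="))
    .map { dir =>
      val Array(key, value) = dir.split("=", 2)
      key -> value
    }
    .toMap

// partitionKeys(fs.getPath.toString) would then yield something like
// Map("year" -> "2014", "month" -> "06", "day" -> "27")
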

2. Distributed Cache in Scalding
Needless to say, sometimes there is specific data we would like to access from MapReduce jobs. It might be an external config file or just a big input parameter. Accessing such data through the distributed cache dramatically improves performance.


import com.twitter.scalding._
import com.twitter.scalding.filecache.DistributedCacheFile

//somewhere outside the Job definition
val fl = DistributedCacheFile("/user/boris/zooKeeper.json")

// this value can then be passed into any Scalding job, for instance via the Args object
val fileName = fl.path
...
class MyJob(args: Args) extends Job(args) {
  // once we receive fl.path we can read it like an ordinary file
  val fileName = args("fileName")
  lazy val data = readJSONFromFile(fileName)
  ...
  TextLine(args("input")).read.map('line -> 'word) {
    line: String => ... /* using the data json object */ ...
  }
}
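
For completeness, readJSONFromFile above is my own helper, not a Scalding API. A minimal sketch of it, assuming json4s happens to be on the classpath, could look like this:


import scala.io.Source
import org.json4s._
import org.json4s.native.JsonMethods._

// Sketch of the readJSONFromFile helper used above (the helper name and the choice of
// json4s are my own assumptions). The cached file is exposed as an ordinary local file
// on every task node, so any JSON library will do.
def readJSONFromFile(fileName: String): JValue = {
  val source = Source.fromFile(fileName)
  try parse(source.mkString)
  finally source.close()
}
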


3. Hadoop Counters
Another important thing is accessing Hadoop counters. Counters are a very nice way to collect business statistics from your data; the developer just has to place them in the code.
Counter usage in Scalding is simple once you know what it looks like: the Stat object can be used to update counters:



Stat("jdbc.call.counter","myapp").incBy(1)


P.S.
Aside from all that, I just can't get over it: today the Ukrainian President gave a historic speech at the signing of the association agreement with the EU (06/27/2014). It was awesome; he is the first president I am proud of. Slava Ukraini!