oiko: a MapReduce DSL for the lazy programmer.

Language

Local vs Global Pipeline
<<local>>
    [ ] -> ()
<</local>>
-->
<<local>>
    () -> []
<</local>>

Define a task flow

From HDFS

import std.sink.fs.HDFS;

task1 = [HDFS:'/input/data']
        -> ('maptask1')
        -> [order<desc>: 'name']
        -> [group: 'name']
        -> ('reducetask1')
        -> [HDFS:'/output/data'];

From a local file

import std.sink.fs.LOCAL;
import std.sink.fs.HDFS;

task1 = [LOCAL:'/mnt/temp/data']
        -> ('maptask1')
        -> [order<asc>: 'age']
        -> [group: 'age']
        -> ('reducetask1')
        -> [HDFS:'/output/data'];

Define a map task

import task.lang.js;

maptask1 = js.do(key, value, context)
           {
              // Split the input value on single spaces and emit (word, 1) per word.
              var words = value.split(" ");
              // Iterate by index: `for (var w in words)` would yield the array
              // *indices* ("0", "1", ...), not the words themselves.
              for (var i = 0; i < words.length; i++) {
                 // split(" ") produces "" for repeated spaces or an empty value;
                 // those are not words, so skip them.
                 if (words[i] !== "") this.emit(words[i], 1);
              }
           }

Define a reduce task

import task.lang.python;

reducetask1 = python.do(key, values, context)
           {
              import hashlib

              def newkey(key):
                  # Replace the key with its hex MD5 digest.
                  # hashlib.md5 accepts only bytes-like input in Python 3, so
                  # non-bytes keys are converted to text and UTF-8 encoded first.
                  # NOTE(review): UTF-8 is assumed for key text — confirm.
                  if not isinstance(key, (bytes, bytearray)):
                      key = str(key).encode("utf-8")
                  return hashlib.md5(key).hexdigest()

              # Emit the hashed key with the sum of all values grouped under it.
              this.emit(newkey(key), sum(values))
           }

Different data sources and destinations

import std.sink.fs.HDFS;
import std.sink.mq.RabbitMQ;

task2 = [RabbitMQ:queue=data1]
        -> ('maptask1')
        -> [order<desc>: 'key1']
        -> ('reducetask1')
        -> [RabbitMQ:queue=data2];

task3 = [HDFS:'/input/data1'] -> ('maptask1') 
        -> [order<desc>: 'key1']
        -> ('reducetask1') -> [RabbitMQ:queue=data2];

Example: word count with top-N

import std.sink.fs.HDFS;
import task.lang.js;

task1 = [HDFS:'/input/text'] 
        -> (line.split(' ')) 
        -> ('map_words') 
           -> [order<desc>: 'word']
           -> [group: 'word' ]
        -> ('reduce_sum') 
        -> [HDFS:'/output/data']
        -> ('map_inversekey')
           -> [order<desc>: 'value']
        -> ('reduce_topn');

map_words = js.do(key, value, context)
           {
              // Split the input value on single spaces and emit (word, 1) per word.
              var words = value.split(" ");
              // Iterate by index: `for (var w in words)` would yield the array
              // *indices* ("0", "1", ...), not the words themselves.
              for (var i = 0; i < words.length; i++) {
                 // split(" ") produces "" for repeated spaces or an empty value;
                 // those are not words, so skip them.
                 if (words[i] !== "") this.emit(words[i], 1);
              }
           }

reduce_sum = js.do(key, values, context)
           {
                // Emit the key with the total of its grouped values
                // (the per-word partial counts emitted by map_words).
                // NOTE(review): plain JavaScript has no global `sum`; this assumes
                // the oiko JS runtime provides one — confirm.
                this.emit(key, sum(values));
           }

map_inversekey = js.do(key, value, context)
           {
                // Swap roles: the incoming value becomes the outgoing key and
                // the incoming key becomes the outgoing value.
                var swappedKey = value;
                var swappedValue = key;
                this.emit(swappedKey, swappedValue);
           }

reduce_topn = js.do(key, values, context)
           {
                // In this pipeline map_inversekey emits (count, word), so here
                // `key` is a count and `values` holds the words sharing it.
                // Summing the words (as before) yields string concatenation or
                // NaN, not a count; instead emit each word back with its count.
                // NOTE(review): assumes `values` is array-like — confirm.
                // TODO: cap output at N entries once the runtime offers a way to
                // pass N and keep state across reduce calls.
                for (var i = 0; i < values.length; i++) {
                    this.emit(values[i], key);
                }
           }

ret = schedule_and_run(task1);
while (ret.is_alive())
{
    println("progress…");
}