Map-Reduce implementation in Lua


lua-mapreduce is a fast and easy MapReduce implementation for Lua, inspired by other map-reduce implementations, particularly octopy in Python.
It doesn't aim to meet all your distributed computing needs, but its simple approach is amenable to a large proportion of parallelizable tasks. If your code has a for-loop, there's a good chance that you can make it distributed with just a few small changes.
It uses the following Lua modules:
  1. luasocket: TCP client-server connectivity
  2. copas: Coroutine Oriented Portable Asynchronous Services for Lua
  3. lualogging
  4. serialize (included in this project)
  5. luafilesystem: Used only in the task-file example to list files from the directory. The lua-mapreduce client and server don't depend on this module.
On Windows, you can install luaforwindows, which includes these modules.
On Linux/Unix/macOS and Windows, you can use LuaDist.

Directory structure:

  1. lua-mapreduce-server.lua : The map-reduce server. It accepts connections from clients, sends them the task-file, and then sends them tasks on which to run the map/reduce functions.
  2. lua-mapreduce-client.lua : It connects to the server, receives tasks, and executes the map/reduce functions defined in the task-file.
  3. utils/utils.lua : Provides utility functionality
  4. utils/serialize.lua : Provides table serialization functionality
  5. example/word-count-taskfile.lua : Example task-file for counting words in all .lua files in the current directory. More details on how to create a task-file are given on the word-count example page of the wiki.

Usage:

  1. Start Server: lua-mapreduce-server.lua -t task-file.lua [-s server-ip -p port -l loglevel]
  2. Start Client: lua-mapreduce-client.lua [-s server-ip -p port -l loglevel]

example/word-count-taskfile.lua is a sample task-file for word count which implements the following functions. Each of these functions is invoked as a coroutine, both to avoid blocking calls and to return results one entry at a time, which saves memory.
All of these functions must be defined as fields of a table, and the function mapreduce() must return that table.
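
As a rough illustration of that layout, the sketch below shows the overall shape a task-file could take. The table name mr matches the snippets in this document; the function bodies are placeholders, and declaring mapreduce() as a global function is an assumption, since the text only states that it must return the table.

```lua
-- Skeleton of a task-file: all four functions are fields of one table.
local mr = {}

-- Server side: yield one (key, value) task at a time.
mr.taskfn = function()
    coroutine.yield("task-1", "some input")  -- placeholder task
end

-- Client side: yield intermediate (key, value) pairs for one task.
mr.mapfn = function(key, value)
    coroutine.yield(value, 1)  -- placeholder mapping
end

-- Client side: yield the reduced (key, value) pair for one key.
mr.reducefn = function(key, value)
    coroutine.yield(key, #value)
end

-- Server side: consume the final results, yielding after each entry.
mr.finalfn = function(results)
    for key, value in pairs(results) do
        print(key .. ":" .. value)
        coroutine.yield()
    end
end

-- Entry point that hands the table to the caller.
-- Assumption: a global function; the real task-file may declare it differently.
function mapreduce()
    return mr
end
```

The sections below describe what each of the four functions is expected to do in the word-count example.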

Server required functions:

  1. taskfn: It reads the source and creates the map of tasks. E.g., for word count, it reads all files with the .lua extension from the current directory and creates a map with the file name as the key and the file content as the value.
    mr.taskfn = function()
        --logger:debug("Getting map task")
        local tasks = read_source()  -- read source is utility function defined to read data source
        for key, value in pairs(tasks) do
            coroutine.yield(key, value)
        end
    end
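Here read_source() is a local function defined as below. NOTE: it uses the luafilesystem (lfs) module to list the files in the directory, and a read_file helper (not shown here) that returns a file's content, or nil if it cannot be read.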
    local function read_source()
      --local file_path = system.pathForFile( "*.lua", lfs.currentdir() )
      local file_path = lfs.currentdir()
      --logger:debug("Current directory path:" .. file_path)
      local source_table = {}
      for file in lfs.dir(file_path) do
        if(string.find(file, "%.lua$") ~= nil) then  -- escape the dot and anchor at the end so only *.lua names match
        --  logger:debug("File name:" .. file_path .. "/" .. file)
            local c = read_file(file_path .. "/" .. file)
        --  logger:debug("file:" .. file .. ", length:" .. #c)

            if( c ~= nil) then
                source_table[file]=c
            end
        end
      end
      return source_table
    end
  2. finalfn: Defines how to output the final result. Here it prints the results to the console.
    mr.finalfn = function (results)
        print("Final results of the task:")
        for key, value in pairs(results) do
            print( key .. ":" .. value)
            coroutine.yield()
        end
    end
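
Because taskfn and finalfn both call coroutine.yield, whoever invokes them has to run them as coroutines. The sketch below is one way a caller could do that; it is not taken from lua-mapreduce-server.lua, and the in-memory tasks, the pending table, and the sample results are assumptions made to keep the example self-contained.

```lua
-- Stand-in task-file table; a real one would come from the task-file.
local mr = {}

mr.taskfn = function()
    -- In-memory tasks instead of files (assumption for this sketch).
    local tasks = { ["a.lua"] = "local x = 1", ["b.lua"] = "print(x)" }
    for key, value in pairs(tasks) do
        coroutine.yield(key, value)
    end
end

mr.finalfn = function(results)
    for key, value in pairs(results) do
        print(key .. ":" .. value)
        coroutine.yield()
    end
end

-- Pull tasks one at a time: each call to next_task() resumes taskfn
-- until its next coroutine.yield and returns the yielded key and value.
local pending = {}
local next_task = coroutine.wrap(mr.taskfn)
while true do
    local key, value = next_task()
    if key == nil then break end  -- taskfn returned: no more tasks
    pending[key] = value
end

-- finalfn yields after every entry, so it is resumed until it finishes.
local co = coroutine.create(mr.finalfn)
local ok, err = coroutine.resume(co, { hello = 2, world = 1 })
while ok and coroutine.status(co) ~= "dead" do
    ok, err = coroutine.resume(co)
end
assert(ok, err)
```

Pulling one task per resume means the caller never needs more than the current task in flight; the pending table here only exists so the result can be inspected.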
    

Client required functions:

  1. mapfn: Map function which processes the task (here, the content of a file), splitting it into lines and each line into words.
    -- Map function : Here it splits the content of the files into lines and each line into words
    mr.mapfn = function(key, value)
        --logger:debug("mapfn with key:" .. key .. ", value :" .. value .. "\r\n\r\n")
        local file_words = {}

        local lines = value:split("[^\r\n%s]+")

        -- logger:debug("Number of lines in " .. #lines .. " in the file " .. key)
        for k, w in ipairs(lines) do
            if(w ~= nil) then
                local words = {}
                string.gsub(w, "(%a+)", function (word)
                    table.insert(words, string.lower(word))
                end)

                --local words = w:split("[^ %s]+")

                if(words ~= nil) then
                    --logger:debug("Number of words in line " .. k .. " are " .. #words)
                    for j=1, #words do
                        --logger:debug("mapfn:yielding " .. words[j])
                        coroutine.yield(words[j], 1)
                    end
                end
            end
        end
    end
  2. reducefn: Reduce function which computes the number of occurrences of a word. Here it returns the size of the array of values collected for that word.
    ---Reduce function: It returns the number of entries for the values

    mr.reducefn = function (key, value)
        --logger:debug("reducefn: for key:" .. key ..  ", number of words  :" .. #value)
        coroutine.yield(key, #value)
    end
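
To see how the map and reduce functions fit together, the following sketch runs both phases locally in a single process, with no server or client involved. It assumes that the values yielded by mapfn are grouped by key into an array before reducefn is called, which is what the use of #value above suggests. The mapfn here is a simplified stand-in that uses string.gmatch instead of the split utility, and map_and_group, reduce_all, and the in-memory tasks are hypothetical names introduced for illustration.

```lua
-- Simplified mapfn: yields (word, 1) for every alphabetic word.
local function mapfn(key, value)
    for word in string.gmatch(value, "%a+") do
        coroutine.yield(string.lower(word), 1)
    end
end

-- reducefn as in the text: the count is the number of collected values.
local function reducefn(key, values)
    coroutine.yield(key, #values)
end

-- Map phase: run mapfn as a coroutine and group yielded values by key.
local function map_and_group(tasks)
    local grouped = {}
    for key, value in pairs(tasks) do
        local co = coroutine.wrap(mapfn)
        local k, v = co(key, value)
        while k ~= nil do
            grouped[k] = grouped[k] or {}
            table.insert(grouped[k], v)
            k, v = co()
        end
    end
    return grouped
end

-- Reduce phase: run reducefn once per key and keep what it yields.
local function reduce_all(grouped)
    local results = {}
    for key, values in pairs(grouped) do
        local k, v = coroutine.wrap(reducefn)(key, values)
        results[k] = v
    end
    return results
end

-- In-memory stand-ins for the files that taskfn would read.
local tasks = {
    ["a.lua"] = "local count = 1\nprint(count)",
    ["b.lua"] = "local total = count",
}
local results = reduce_all(map_and_group(tasks))
```

For these two in-memory tasks, results holds local = 2, count = 3, print = 1 and total = 1. In the distributed setting the grouping step would happen between clients and server rather than in one table.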

Todo:

  1. Add support for handling failed tasks. Currently, if a client disconnects, the task it was handling is lost.
  2. Support multiple client connections based on the number of cores available on the computer. Use copas for async.
  3. Ability to send multiple task-files to the server.
  4. Add more examples of task-files.
  5. Possibly integrate with apache-mesos
