lua-mapreduce is a fast and easy MapReduce implementation for Lua, inspired by other map-reduce implementations and particularly by octopy in Python.
It doesn't aim to meet all your distributed computing needs, but its simple approach is amenable to a large proportion of parallelizable tasks. If your code has a for-loop, there's a good chance that you can make it distributed with just a few small changes.
It uses the following Lua modules:
- luasocket: TCP client-server connectivity
- copas: Coroutine Oriented Portable Asynchronous Services for Lua
- lualogging: logging
- serialize (included in this project)
- luafilesystem: Used only in the task-file example to list files in a directory. The lua-mapreduce client and server do not depend on this module.
On Windows, you can install luaforwindows, which includes these modules.
On Linux/Unix/macOS and Windows, you can use LuaDist.
Directory structure:
- lua-mapreduce-server.lua : The map-reduce server. It accepts connections from clients, sends them the task-file and then sends them tasks to perform the map/reduce work.
- lua-mapreduce-client.lua : It connects to the server, receives tasks and executes the map/reduce functions defined in the task-file.
- utils/utils.lua : Provides utility functionality
- utils/serialize.lua : Provides table serialization functionality
- example/word-count-taskfile.lua : Example task-file for counting words in all .lua files in the current directory. More details on how to create a task-file are given on the word-count example page of the wiki.
Usage:
1. Start Server: lua-mapreduce-server.lua -t task-file.lua [-s server-ip -p port -l loglevel]
2. Start Client: lua-mapreduce-client.lua [-s server-ip -p port -l loglevel]
Creating a task file:
example/word-count-taskfile.lua is a sample task-file for word count which implements the following functions. Each of these functions is invoked as a coroutine, which avoids blocking calls and lets results be returned one entry at a time to save memory.
All of these functions must be defined as fields of a table, and the function mapreduce() must return that table.
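The overall shape of a task-file can be sketched as follows. This is a minimal skeleton with placeholder bodies, not the shipped example; it assumes mapreduce() is defined as a global function, and the key "task-1" and its value are made up for illustration.

```lua
-- Skeleton of a task-file. All bodies are placeholders for illustration.
-- Assumption: mapreduce() is a global function that returns the table.
function mapreduce()
  local mr = {}

  -- Server side: yield one (key, value) pair per task.
  mr.taskfn = function()
    coroutine.yield("task-1", "some input")
  end

  -- Client side: yield intermediate (key, value) pairs for one task.
  mr.mapfn = function(key, value)
    coroutine.yield(key, value)
  end

  -- Client side: yield one reduced (key, value) pair per key.
  mr.reducefn = function(key, values)
    coroutine.yield(key, #values)
  end

  -- Server side: consume the table of final results.
  mr.finalfn = function(results)
    for key, value in pairs(results) do
      print(key .. ":" .. value)
      coroutine.yield()
    end
  end

  return mr
end
```

Because every function yields instead of returning a full table, the caller can pull one entry at a time, for example with coroutine.wrap(mr.taskfn).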
Server required functions:
- taskfn: It reads the source and creates the map of tasks. For example, for word count it reads every .lua file in the current directory and creates a map with the file name as the key and the file content as the value.

    mr.taskfn = function()
      --logger:debug("Getting map task")
      local tasks = read_source() -- read_source is a utility function defined to read the data source
      for key, value in pairs(tasks) do
        coroutine.yield(key, value)
      end
    end

Here read_source() is a local function defined as below. Note: it uses the luafilesystem (lfs) module to list the files in the directory.

    local lfs = require("lfs")

    local function read_source()
      --local file_path = system.pathForFile( "*.lua", lfs.currentdir() )
      local file_path = lfs.currentdir()
      --logger:debug("Current directory path:" .. file_path)
      local source_table = {}
      for file in lfs.dir(file_path) do
        -- "%.lua$" matches only names ending in ".lua"; an unescaped "." matches any character
        if string.find(file, "%.lua$") ~= nil then
          -- logger:debug("File name:" .. file_path .. "/" .. file)
          local c = read_file(file_path .. "/" .. file)
          -- logger:debug("file:" .. file .. ", length:" .. #c)
          if c ~= nil then
            source_table[file] = c
          end
        end
      end
      return source_table
    end

- finalfn : Defines how the final result is output. Here it prints to the console.

    mr.finalfn = function (results)
      print("Final results of the task:")
      for key, value in pairs(results) do
        print(key .. ":" .. value)
        coroutine.yield()
      end
    end

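read_source() above calls read_file(), which this README does not show, and it filters file names by extension. The following is a minimal sketch of both pieces, not the project's own code. It assumes read_file() returns the whole file content as a string, or nil when the file cannot be opened; is_lua_file is a hypothetical helper name introduced here.

```lua
-- Hypothetical helper (assumed behaviour): return the whole content of a
-- file as a string, or nil when the file cannot be opened.
local function read_file(path)
  local f = io.open(path, "rb")
  if not f then
    return nil
  end
  local content = f:read("*a")
  f:close()
  return content
end

-- Hypothetical helper: true only when the name ends in ".lua".
-- In Lua patterns an unescaped "." matches any character, so ".lua" would
-- also match names such as "mylualib.txt" or "notes.lua.bak".
local function is_lua_file(name)
  return string.find(name, "%.lua$") ~= nil
end
```

Returning nil instead of raising an error matches the `c ~= nil` check in read_source(), which simply skips files that cannot be read.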
Client required functions:
- mapfn: Map function which processes a task (here, the content of a file), splitting it into lines and each line into words.

    -- Map function: here it splits the content of a file into lines and each line into words
    mr.mapfn = function(key, value)
      --logger:debug("mapfn with key:" .. key .. ", value :" .. value .. "\r\n\r\n")
      local file_words = {}
      -- split() is not a standard Lua string method; it is assumed to be provided by the project's utilities
      local lines = value:split("[^\r\n%s]+")
      -- logger:debug("Number of lines in " .. #lines .. " in the file " .. key)
      for k, w in ipairs(lines) do
        if w ~= nil then
          local words = {}
          -- collect every alphabetic run in lower case
          string.gsub(w, "(%a+)", function (word)
            table.insert(words, string.lower(word))
          end)
          --local words = w:split("[^ %s]+")
          if words ~= nil then
            --logger:debug("Number of words in line " .. k .. " are " .. #words)
            for j = 1, #words do
              --logger:debug("mapfn:yielding " .. words[j])
              coroutine.yield(words[j], 1)
            end
          end
        end
      end
    end

- reducefn : Reduce function which computes the number of occurrences of a word. Here it yields the length of the array of values collected for the key.

    -- Reduce function: it yields the number of entries in the values array
    mr.reducefn = function (key, value)
      --logger:debug("reducefn: for key:" .. key .. ", number of words :" .. #value)
      coroutine.yield(key, #value)
    end

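To see how the four functions fit together, the data flow can be sketched in a single process without any networking. This is an illustration only and not how the lua-mapreduce server and client are implemented; collect and run_local are hypothetical names, and the two inline "files" are made-up sample data.

```lua
-- Hypothetical helper: run fn(...) as a coroutine and gather every
-- (key, value) pair it yields into a list.
local function collect(fn, ...)
  local out = {}
  local co = coroutine.create(fn)
  local ok, k, v = coroutine.resume(co, ...)
  while ok and coroutine.status(co) ~= "dead" do
    out[#out + 1] = { k, v }
    ok, k, v = coroutine.resume(co)
  end
  if not ok then
    error(k) -- re-raise an error from inside the coroutine
  end
  return out
end

-- Hypothetical single-process driver: tasks -> map -> group by key -> reduce.
local function run_local(mr)
  local grouped = {}
  for _, task in ipairs(collect(mr.taskfn)) do
    for _, pair in ipairs(collect(mr.mapfn, task[1], task[2])) do
      local key, value = pair[1], pair[2]
      grouped[key] = grouped[key] or {}
      table.insert(grouped[key], value)
    end
  end
  local results = {}
  for key, values in pairs(grouped) do
    for _, pair in ipairs(collect(mr.reducefn, key, values)) do
      results[pair[1]] = pair[2]
    end
  end
  return results
end

-- A tiny word-count task with made-up input.
local mr = {}
mr.taskfn = function()
  coroutine.yield("a.txt", "the cat and the hat")
  coroutine.yield("b.txt", "The end")
end
mr.mapfn = function(key, value)
  for word in value:gmatch("%a+") do
    coroutine.yield(word:lower(), 1)
  end
end
mr.reducefn = function(key, values)
  coroutine.yield(key, #values)
end

local results = run_local(mr)
print(results.the) -- prints 3
```

The grouping step is what turns the many (word, 1) pairs from mapfn into one array per word, which is why reducefn can simply yield the array length.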
Todo
- Add support for handling failed tasks. Currently, if a client disconnects, the task it was handling is lost.
- Support multiple client connections based on the number of cores available on the computer. Use copas for async.
- Add the ability to send multiple task-files to the server.
- Add more examples of task-files.
- Possibly integrate with Apache Mesos.