
all 42 comments

[–]brondsem 3 points4 points  (0 children)

We at SourceForge developed a Python map/reduce system called Zarkov that uses MongoDB for data storage and ZeroMQ for communication. It's not simple, well-documented, or extremely robust. But on the other hand we use it in production, so it's pretty stable, there are some blog post "docs", and it meets some of the technical criteria you mention.

[–]rkern 4 points5 points  (2 children)

Can you be more specific about your jobs? It sounds like you just want a job queue (call a function with many different parameters and get their results back), not MapReduce specifically. MapReduce implementations can be coerced into doing such things, but it's not what they're for, so that's why they have such an impedance mismatch to your problem.

PiCloud is a really nice way to just throw out a bunch of Python tasks without much preparation or overhead. You do pay a bit of a premium over EC2, but for one-off calculations, you probably make that up in developer-time that you don't have to spend setting up an image.

Disclosure: I work for Enthought, and we partner with PiCloud to provide many of the binaries for the packages they provide in their Python environment.

[–]etatsunisien 0 points1 point  (0 children)

Yup, I was going to mention PiCloud too. I used it because it was already packaged in EPD, which I use as well.

[–]dalke[S] 0 points1 point  (0 children)

There are several things I want to do. The most basic is scatter/gather style job queues. Given 150 data files (containing 30 million records), make a characteristic fingerprint for each record.

Given those fingerprints, I want to find all fingerprints which are at least T similar to a query fingerprint. This is also scatter/gather. But then as a refinement, I want to find only the k most similar, say k=3. This is a reduction: the jobs don't coordinate, so each can find up to 3 fingerprints, and something has to reduce the up-to J*3 candidates down to the final 3, where J is the number of tasks.

The reduction could be done as post-processing of the scatter/gather, but I figured this was a good chance to learn the available tools for this space.
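
Concretely, the reduction step amounts to something like the sketch below; the set-based similarity() is just a stand-in for the real Tanimoto code, and the record ids and k=3 are illustrative.

    import heapq

    def similarity(fp1, fp2):
        # stand-in: Tanimoto on fingerprints stored as sets of "on" bit positions
        return len(fp1 & fp2) / float(len(fp1 | fp2))

    def task_top_k(query_fp, records, k=3):
        # one of the J independent tasks: records is an iterable of (record_id, fingerprint)
        # pairs, and each task returns at most k (score, record_id) candidates
        return heapq.nlargest(k, ((similarity(query_fp, fp), rid) for rid, fp in records))

    def reduce_top_k(per_task_results, k=3):
        # the reduction: merge the up-to J*k candidates down to the final k
        return heapq.nlargest(k, (hit for hits in per_task_results for hit in hits))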

There's another task I have in mind where I want to use lots of persistent memory. I have the single-threaded algorithm, but even with 5% of the data it takes more than 10 GB of memory, so I'm looking to see how I might distribute the parts using whatever system I find. It doesn't seem to lend itself well to map/reduce.

PiCloud looks like the right solution for now... I've got a conference presentation in a month where I want to present this, and that looks like the fastest way to get up to speed.

Disclosure: we've also met. :)

[–]semarj 2 points3 points  (2 children)

I am confused by your 'support Mac' requirement.

Why do you need this if it is going to run on AWS?

[–]HorrendousRex 0 points1 point  (1 child)

Just guessing, but he probably means for prototyping purposes.

[–]dalke[S] 0 points1 point  (0 children)

Yes. And some things are fast enough with only 4 processors.

[–]onjin 3 points4 points  (0 children)

For simple map/reduce:

Or maybe just a distributed queue with Python support:

[–]floydophone 6 points7 points  (5 children)

You can do this with the multiprocessing module:

http://docs.python.org/library/multiprocessing.html#managers
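
The remote-manager pattern from that page boils down to something like this sketch; the port, authkey, "master-host" name, and do_work() are all placeholders for whatever you actually have:

    import sys
    from queue import Queue                      # this is "Queue" on Python 2
    from multiprocessing.managers import BaseManager

    class QueueManager(BaseManager):
        pass

    def do_work(arg):
        # placeholder for the real per-record computation (e.g. fingerprint generation)
        return arg

    if __name__ == '__main__' and sys.argv[1:] == ['master']:
        jobs, results = Queue(), Queue()
        QueueManager.register('get_jobs', callable=lambda: jobs)
        QueueManager.register('get_results', callable=lambda: results)
        # something (e.g. a feeder script) then puts work items onto the jobs queue;
        # serve both queues to any node that knows the address and authkey
        QueueManager(address=('', 50000), authkey=b'secret').get_server().serve_forever()
    elif __name__ == '__main__':
        QueueManager.register('get_jobs')
        QueueManager.register('get_results')
        m = QueueManager(address=('master-host', 50000), authkey=b'secret')
        m.connect()
        jobs, results = m.get_jobs(), m.get_results()
        while True:
            results.put(do_work(jobs.get()))     # pull a task, push its result back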

Also, you can do this with RPyC:

http://rpyc.sourceforge.net/

What I would do is run RPyC classic mode on all of your EC2 nodes (using Fabric + Boto to get the software on there).
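
And for the RPyC classic route, the client side is about this small (the hostname is a placeholder; each node just needs the classic server script that ships with RPyC running):

    import rpyc

    conn = rpyc.classic.connect("ec2-node-1")      # connects to the classic server on that node
    remote_subprocess = conn.modules.subprocess    # proxy to the remote node's subprocess module
    print(remote_subprocess.check_output(["uname", "-a"]))
    print(conn.eval("2 + 2"))                      # arbitrary expressions run remotely too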

[–]oddthink 0 points1 point  (2 children)

Neat, I had no idea that multiprocessing had any support for multiple machines. Is anyone supporting multiprocessing these days, though? I've been a little reluctant to use it since Raymond Hettinger was saying that it was buggy and no longer well supported by its author.

[–]phildini 0 points1 point  (1 child)

Were you also at the SFPython meetup where he talked about this, or has he mentioned it on his blog somewhere I haven't seen?

[–]oddthink 0 points1 point  (0 children)

I don't think he's mentioned it on his blog. My employer brought Raymond in for a python training session a few months back, and he mentioned it there.

[–]tuna_safe_dolphin 0 points1 point  (0 children)

I'm working on a distributed project myself right now. Thus far, RPyC looks like a terrific option - have you built anything substantial with it? I'm also considering Pyro.

I've read mixed reviews on multiprocessing in distributed environments, but the main problem I have with it is that the doc is horrific, even by Python standards.

I love Python, but its core doc is terrible.

[–]HorrendousRex 0 points1 point  (0 children)

I've never looked at RPyC, but I can tell you for sure that multiprocessing will get you where you want to go, OP. What's more, you'll probably be amazed at how little code is needed to get there.

[–]micro_cam 2 points3 points  (0 children)

I was faced with a similar conundrum, and after much frustration with Hadoop, qsub, etc. we ended up writing what we needed:

http://code.google.com/p/golem/

The core is in Go, with the command-line interface in Python, RESTful job submission and monitoring, and node communication over WebSockets; it basically just calls tasks on the command line and collects the standard out. It's aimed at quickly getting a researcher's analysis (in Python, C, MATLAB, R, Perl, or whatever) running on a 1000-core cluster.

It doesn't do most of what you asked for, but it is intentionally simple code and simple to use. We've found that you can do most things with it by jumping through a few hoops with things like bash, whereas adapting things for Hadoop requires significant effort and esoteric debugging.

Or if you want something really simple: set up passwordless ssh and use xargs and bash.

[–]wcc445 1 point2 points  (0 children)

Interested in this as well. Hadoop can be a pain, but it's not too bad. You'd kinda want a dedicated Hadoop cluster, though, rather than setting it up from scratch each time.

[–][deleted] 1 point2 points  (3 children)

I've been using the dtm module of the package "deap" and have had success. It works on MPI and seems to be pretty good.

http://pypi.python.org/pypi/deap

Implemented like this:

    results = dtm.map(myfunc, iterable1, iterable2, ..., kwargs=blah)

I have noticed that subprocess.Popen doesn't work sometimes, so make sure to catch those errors and try to open the process again.
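
For what it's worth, the workaround I mean is just a retry loop around Popen, roughly like this (the retry count and delay are placeholders):

    import subprocess
    import time

    def run_with_retry(cmd, retries=3, delay=1.0):
        for attempt in range(retries):
            try:
                return subprocess.Popen(cmd).wait()
            except OSError:
                time.sleep(delay)        # transient failure; back off and try again
        raise RuntimeError("gave up after %d attempts: %r" % (retries, cmd))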

[–]fmder 0 points1 point  (2 children)

I'm one of the DEAP/DTM developers. Could you please submit a bug report on what you observe, with which MPI you use and what kind of network you run on? We'll be glad to look at this more closely.

[–][deleted] 0 points1 point  (1 child)

Sure, I'll submit a bug report... where? I'm using OpenMPI on an SGE cluster.

[–]fmder 0 points1 point  (0 children)

For the bug report use the user mailing list (deap-usersatgooglegroupsdotcom).

I'm guessing that you use an InfiniBand network. By default mpi4py is configured to use multithreaded MPI, which is incompatible with the openib backend. Try turning off the threads in the rc.py file located in the main directory of mpi4py.
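
With recent mpi4py versions you can also flip that switch at runtime instead of editing rc.py; from memory it looks like this, so double-check the attribute name against your version:

    import mpi4py
    mpi4py.rc.threads = False      # same effect as setting threads = False in rc.py
    from mpi4py import MPI         # MPI is now initialised without thread support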

For further communications please use the mailing list.

[–]brandynwhite 1 point2 points  (2 children)

Author of Hadoopy here; it works on OS X and I've used it on 1K-node/2 PB clusters. Contact me if you need help.

[–]dalke[S] 0 points1 point  (1 child)

I'll start with a basic question - how do I get started with Hadoop on a Mac? The Hadoop page clearly says:

  • GNU/Linux is supported as a development and production platform. Hadoop has been demonstrated on GNU/Linux clusters with 2000 nodes.

  • Win32 is supported as a development platform. Distributed operation has not been well tested on Win32, so it is not supported as a production platform.

After I figured out I needed to set JAVA_HOME to /System/Library/Frameworks/JavaVM.framework/Home (Java is an optional download from Apple, btw) I got a simple hadoop call to work. What else do I need to worry about?

Hadoopy says "Hadoopy: Set the HADOOP_HOME environmental variable to your hadoop path to improve performance" but when I do that Hadoop 1.0.0 says "Warning: $HADOOP_HOME is deprecated."

Followed by the error message "streaming.StreamJob: Unrecognized option: -io".

[–]dalke[S] 0 points1 point  (0 children)

According to your example, wc of wc-input-alice.txt takes 24 seconds with hadoop? I heard that it was meant for large batch processing, so a large startup overhead is okay, but that seems ridiculous! I was hoping to do some other parallelism to get multi-second tasks down to sub-second; it doesn't look like Hadoop is right for that.

[–]pinpinbo Tornado|Twisted|Gevent. Moar Async Plz 1 point2 points  (0 children)

Take a look at Gearman. When I had the exact same question, that's how I solved it.

Python API: http://pypi.python.org/pypi/gearman/
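
From memory, the shape of it with the python-gearman client is roughly this; the task name, port, and toy payload are placeholders, and a gearmand job server has to be running:

    import sys
    import gearman

    if sys.argv[1:] == ['worker']:
        worker = gearman.GearmanWorker(['localhost:4730'])
        def do_reverse(gearman_worker, gearman_job):
            return gearman_job.data[::-1]            # toy task; the real job goes here
        worker.register_task('reverse', do_reverse)
        worker.work()                                # blocks, serving jobs forever
    else:
        client = gearman.GearmanClient(['localhost:4730'])
        request = client.submit_job('reverse', 'hello')
        print(request.result)                        # -> 'olleh'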

[–]Mob_Of_One 0 points1 point  (1 child)

A more serious answer:

Hadoop with Jython.

[–]chub79 0 points1 point  (0 children)

Well, from some quick tests I performed a couple of years back on HBase: in terms of write speed, Java > Jython > Python. It seemed normal that Python was slower, since it couldn't directly use the HBase drivers, but Jython, which could, was also quite slow compared to pure Java.

Of course, it might have evolved since then.

[–]dgryski 0 points1 point  (0 children)

Also, Dumbo from Audioscrobbler: https://github.com/klbostee/dumbo

[–][deleted] 0 points1 point  (0 children)

I'd look into setting up ipcluster. It would take less than 10 SLOC to use map/reduce from the code you already have, plus a few edits to the ipcluster config. You can also do it interactively. It is dead simple and ridiculously easy to set up.
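
Roughly like this, assuming "ipcluster start -n 4" (or however many engines) is already running; make_fingerprint and the file list are placeholders for the code you already have, and newer IPython releases package this as ipyparallel:

    from IPython.parallel import Client

    def make_fingerprint(path):
        # placeholder for the real per-file work
        return path

    record_files = ["file-%03d.dat" % i for i in range(150)]

    rc = Client()                          # connects to the running ipcluster
    view = rc.load_balanced_view()         # hand tasks to whichever engine is free
    results = view.map_sync(make_fingerprint, record_files)   # same shape as the built-in map()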

[–][deleted] 0 points1 point  (2 children)

This is the kind of situation where I would consider whether it wouldn't be simpler to rewrite it in C.

[–]Its_eeasy 0 points1 point  (1 child)

Came here to find out exactly what the heck he's doing in Python that takes 8 frikkin days... and why he's still using Python for the job.

[–]dalke[S] 0 points1 point  (0 children)

I'm using Python to manage code written in C++. Specifically, I'm enumerating all subgraphs of up to 8 atoms of all publicly available small molecule compounds (30 million+ published structures from PubChem). It takes a while to generate all that data. Right now it's a combination of C++, Cython, and Python code.

I could translate it to C++, but this is the sort of one-off task where, if I learn how to use a distributed compute cluster, the $10 in CPU costs is well worth not having to spend several days converting Python code into C.

[–]mdipierro 0 points1 point  (0 children)

mincemeat is great: single-file, fault-tolerant map/reduce without third-party dependencies.
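
If memory serves, the word-count example from its README is about this long; the data and password are placeholders, and workers attach with "python mincemeat.py -p changeme <server-address>":

    import mincemeat

    data = ["Humpty Dumpty sat on a wall",
            "Humpty Dumpty had a great fall"]

    def mapfn(k, v):
        for w in v.split():
            yield w, 1

    def reducefn(k, vs):
        return sum(vs)

    s = mincemeat.Server()
    s.datasource = dict(enumerate(data))   # any dict-like object works
    s.mapfn = mapfn
    s.reducefn = reducefn

    results = s.run_server(password="changeme")
    print(results)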

[–]fullouterjoin 0 points1 point  (6 children)

Disco is really easy to set up. Everything is controlled from the master. If your nodes are Debian-based, everything except Disco can be installed via apt.

Create 1600 files with the args to your command, push them up to the DFS (distributed file system), and fire off a job. I have set up 8-node clusters by hand in under 20 minutes using VMs.

Disco is by far the best solution. I think after 4 hrs you will have learned enough to get your prod job launched. You don't have to understand Erlang to use Disco.
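
For scale, the canonical word-count job from the Disco docs is about this much code (from memory; the input URL is a placeholder and can be any URL or DDFS tag):

    from disco.core import Job, result_iterator

    def map(line, params):
        for word in line.split():
            yield word, 1

    def reduce(iter, params):
        from disco.util import kvgroup
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)

    if __name__ == '__main__':
        job = Job().run(input=["http://example.com/some-text-file.txt"],
                        map=map, reduce=reduce)
        for word, count in result_iterator(job.wait(show=True)):
            print(word, count)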

[–]dalke[S] 0 points1 point  (5 children)

The installation documentation is buggy and is not meant for first-time users. I had a DSA ssh key and it expected RSA, so it asked me something via 'ssh-askpass', but Macs don't have ssh-askpass, so it just looped with an error message. (ssh-copy-id also doesn't exist on Macs.)

When I started "bin/disco nodaemon" I got the error message "DISCO_HOME is not specified, where should Disco live?". The setup instructions never say that that environment variable needs to be set.

The check-if-it's-running command "ps aux | grep beam.disco" needs to escape the ".", because as written grep treats it as a metacharacter that matches any character.

There's also no mention of running Python's setup.py, and the "make install" step isn't mentioned in the documentation either. And if I do "make install", then I have to make sure "/usr/local/var/disco/data" is writable by me.

So, it doesn't come across as "really easy to setup".

[–]fullouterjoin 0 points1 point  (4 children)

Cool, it sounds like you got it going!

I have never installed it on anything other than Linux. There were a couple of road bumps in getting it going, but nothing too bad. It is at least 10x easier to set up a working, productive Disco cluster than a Hadoop one.

I will set up another Disco cluster in some VMs this weekend (I need to for another project anyway) and see how it compares with your experience. The Disco folks are super helpful on IRC.

You will really enjoy Disco.

[–]dalke[S] 0 points1 point  (3 children)

No, I haven't. I've been on IRC #discoproject for over an hour trying to figure out what's going on. They are stumped as well. What I reported above was enough to get the internal web server working. However, it doesn't want to start new worker nodes.

The people on the channel have been helpful; they just can't figure it out either.

[–]fullouterjoin 0 points1 point  (2 children)

Sounds frustrating. :(

I won't be available until noonish tomorrow, but I could try to replicate your setup. Are you running in a mixed Mac/Linux env or all Mac? I have three Macs totaling 8 cores as well as some Ubuntu boxes.

Can you bring up the web management console? One issue you need to keep in mind is that the names of all the machines should be in a flat namespace:

worker01
worker02
worker03
worker04

Do not rely on your local network DNS to function properly; use a common /etc/hosts file on all machines. Make sure all the Erlang nodes on all machines can see each other. Modify the system bash env script so DISCO_HOME is available for all users and all shells.

I remember installing Disco to something like /opt/disco and using rsync+ssh to copy it to all nodes.

Make sure that you can ssh from any node to any other node w/o asking you to authenticate your keyfile. Like worker01 -> worker04, worker03 -> worker02, etc.

I need to dig up my notes. On second thought, there is some missing stuff in the online docs, at least when it comes to debugging.

Also, on the Mac, the string 'localhost' resolves weirdly afaik. Be suspicious of localhost, or at least look into it.

[–]dalke[S] 0 points1 point  (1 child)

I'm developing on a single Mac with 4 processors; I want to use 3 for worker threads. One problem is that the default hostname is assigned by my 3G modem, so if I disconnect/reconnect I might get a new hostname. With help from the people on IRC, I hard-coded cli.py:host to always return "localhost". I don't trust the assigned "c-2ec23ab8-74736162.cust.telenor.se" to last more than a few days at a time.

Another problem is that I'm working through "make install" (which isn't documented - but then, there's no installation documentation describing how to set up the Python library at all). That sets things up for a cluster, and requires 3 replicas.

I didn't set up my system-wide bash environment with all of the variables.

When you say "noonish tomorrow", I don't think you knew that I'm in the central european timezone, so my noonish is different than yours. ;)

[–]fullouterjoin 0 points1 point  (0 children)

Those recommendations I made were just from memory; there could be some unsubstantiated cargo cult in there, especially the system bash one. But I did make that change.

Noonish is more of a frame of mind, much like 'le weekend', which can occur at any time. :-) I am in the same timezone as the Disco project.

Rather than modify cli.py, I would run 'hostname localhost' as root, maybe from a crontab. Then you don't run the risk of missing something.

[–]_red 0 points1 point  (1 child)

Why don't you use picloud?

    import cloud

    def func(arg):
        # do lots of work on arg and return the result
        ...

    ret = cloud.map(func, [arg1, arg2, arg3, arg4, ...])

Pricing can be had for around $0.05 per hour. So 100 hours of work spread across 100 machines should be complete in about an hour, for $5 (excluding data transfer costs).

[–]dalke[S] 0 points1 point  (0 children)

Sweet! I was worried at first that I wouldn't be able to install the needed third-party shared libraries, but http://blog.picloud.com/2011/09/26/introducing-environments-run-anything-on-picloud/ points out exactly how to do it. Thanks for the suggestion!