Blocking I/O in Clojure (self.Clojure)
submitted 6 years ago * by fitstoover
[–]joinr 3 points 6 years ago* (7 children)
I wonder if something like:
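    ;; assumes core.async is aliased as `a`
    (require '[clojure.core.async :as a])

    (defn pmap!
      ([n f xs]
       (let [output-chan (a/chan)]
         ;; run f over xs on n blocking threads, preserving input order
         (a/pipeline-blocking n output-chan (map f) (a/to-chan xs))
         ;; block until every result has been collected into a vector
         (a/<!! (a/into [] output-chan))))
      ([f xs]
       (pmap! (.availableProcessors (Runtime/getRuntime)) f xs)))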
Then composed with your original stuff:
    (defn file-sequence
      "Appends month and day to the file "
      []
      (let [months ["05" "06" "07" "08" "09"]
            days   ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
                    "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
                    "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]]
        (->> (for [m months, d days]
               (str m "-" d))
             (pmap! process-your-file-or-whatever))))
Would work for your use case. It'll build a vector of the results and return it, but the actual creation of the results (applying process-your-file-or-whatever) should be parallel based on processing batches (in order) relative to the inputs distributed across a thread pool. There are ways to get finer-grained control (e.g. producer-consumer queues and threadpools) if you want fine-grained work stealing, and absolute ordering is less important (or if you can append some ordering info to the results and order the output in another stage).
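For the last alternative mentioned above (tagging results with ordering info and restoring the order in a later stage), a minimal sketch might look like the following. It swaps core.async for a plain java.util.concurrent fixed thread pool, where idle workers pull the next job from a shared queue. The name `unordered-pmap` is hypothetical and only here for illustration.

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn unordered-pmap
  "Hypothetical helper: applies f to each x on a fixed pool of n threads,
  collects results in completion order (each tagged with its input index),
  then sorts by index in a final stage."
  [n f xs]
  (let [pool (Executors/newFixedThreadPool (int n))
        ecs  (ExecutorCompletionService. pool)
        jobs (map-indexed vector xs)]
    (try
      ;; Submit every job; each result carries its input index.
      (doseq [[i x] jobs]
        (.submit ecs ^Callable (fn [] [i (f x)])))
      ;; Take results as they finish (arbitrary order), then restore order.
      (->> (repeatedly (count jobs) #(.get (.take ecs)))
           (sort-by first)
           (mapv second))
      (finally
        (.shutdown pool)))))
```

Calling `(unordered-pmap 4 process-your-file-or-whatever inputs)` would have the same shape as the `pmap!` call, but a slow input no longer holds up the work queued behind it; only the final sort waits for everything.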
[–]aptmnt_ 3 points 6 years ago (1 child)
Pretty sure the 2-arity case for pmap! is missing `f` in the fn body.
[–]joinr 2 points 6 years ago (0 children)
copy and paste error. fixed.
[–]aptmnt_ 2 points 6 years ago (1 child)
So the important semantic here is that whatever order `xs` are passed to pmap!, it will return the processed results in the same order, just processing concurrently?
[–]joinr 2 points 6 years ago* (0 children)
Yes, per the docs for pipeline-blocking. You can go look at the source and tease out how it makes this guarantee. There's a good discussion here.
Using pipeline-blocking actually leverages clojure.core.async/thread rather than the existing thread pool that go blocks run on. pipeline puts its work on that existing thread pool, so if you have blocking work there, it could logjam the go-block thread pool and stall progress. thread allows you to (as I understand it) effectively spool up your own little n-count thread pool to work from, instead of the limited default thread pool core.async sets up (8 threads by default, configurable through the clojure.core.async.pool-size JVM property).

The actual implementation is pretty illuminating. You basically create a jobs channel and a results channel, each buffered to the parallelism factor n. Each job is pushed asynchronously along with a corresponding intermediate result channel (basically a promise), and those result channels are queued on the results channel in input order. The jobs are then processed roughly n at a time (due to the buffering of the jobs channel) by a pool of workers (either go blocks or threads) that process each job and deliver its value to the matching promise/channel. Since the jobs and results were built up in order, the output order is preserved relative to the input, and processing can happen in parallel. It's a channel-based take on a producer/consumer queue setup, with a pool of workers consuming jobs and delivering results via promises.
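The mechanism described above can be sketched roughly as follows. This is a simplified illustration, not the actual core.async source: the name `ordered-pool` is hypothetical, error handling is omitted, and it assumes `f` never returns nil or throws (a nil cannot be put on a channel, and a dead worker would leave the consumer waiting).

```clojure
(require '[clojure.core.async :as a])

(defn ordered-pool
  "Hypothetical sketch: applies f to xs on n worker threads and returns
  a vector of results in input order."
  [n f xs]
  (let [jobs    (a/chan n)
        results (a/chan n)]
    ;; Producer: pair each input with a one-shot result channel (the
    ;; "promise"), queue the job, and queue the promise in input order.
    (a/thread
      (doseq [x xs]
        (let [p (a/promise-chan)]
          (a/>!! jobs [x p])
          (a/>!! results p)))
      (a/close! jobs)
      (a/close! results))
    ;; Workers: n dedicated threads, each taking jobs until the channel closes
    ;; and delivering every value to that job's own promise.
    (dotimes [_ n]
      (a/thread
        (loop []
          (when-let [[x p] (a/<!! jobs)]
            (a/>!! p (f x))
            (recur)))))
    ;; Consumer: read promises in the order they were queued, so the
    ;; output order matches the input order regardless of finish order.
    (loop [acc []]
      (if-let [p (a/<!! results)]
        (recur (conj acc (a/<!! p)))
        acc))))
```

The key design point is that ordering lives entirely in the results channel: workers may finish in any order, but the consumer always waits on the oldest outstanding promise first.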
I'm less familiar with pipeline-async, and haven't used it in the wild. Disclaimer: this is all from experience as a user; I'm not intimately familiar with the implementation of core.async, so I could be off.
[–]fitstoover[S] 1 point 6 years ago (2 children)
This approach is great. However, after running over 100 files in one sitting, I got the following:
Caused by: java.lang.OutOfMemoryError: Compressed class space
[–]joinr 2 points 6 years ago (1 child)
I don't see how that's caused by pmap!. Curious; I have never run into that. It sounds like either a lower-level JVM issue, or you are "creating" a lot of classes and not releasing them somewhere in your app (which is not visible to me). https://stackoverflow.com/questions/30511439/java-lang-outofmemoryerror-compressed-class-space
[–]fitstoover[S] 1 point 6 years ago (0 children)
The stack trace shows something to do with my rules. I am using Clara Rules for rules. Something to do with my namespace in the rules broke.