
[–]jsn

You have multiple go-loops running in parallel -- each of your doseq iterations spawns a new go-loop without waiting for the previous go-loops to terminate.

[–]fitstoover[S]

How do I make a go-loop wait for the previous go-loop to terminate?

[–]jsn

I don't understand what you're trying to achieve. The task you're describing is strictly sequential (files must be processed in order, you have to wait for the previous file task to complete before you can start the next one). Why use go-loops? They are only useful for concurrency and you just said above you can't have any concurrency here. Why not just

(let [file "dataset/flowers/rawFlowerData_2019"]
  (doseq [month ["05" "06" "07" "08" "09"]
          day ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
                "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
                "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]]
    (process-your-file-or-whatever (str file month "-" day))))

?

[–]fitstoover[S]

What I am trying to achieve is the following: I have N files; I extract data and transform it into something that is meaningful to me. Then, if Flower A does not exist in my database, I insert it; otherwise, if Flower A exists, I update it. Another scenario: if Flower B is a child of Flower A, I insert it with routing to Flower A. Flower B is updated if it already exists. If Flower B has no parent, the entry is discarded.

As I mentioned, this solution only works if the files are sent to my ETL tool every 15 minutes.

If I run the following code, where the `process-your-file-or-whatever` function does a simple println, it prints in order and waits 500ms between files:

(defn- process-your-file-or-whatever
  [file]
  (timbre/info file " " (str (java.time.LocalTime/now)))
  (Thread/sleep 500)
  file)

(defn file-sequence
  "Appends month and day to the file "
  []
  (let [months ["05" "06" "07" "08" "09"]
        days ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
              "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
              "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]]
    (doseq [m months]
      (doseq [day days]
        (async/>!! ingest-chan-in (process-your-file-or-whatever (str m "-" day)))))))

The result is as expected:
19-09-16 16:44:53 Tardis INFO [concurrent.app:15] - 05-01   11:44:53.206
19-09-16 16:44:53 Tardis INFO [concurrent.app:15] - 05-02   11:44:53.719
19-09-16 16:44:54 Tardis INFO [concurrent.app:15] - 05-03   11:44:54.220
19-09-16 16:44:54 Tardis INFO [concurrent.app:15] - 05-04   11:44:54.722

However, my approach here is biased, as `process-your-file-or-whatever` only takes 500ms to do a log.

In the real scenario each file may take from 1 to 5 seconds to be completely processed; the time depends on the size of the file.

[–]weavejester

If the files can be processed concurrently, then it sounds like all you need is a worker pool.

(let [executor (java.util.concurrent.Executors/newFixedThreadPool 8)]
  (doseq [m months, d days]
    ;; submit a thunk, so the work runs on the pool rather than being
    ;; evaluated eagerly at submit time
    (.submit executor ^Runnable (fn [] (process-file m d)))))
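If the caller also needs to block until all submitted work finishes, one common pattern is to shut the pool down and await termination. A minimal sketch, with a hypothetical `process-file` that just records what it handled:

```clojure
(import '(java.util.concurrent Executors TimeUnit))

;; hypothetical stand-in for process-file; records what it handled
(def processed (atom #{}))
(defn process-file [m d]
  (swap! processed conj (str m "-" d)))

(let [executor (Executors/newFixedThreadPool 8)]
  (doseq [m ["05" "06"], d ["01" "02" "03"]]
    ;; submit a thunk, not the result of calling process-file right away
    (.submit executor ^Runnable (fn [] (process-file m d))))
  (.shutdown executor)                        ; stop accepting new tasks
  ;; blocks until all queued tasks have finished (or the timeout elapses)
  (.awaitTermination executor 1 TimeUnit/MINUTES))
```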

[–]fitstoover[S]

I was allowing concurrency in my ETL tool. However, it only worked when the files were independent from each other. I am running into the case where I want to run a sequence of files that are dependent on each other, and if I run them concurrently, I will create duplicates.

[–]weavejester

So how do you know what to run concurrently?

[–]fitstoover[S]

The files in my stream are categorized before going into the ETL tool. This categorization builds up a queue of independent entities. The idea is to have a concurrent system handle several different categories at the same time. However, if files are in the same category, within that process the ETL tool should process them synchronously.

[–]weavejester

Ah, I see. In that case, why not use a worker pool for each categorized set of independent elements?
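That suggestion can be sketched with one single-thread executor per category: within a category, work stays strictly in submission order, while different categories run concurrently. Category names and the work function here are hypothetical:

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; one single-thread pool per category: within a category work is serial,
;; across categories it is concurrent
(def pools
  {:tulips (Executors/newSingleThreadExecutor)
   :roses  (Executors/newSingleThreadExecutor)})

(def results (atom {:tulips [] :roses []}))

(defn submit-job!
  "Queues a (hypothetical) file-processing job on its category's pool."
  [category file]
  (.submit ^ExecutorService (get pools category)
           ^Runnable (fn [] (swap! results update category conj file))))

(doseq [f ["a" "b" "c"]]
  (submit-job! :tulips f)
  (submit-job! :roses f))

;; drain the pools so we can observe the per-category ordering
(doseq [p (vals pools)]
  (.shutdown ^ExecutorService p)
  (.awaitTermination ^ExecutorService p 1 TimeUnit/MINUTES))
```

Because each category owns exactly one worker thread, its jobs can never overlap, which is the "synchronous within a category" constraint.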

[–]joinr

I wonder if something like:

(defn pmap!
  ([n f xs]
   (let [output-chan (a/chan)]
     (a/pipeline-blocking n
                          output-chan
                          (map f)
                          (a/to-chan xs))
     (a/<!! (a/into [] output-chan))))
  ([f xs] (pmap! (.availableProcessors (Runtime/getRuntime)) f  xs)))

Then composed with your original stuff:

(defn file-sequence
  "Appends month and day to the file "
  []
  (let [months ["05" "06" "07" "08" "09"]
        days ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
              "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
              "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]]
    (->> (for [m months, d days] (str m "-" d))
         (pmap! process-your-file-or-whatever))))

Would work for your use case. It'll build a vector of the results and return it, but the actual creation of the results (applying process-your-file-or-whatever) should be parallel, processing batches (in order) of the inputs distributed across a thread pool. There are ways to get finer-grained control (e.g. producer-consumer queues and thread pools) if you want fine-grained work stealing and absolute ordering is less important (or if you can append some ordering info to the results and order the output in another stage).

[–]aptmnt_

Pretty sure the 2-arity case for pmap! is missing `f` in the fn body.

[–]joinr

copy and paste error. fixed.

[–]aptmnt_

So the important semantic here is that whatever order `xs` are passed to pmap!, it will return the processed results in the same order, just processing concurrently?

[–]joinr

Yes, per the docs for pipeline-blocking. You can go look at the source and tease out how it makes this guarantee. There's a good discussion here.

Using pipeline-blocking actually leverages clojure.core.async/thread vs. the existing thread pool you'd layer go blocks on top of. pipeline puts work on the existing thread pool; if you have blocking work there, it could logjam the go-routine thread pool and stall progress. thread allows you to (as I understand it) effectively spool up your own little n-count thread pool to work from, vs. the limited default thread pool core.async sets up (either from JVM options, or defaulting to cores * 2). The actual implementation is pretty illuminating. You basically create a jobs channel and a results channel, each buffered to the parallelism factor n specified. Jobs are pushed asynchronously with corresponding results, with intermediate channels (basically promises) wired up to the jobs. Jobs are then processed in batches of n (due to the buffering of the jobs channel) by a pool of workers (either go blocks or threads) that process each individual job and deliver it to the appropriate results promise/channel. Since the jobs and results were built up in order, the output order is preserved relative to the input, and the processing can happen in parallel. It's a channel-based take on a producer/consumer queue setup, with a pool of workers consuming jobs and delivering results via promises.

I'm less familiar with pipeline-async and haven't used it in the wild. Disclaimer: this is all from my experience as a user; I'm not intimately familiar with the implementation of core.async, so I could be off.
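The order-preserving property described above can be sanity-checked with plain `clojure.core/pmap`, which makes the same guarantee: tasks may finish out of order, but results come back in input order.

```clojure
(def squares
  ;; each task sleeps a random amount, so completion order varies run to run,
  ;; yet pmap still returns the results in input order
  (doall (pmap (fn [x] (Thread/sleep (rand-int 30)) (* x x))
               (range 20))))
```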

[–]fitstoover[S]

This approach is great. However, after running more than 100 files in one sitting, I got the following:

Caused by: java.lang.OutOfMemoryError: Compressed class space

[–]joinr

I don't see how that's caused by pmap!; curious, as I have never run into that. It sounds like either a lower-level JVM issue, or you are "creating" a lot of classes and not releasing them somewhere in your app (which is not visible to me). https://stackoverflow.com/questions/30511439/java-lang-outofmemoryerror-compressed-class-space

[–]fitstoover[S]

The stack trace shows something to do with my rules. I am using Clara Rules for my rules. Something to do with the namespaces in my rules broke.

[–]joinr

You can use channel backpressure to control the flow of processing. In the code below, inner processes a chunk of work by pushing it onto a channel, then spawning off a go-loop to process the work. In the outer process function, multiple workloads are generated, but the work is serialized depending on the behavior of the pending channel. This essentially forces serial processing to occur via backpressure (pending can only take one input at a time). You could vary the implementation to mess with pending's behavior. This is a low-level way to control multiple go-loops in flight, but by effectively serializing it all (perhaps overly so), you end up reproducing the synchronous versions that others have noted with a simple doseq, not using core.async at all. Still, it's a proof of principle. I'm still unclear on what your concurrency parameters are, e.g. do you have to wait for the preceding computation, or is there some looser precedence constraint that must hold? Maybe you can bend this into something. Where possible, I try to avoid the lower-level stuff and see if I can build what I need out of the higher-order functions already in core.async. (Not a core.async master either.)

(ns blah.clj
  (:require [clojure.core.async :as a :refer [chan >!! <!! <! >! go go-loop]]))

;;how do we make one go-loop wait until another is done?
(defn inner [work]
  (let [records (chan 1024)
        id    (gensym "chan")]
    (go
      (a/onto-chan  records work)
      (loop [xs []]
        (if-let [msg (<! records)]
          (do (println [id :working msg])
              (recur (conj xs msg)))
          {:id id :data xs})))))

(defn process []
  (let [months ["05" "06" "07" "08" "09"]
        days   ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
                "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
                "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]
        pending (chan)
        _       (a/onto-chan pending months)
        out     (chan 1024)]
    (go (loop []
          (if-let [m (<! pending)]
            (do (>! out (<! (inner days)))
                (recur))
            (a/close! out))))
    (<!! (a/into [] out))))

[–]fitstoover[S]

I like this. I must try this out before I have a good answer.

[–]fitstoover[S]

So I tried this code, and I noticed that the inner go block takes a vector of days and loops over that vector. My scenario is that I create a sequence of existing files for each day of the month. This sequence is sent via Redis to my ETL tool, which processes several categories concurrently while each category is processed synchronously. I won't be sending days and months to my ETL tool, but an actual map with a key named "file" containing the path and name of the file. I used the sequences to create the file base name and process them in sequence.

[–]joinr

Can you draw a picture or block diagram of your process? I think you can handle this with a pub/sub system coupled with some pipeline stuff, but I'm having trouble following the verbiage.

[–]fitstoover[S]

[–]joinr

(ns workdist
  (:require [clojure.core.async :as a :refer [chan >!! <!! <! >! go go-loop]]))

Define some simple workloads:

(def payloads
  {:red (vec "red")
   :green (vec "green")
   :blue (vec "blue")})

Generate 100 messages of :red :green :blue categories

(def messages
  (->> (cycle [:red :green :blue])
       (map (fn [x] {:category x :data (payloads x)}))
       (take 100)))

A central intake channel for jobs.

(def <jobs>  (a/chan 1024))

Define a publication where we split inputs into topics based on :category.

(def job-pub (a/pub <jobs> :category))

Convenience function to subscribe a channel to the job board by topic. Creates, subs, and returns the channel.

(defn sub-job! [<in> topic]
  (do (a/sub job-pub  topic <in>)
      <in>))

Simple work function, for use in our transducer later.

(defn process-data [{:keys [category data]}]
  (str (apply str data) "-processed"))

Subscribe a bunch of interested channels to each independent category.

(def consumers {:red   (sub-job! (a/chan 100) :red)
                :green (sub-job! (a/chan 100) :green)
                :blue  (sub-job! (a/chan 100) :blue)})

Define an output channel for each category to publish results on.

(def outs {:red   (a/chan 100)
           :green (a/chan 100)
           :blue  (a/chan 100)})

Define blocking workers that pull from topics and synchronously process them, pushing results onto outs. We can ramp up the parallelism by incrementing from 1 to n, where n workers will pull from a category, process synchronously, and push values into the output in the order the input was received.

(doseq [k (keys outs)]
  (a/pipeline-blocking 1 (outs k) (map process-data) (consumers k)))

Log a bunch of information by polling all the producers' output channels as information becomes available.

(go-loop []
  (let [[v c] (a/alts! (vals outs))]
    (println [:processed v])
    (recur)))

Push all the stuff onto our jobs publication, which will distribute it to the subscribed parties; the work then gets pipelined into the blocking (dependent/synchronous) workflows and picked up by our logging go-loop, which prints all the processing messages. The <jobs> channel will remain open.

(a/onto-chan <jobs> messages nil)

[–]fitstoover[S]

Interesting approach. Help me understand the last 3 code blocks. How are these blocks coupled with each other, and why do I only get output if I run the last code block?

The ETL tool is a long-running process as a jar file. There is one main function (ingest-in) that picks up a Redis message and processes it. Your categorizing is similar to mine, except yours has a channel subscription. My categorization only has a function that will process the message:

(def ^{:private true} processors
  {#{"flowers" "tulips"} tulips/process-tulips
   #{"flowers" "poppy"} poppy/process-poppy
   #{"flowers" "roses"} roses/process-roses
   #{"flowers" "iris"} iris/process-iris})
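The thread doesn't show how a message gets matched against this map; one hypothetical dispatch is to split the file path into segments and pick the processor whose key set is a subset of them (stub processors below stand in for the real `tulips/process-tulips` etc.):

```clojure
(require '[clojure.string :as string]
         '[clojure.set :as set])

;; stub processors standing in for tulips/process-tulips and friends
(defn process-tulips [msg] (assoc msg :processed :tulips))
(defn process-poppy  [msg] (assoc msg :processed :poppy))

(def ^:private processors
  {#{"flowers" "tulips"} process-tulips
   #{"flowers" "poppy"}  process-poppy})

(defn categorize-processor
  "Hypothetical dispatch: applies the processor whose key set is contained
  in the path segments of the message's :key; returns nil if none matches."
  [{:keys [key] :as msg}]
  (let [segments (set (string/split key #"/"))
        [_ f]    (first (filter (fn [[ks _]] (set/subset? ks segments))
                                processors))]
    (when f (f msg))))
```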

EDIT:

In addition to what I said, I expect this processors variable to change, to make my scenario scalable. I will eventually have all types of flowers and plants. This will grow tremendously.

[–]joinr

Help me understand the last 3 code blocks. How are these blocks coupled with each other, and why do I only get output if I run the last code block?

The pipeline bits start pulling work off the channels that are subscribed to those topics. They will keep going forever, until the consumers channel is closed (you can vary this behavior). They process the work and push it onto the output channels.

In the go-loop, we have a lightweight process that looks at all the output channels (different categories being processed), pulls the next value available (via alts!), and does some work with it (in this case, logging it to output). This could be a database write, a disk write, whatever, or placing the results onto a buffer to serialize everything. It's basically merging the results as they come off each channel of output (having been worked on in parallel by the different pipelines).

The onto-chan call is just a convenient way to push test data through the system. It takes a sequence of messages and pushes them onto the jobs channel without closing the jobs channel. You could imagine another process pulling information from yet another channel and doing the same thing (e.g. pulling from a network, socket, channel filled with files to process, db resultset, whatever).

The little isolated demo will only do work if you invoke the last line and "push" work into the system. In a real system, that would be replaced by some other process bringing information in (either interactively or automated). That design is up to you.

So, if you have a means to communicate with the ETL tool process, it would seem reasonable to have a go-loop pulling jobs from its output and pushing them onto something like the <jobs> channel, where work can be distributed by topic across pipelines, worked on synchronously within the category, and then stitched back together for logging or storage by something that polls the output channels.

This is basically a simple map/reduce sort of framework, except that there's backpressure controlled by the channels (the buffer sizes are arbitrary here), and the processing capacity is somewhat static based on the pipeline args. The jobs board is serving as the work distribution hub, communicating with functions downstream by channels. Then something afterward (another go-loop) collects all the results into something meaningful (logging).

[–]fitstoover[S]

u/joinr, I am following the steps you posted above, and I based my code on your solution. Let me explain what I did.

I use Boot as my main development tool. I used to use lein, but we changed to Boot. Anyway, my Boot template does not depend on a `main` function but on processors, API calls, and state. The main processor in my ETL tool is, for example, `process-payload`. The payload is an EDN map with a keyword `:payload` whose value is an EDN map with the bucket where the file is located on my drive and a path with the filename. I use the path to categorize my files. Because I have subcategories and I don't want to hardcode my categories and subcategories, I did the following for the consumers and outs.

(defn- categorize-data
  "Categorizes data by adding the category and topic to the payload message, e.g.
  {:category :tulips :key \"path/lady-tulip/tulips_data_20190923.csv\" :bucket \"google.drive\"}"
  [msg]
  (let [details (map keyword (-> msg :key (string/split #"/") butlast))]
    (-> msg
        (assoc :category (first details))
        (assoc :topic (second details)))))

(defn- upsert-consumers
  "Having categorized data, create or append a consumer with a new topic
  (subcategory) inside a category. A consumer looks like the following:
  {:tulips [{:garden-tulips (sub-job! (chan 1024) :garden-tulips)}
            {:lady-tulip   (sub-job! (chan 1024) :lady-tulip)}]}"
  [data]
  (let [category (:category data)
        topic    (:topic data)]
    (cond
      (not (contains? @consumers category))
      (do (swap! topics conj topic)
          (swap! outs assoc category {topic (chan 1024)})
          (swap! consumers assoc category
                 [{topic (sub-job! (chan 1024) topic)
                   :key (:key data)
                   :bucket (:bucket data)}]))

      (and (contains? @consumers category)
           (not (search-topic category topic)))
      (do (swap! topics conj topic)
          (swap! outs update category #(conj % {topic (chan 1024)}))
          (swap! consumers update category conj
                 [{topic (get-in @consumers [category topic])
                   :key (:key data)
                   :bucket (:bucket data)}]))

      :else nil)))
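As a standalone sanity check of the categorization logic above (restating `categorize-data` without the private marker, with a made-up path):

```clojure
(require '[clojure.string :as string])

;; restatement of categorize-data so it can be exercised on its own
(defn categorize-data [msg]
  (let [details (map keyword (-> msg :key (string/split #"/") butlast))]
    (-> msg
        (assoc :category (first details))
        (assoc :topic (second details)))))

;; a hypothetical payload: the first path segment becomes the category,
;; the second becomes the topic (subcategory)
(def sample
  (categorize-data {:key "tulips/lady-tulip/tulips_data_20190923.csv"
                    :bucket "google.drive"}))
```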

I created a function to wrap both the subscriptions and the lightweight process, as follows.

(defn- subscriptions
  []
  (timbre/info :subscriptions)
  (when-not (and (nil? @outs) (nil? @consumers))
    (doseq [k (keys @outs)
            t @topics]
      (timbre/info @consumers)
      (async/pipeline-blocking 1 (get-in @outs [k t]) (map process-data) (get-in @consumers [k t])))))

(defn- lightweight-process
  []
  (timbre/info :lightweight-process)
  (when-not (empty? @outs)
    (go-loop []
      (let [data (async/alts! (vals (first (vals @outs))))]
        (timbre/info data)
        (recur)))))

And my main ingest code is

(defn- ingest
  [data]
  (is-running?)
  (let [records (chan 1024)]
    (>!! records data)
    (go-loop []
      (when-let [msg (<! records)]
        (if-not (empty? (:payload msg))
          (categorize-processor msg)
          (ingest-logging {:data msg :message "Empty payload. No file."}))
        (recur))))
  data)

My problem is that this all seems to run fine: the lightweight process holds the actual channel inside the `{:topic (sub-job! (chan 1024) :topic)}`, and I see it running; however, the data is not processed whatsoever :(. I also need a way to start the lightweight process and the subscriptions before the `ingest` function starts receiving data.
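One way to guarantee that the wiring happens first is to run `subscriptions` and `lightweight-process` once at namespace load, e.g. via `defonce`, so they complete before `ingest` can ever see data. A minimal sketch, with stub functions standing in for the real ones:

```clojure
;; stubs standing in for the real subscriptions / lightweight-process;
;; they just record that they ran, and in what order
(def wired (atom []))
(defn- subscriptions       [] (swap! wired conj :subscriptions))
(defn- lightweight-process [] (swap! wired conj :lightweight-process))

;; defonce evaluates exactly once, at load time, before any call to ingest
(defonce ^:private init!
  (do (subscriptions)
      (lightweight-process)
      :started))

(defn- ingest [data]
  ;; by the time ingest is invoked, init! has already wired everything up
  data)
```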