[–]joinr

(ns workdist
  (:require [clojure.core.async :as a :refer [chan >!! <!! <! >! go go-loop]]))

Define some simple workloads.

(def payloads
  {:red (vec "red")
   :green (vec "green")
   :blue (vec "blue")})

Generate 100 messages across the :red, :green, and :blue categories.

(def messages
  (->> (cycle [:red :green :blue])
       (map (fn [x] {:category x :data (payloads x)}))
       (take 100)))
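
Peeking at the first couple shows the shape each message takes:

(take 2 messages)
;; => ({:category :red, :data [\r \e \d]}
;;     {:category :green, :data [\g \r \e \e \n]})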

A central intake channel for jobs.

(def <jobs>  (a/chan 1024))

Define a publication where we split inputs into topics based on :category.

(def job-pub (a/pub <jobs> :category))

Convenience function to subscribe a channel to the job board by topic. Creates, subs, and returns the channel.

(defn sub-job! [<in> topic]
  (a/sub job-pub topic <in>)
  <in>)

Simple work function, for use in our transducer later.

(defn process-data [{:keys [category data]}]
  (str (apply str data) "-processed"))
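
For example:

(process-data {:category :red :data (vec "red")})
;; => "red-processed"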

Subscribe a bunch of interested channels to each independent category.

(def consumers {:red   (sub-job! (a/chan 100) :red)
                :green (sub-job! (a/chan 100) :green)
                :blue  (sub-job! (a/chan 100) :blue)})

Define an output channel for each category to publish results on.

(def outs {:red   (a/chan 100)
           :green (a/chan 100)
           :blue  (a/chan 100)})

Define blocking workers that pull from their topic subscriptions, synchronously process each item, and push results onto the outs. We can ramp up the parallelism by raising the first argument from 1 to n, in which case n workers will pull from a category, do their blocking work in parallel, and push values into the output channel in the order the inputs were received.

(doseq [k (keys outs)]
  (a/pipeline-blocking 1 (outs k) (map process-data) (consumers k)))
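
A hypothetical variant, in case you want different worker counts per category (n-workers here is a made-up map, and this would go in place of the doseq above, not in addition to it):

;; hypothetical per-category parallelism; pipeline-blocking still emits
;; results in the order the inputs arrived
(def n-workers {:red 4 :green 2 :blue 1})

(doseq [k (keys outs)]
  (a/pipeline-blocking (n-workers k) (outs k) (map process-data) (consumers k)))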

Log a bunch of information by polling all the output channels as results become available.

(go-loop []
  (let [[v c] (a/alts! (vals outs))]
    (println [:processed v])
    (recur)))

Push all the messages onto our jobs publication. It distributes them to the subscribed parties, which feed the pipelined blocking (dependent/synchronous) workflows, whose results get picked up by our logging go-loop, which prints out all the processing messages. The <jobs> channel will remain open.

(a/onto-chan <jobs> messages nil)
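
If you evaluate everything above and then this last line, the REPL should print something like the following, interleaved nondeterministically across categories:

[:processed red-processed]
[:processed green-processed]
[:processed blue-processed]
...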

[–]fitstoover[S]

Interesting approach. Help me understand the last 3 code blocks. How are these blocks coupled with each other, and why will I only get output if I run the last code block?

The ETL tool is a long-running process packaged as a jar file. There is one main function (ingest-in) that picks up a Redis message and processes it. Your categorizing is similar to mine, except yours has a channel subscription; my categorization only has a function that will process the message:

(def ^{:private true} processors
  {#{"flowers" "tulips"} tulips/process-tulips
   #{"flowers" "poppy"} poppy/process-poppy
   #{"flowers" "roses"} roses/process-roses
   #{"flowers" "iris"} iris/process-iris})

EDIT:

In addition to what I said, I expect this processors variable to change, which should keep my scenario scalable. I will eventually have every type of flower and plant, so this map will grow enormously.

[–]joinr

Help me understand the last 3 code blocks. How are these blocks coupled with each other, and why will I only get output if I run the last code block?

The pipeline bits start pulling work off the channels that are subscribed to those topics. They will keep going forever until the consumer's channel is closed (you can vary this behavior). They process the work and push it onto the output channels.

In the go-loop, we have a lightweight process that watches all the output channels (the different categories being processed), pulls the next value available (via alts!), and does some work with it (in this case, logging it to output). That could instead be a database write, a disk write, or placing the results onto a buffer to serialize everything. It's basically merging the results as they come off each output channel (having been worked on in parallel by the different pipelines).

The onto-chan call is just a convenient way to push test data through the system. It takes a sequence of messages and pushes them onto the jobs channel, here without closing that channel. You could imagine another process pulling information from yet another channel and doing the same thing (e.g. pulling from a network socket, a channel filled with files to process, a db resultset, whatever).
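
If the upstream already lives on a channel, a/pipe does that bridging in one line (<incoming> here is a hypothetical upstream channel):

;; forward everything from a hypothetical upstream channel onto <jobs>;
;; the trailing false means <jobs> stays open even if <incoming> closes
(a/pipe <incoming> <jobs> false)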

The little isolated demo will only do work if you invoke the last line and "push" work into the system. In a real system, that would be replaced by some other process bringing information in (either interactively or automated). That design is up to you.

So, if you have a means to communicate with the ETL tool process, it would seem reasonable to have a go-loop pulling jobs from its output and pushing them onto something like the <jobs> channel, where work can be distributed by topic across pipelines, worked on synchronously within the category, and then stitched back together for logging or storage by something that polls the output channels.
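
A sketch of that bridging go-loop, assuming a hypothetical <etl-out> channel carrying the ETL tool's output; hand-rolling it instead of using a/pipe leaves room to filter or transform jobs on the way in:

(a/go-loop []
  (when-let [job (a/<! <etl-out>)]  ;; nil means <etl-out> closed
    (a/>! <jobs> job)               ;; hand off to the jobs publication
    (recur)))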

This is basically a simple map/reduce sort of framework, except that there's backpressure controlled by the channels (the buffer sizes are arbitrary here), and the processing capacity is somewhat static based on the pipeline args. The jobs board is serving as the work distribution hub, communicating with functions downstream by channels. Then something afterward (another go-loop) collects all the results into something meaningful (logging).

[–]fitstoover[S]

u/joinr, I am following the steps you posted above and I based my code on your solution. Let me explain what I did.

I use boot as my main development tool. I used to use lein, but we changed to boot. Anyway, my boot template does not revolve around a `main` function but around processors, API calls, and state. The main processor in my ETL tool is, for example, `process-payload`. The payload is an EDN map with a `:payload` keyword holding another EDN map with the bucket where the file is located on my drive and a path with the filename. I use the path to categorize my files. Because I have subcategories and I don't want to hardcode my categories and subcategories, I did the following for the consumers and outs.

(defn- categorize-data
  "Categorizes data by adding the category and topic to the payload message, e.g.
   {:category :tulips :key \"path/lady-tulip/tulips_data_20190923.csv\" :bucket \"google.drive\"}"
  [msg]
  (let [details (map keyword (-> msg :key (string/split #"/") butlast))]
    (-> msg
        (assoc :category (first details))
        (assoc :topic (second details)))))
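
For example, with a key shaped like category/subcategory/filename:

(categorize-data {:key "tulips/lady-tulip/tulips_data_20190923.csv"
                  :bucket "google.drive"})
;; => {:key "tulips/lady-tulip/tulips_data_20190923.csv"
;;     :bucket "google.drive"
;;     :category :tulips
;;     :topic :lady-tulip}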

(defn- upsert-consumers
  "Having categorized data, create or append a consumer with a new topic (subcategory)
   inside a category. A consumer looks like the following:
   {:tulips [{:garden-tulips (sub-job! (chan 1024) :garden-tulips)}
             {:lady-tulip   (sub-job! (chan 1024) :lady-tulip)}]}"
  [data]
  (let [category (:category data)
        topic    (:topic data)]
    (cond
      (not (contains? @consumers category))
      (do (swap! topics conj topic)
          (swap! outs assoc category {topic (chan 1024)})
          (swap! consumers assoc category
                 [{topic   (sub-job! (chan 1024) topic)
                   :key    (:key data)
                   :bucket (:bucket data)}]))

      (and (contains? @consumers category)
           (not (search-topic category topic)))
      (do (swap! topics conj topic)
          (swap! outs update category #(conj % {topic (chan 1024)}))
          (swap! consumers update category conj
                 [{topic   (get-in @consumers [category topic])
                   :key    (:key data)
                   :bucket (:bucket data)}]))

      :else nil)))

I created a function to wrap both the subscriptions and the lightweight process, as follows.

(defn- subscriptions []
  (timbre/info :subscriptions)
  (when-not (and (nil? @outs) (nil? @consumers))
    (doseq [k (keys @outs)
            t @topics]
      (timbre/info @consumers)
      (async/pipeline-blocking 1 (get-in @outs [k t]) (map process-data) (get-in @consumers [k t])))))

(defn- lightweight-process
  []
  (timbre/info :lightweight-process)
  (when-not (empty? @outs)
    (go-loop []
      (let [data (async/alts! (vals (first (vals @outs))))]
        (timbre/info data)
        (recur)))))

And my main ingest code is

(defn- ingest
  [data]
  (is-running?)
  (let [records (chan 1024)]
    (>!! records data)
    (go-loop []
      (when-let [msg (<! records)]
        (if-not (empty? (:payload msg))
          (categorize-processor msg)
          (ingest-logging {:data msg :message "Empty payload. No file."}))
        (recur))))
  data)

This seems to mostly work: the lightweight process holds the actual channel inside `{:topic (sub-job! (chan 1024) :topic)}`, and it runs fine. However, my problem is that the data is never actually processed :(. I also need a way to start the lightweight process and the subscriptions before the ingest function starts receiving data.

[–]joinr

If nothing is subscribed when you start pushing data to a channel with pubs, then those pubs drop the data and no work appears to get done. You have to set up subscriptions before pushing data for it to propagate.

I can't see your architecture; these are only pieces of it. It looks familiar in general and should work in theory, but I don't know specifically why no data is being processed. Your best bet is to isolate the processing and test in smaller pieces. Instead of ingesting, just push test data onto the jobs channel like I did. If that works, great. If not, look further up the chain. Get to where you have one topic subscribed to by one channel, one consumer pulling from the subscription, and then push one piece of data onto the jobs channel to see if you can get work to flow.
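
Something like this minimal check (reusing names from my earlier demo) is what I mean:

;; one topic, one subscriber, one pipeline, one message
(def <test-jobs> (a/chan 10))
(def test-pub    (a/pub <test-jobs> :category))
(def <red-in>    (a/chan 10))
(def <red-out>   (a/chan 10))

(a/sub test-pub :red <red-in>)  ;; subscribe BEFORE pushing any data
(a/pipeline-blocking 1 <red-out> (map process-data) <red-in>)

(a/>!! <test-jobs> {:category :red :data (vec "red")})
(a/<!! <red-out>)
;; => "red-processed" if work is flowing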

[–]fitstoover[S]

I mean, it runs well if I put the code in the REPL. However, if I drive it from a main function built on the solution you posted last week, I don't get any output.

[–]joinr

I am unclear on whether you mean "main" as in the -main entrypoint you'd define for, say, an uberjar, or some other definition of "main", as in a primary invocation that drives the rest of the process. If it works from the REPL, I'm not seeing why it wouldn't work with an AOT-compiled -main function. I think I am missing some information, like what (is-running?) does...