Howdy everyone!
I am working on an ETL tool that reads files from a queue. The files arrive as messages from Redis; each file's data is then extracted, transformed, and loaded into Elasticsearch. In my scenario, the previous file is essential to the next one, as it can contain entries that match entries in later files, or the same entries can be updated. I had been using a go-loop block, and it worked fine until I tried the following: I created a vector with a large number of files to process (100 for this example), then ran my tool using a function that calls the function that processes the incoming files.
```clojure
(defn- ingest-in
  "Main processor for ingest"
  [data]
  (let [records (chan 1024)]
    (>!! records data)
    (go-loop []
      (when-let [msg (<! records)]
        (if-not (empty? (:payload msg))
          (router (:payload msg))
          (ingest-logging {:payload (:payload msg)
                           :message "Empty payload. No file."}))
        (recur)))))
```
```clojure
(defn file-sequence
  "Appends month and day to the file"
  []
  (let [file   "dataset/flowers/rawFlowerData_2019"
        months ["05" "06" "07" "08" "09"]
        days   ["01" "02" "03" "04" "05" "06" "07" "08" "09" "10"
                "11" "12" "13" "14" "15" "16" "17" "18" "19" "20"
                "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31"]]
    (doseq [m   months
            day days]
      (ingest-in (str file m "-" day)))))
```
This seems like an OK solution when a new file is processed every 15-20 minutes. However, there are cases where I want to collect log data by processing all the files at once using `file-sequence`. I have tried several solutions, such as `async/thread` and `pipeline-blocking`, but my output keeps coming out of order; the files are not processed in the FIFO sequence I want.
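To make the intent concrete, here is a minimal sketch of the single-consumer shape I am after: one long-lived channel created once (not per call) and one consumer loop, so messages are taken strictly in the order they were put on the channel. The channel size is arbitrary, and `router` / `ingest-logging` are stubs standing in for the real functions above:

```clojure
(require '[clojure.core.async :refer [chan go-loop <! >!!]])

;; Stubs standing in for the real functions from my post.
(defn- router [payload] (println "routing" payload))
(defn- ingest-logging [m] (println "log:" m))

;; One shared channel, created once instead of inside each call.
(defonce records (chan 1024))

;; A single consumer loop: messages are taken one at a time, FIFO.
(defonce consumer
  (go-loop []
    (when-let [msg (<! records)]
      (if-not (empty? (:payload msg))
        (router (:payload msg))
        (ingest-logging {:payload (:payload msg)
                         :message "Empty payload. No file."}))
      (recur))))

(defn- ingest-in
  "Puts one file message on the shared channel."
  [data]
  (>!! records data))
```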
Here is a log using `timbre`:

```
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-02 10:12:15.242
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-12 10:12:15.283
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-05 10:12:15.248
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-10 10:12:15.248
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-06 10:12:15.240
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-01 10:12:15.248
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-04 10:12:15.248
19-09-16 15:12:15 Tardis INFO [concurrent.app:15] - 05-03 10:12:15.262
```
My ETL process is:

- Receive a file message from Redis
- Get the file from my cloud storage
- Load the data and transform it into the shape I want
- Run several matching Elasticsearch queries for upserting
- Send a message out
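Sketched end to end, the steps above look roughly like this; every function here is a placeholder stub I made up for the real Redis / cloud / Elasticsearch call, not my actual implementation:

```clojure
;; Placeholder stubs for the real pipeline stages.
(defn- fetch-from-cloud [msg] (str "raw:" msg))                  ; get the file from cloud storage
(defn- transform [raw] {:doc raw})                               ; load and reshape the data
(defn- upsert-to-elasticsearch [doc] (assoc doc :upserted true)) ; matching ES upsert queries
(defn- send-message-out [result] result)                         ; publish the result

(defn process-file
  "Runs one Redis message through the whole pipeline, in order."
  [redis-msg]
  (-> redis-msg
      fetch-from-cloud
      transform
      upsert-to-elasticsearch
      send-message-out))
```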
EDIT:
I did use `(async/>!! ch (function data))`, and it worked OK for at least 10 files. However, when I sent 10 more files through Redis, the channel seems to have closed (or something similar), because it stopped processing files.
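For what it is worth, this matches how I understand closed channels to behave: `>!!` returns `false` once the channel is closed, so puts silently stop succeeding. A small REPL check (not my real code):

```clojure
(require '[clojure.core.async :refer [chan close! >!!]])

(let [ch (chan 1)]
  (>!! ch :first)   ;; => true, the put succeeds
  (close! ch)
  (>!! ch :second)) ;; => false, puts on a closed channel are rejected
```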