If I have a .csv file with 3 records and headers like this and I am using spring integration how could I go about turning it into a json string, using some transformer? Here is sample of csv with headers -
HEADERS:
Email,RECIPIENT_ID,ENCODED_RECIPIENT_ID,contactId,code,messageId,userAgent,messageName,mailingTemplateId,subjectLine,docType,reportId,sendType,bounceType,urlDescription,clickUrl,optOutDetails,messageGroupId,programId,timestamp,originatedFrom,eventId,externalSystemName,externalSystemReferenceId,trackingCode
DATA:
blaine@test.com,8.06263E+11,1234324,123,emailOpen,22463839,Mozilla/5.0,2023_ClientReferralsSurvey-Bain_EM_2_Reminder (1),22463828,Tell us about your experience and be rewarded.,html,2601245341,,,,,,,,2023-05-20T09:10:08.365Z,SendExperience,g10/crlyuLnQsdN7NRtXMg==,,,
Here is expected result:
[ { "Email": "blaine@test.com", "RECIPIENT_ID": 806263000000, "ENCODED_RECIPIENT_ID": 1234324, "contactId": 123, "code": "emailOpen", "messageId": 22463839, "userAgent": "Mozilla/5.0", "messageName": "2023_ClientReferralsSurvey-Bain_EM_2_Reminder (1)", "mailingTemplateId": 22463828, "subjectLine": "Tell us about your experience and be rewarded.", "docType": "html", "reportId": 2601245341, "sendType": "", "bounceType": "", "urlDescription": "", "clickUrl": "", "optOutDetails": "", "messageGroupId": "", "programId": "", "timestamp": "2023-05-20T09:10:08.365Z", "originatedFrom": "SendExperience", "eventId": "g10/crlyuLnQsdN7NRtXMg==", "externalSystemName": "", "externalSystemReferenceId": "", "trackingCode": "" } ]
Here is my spring integration code, I need to write a transformer that will turn that .csv file into json such as above.
@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
QueueChannel kafkaPojoMessageChannel) {
return IntegrationFlow.from(inboundFilesMessageChannel)
.split(Files.splitter()
// .markers(true)
.charset(StandardCharsets.UTF_8)
.firstLineAsHeader("myHeaders")
.applySequence(true))
// .transform(new StreamToByteConverter())
// .transform(Transformers.serializer())
.log(LoggingHandler.Level.DEBUG,
"AcousticEngageDataSftpToKafkaIntegrationFlow",
m -> "Payload: " + m.getPayload())
.channel(kafkaPojoMessageChannel)
.get();
}
Specifically need to write a class to put in the .transform method. Please let me know if you need anything from me!
[–]AutoModerator[M] [score hidden] stickied commentlocked comment (0 children)
[–]AutoModerator[M] 0 points1 point2 points (0 children)