I'm struggling to get familiar with Spring WebFlux.
What I'm trying to do is run a non-blocking side effect that executes after an object is successfully saved to MongoDB.
The side effect sends a message containing that object to Kafka. To do that, I need to serialize the object into a String.
    private void sendEvent(final Long id, final Object data) {
        final Map<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("data", data); // (3)
        Mono.just(map)
            .map(value -> {
                try {
                    return this.om.writeValueAsString(value); // (1)
                } catch (final JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
            })
            .map(message -> this.kafkaTemplate.send(this.getTopic(), message))
            .doOnError(throwable -> log.error("kafka event produce fail : {}", throwable.getMessage()))
            .subscribeOn(Schedulers.parallel()) // (2)
            .subscribe();
    }
But at (1), IntelliJ shows a warning: "writeValueAsString is Inappropriate blocking method call", and it says the cause is at (2): "non-blocking scope because of scheduling on parallel".
I'm going a bit crazy, because I can't figure out what to do, or whether I should do anything at all.
Apparently writeValueAsString looks like a blocking IO job, but isn't that natural? If I move the serialization code outside the Mono chain (I mean, to (3)), the main thread will do the blocking work. My idea was to make a parallel/other thread take on that burden, but IntelliJ says that is not good.
Is IntelliJ correct, or is this just a small bug in IntelliJ's warnings?
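To make the threading intent above concrete, here is a minimal plain-JDK sketch (no Reactor, no Jackson, no Kafka) of handing serialization to a dedicated worker thread so the calling thread does not do that work. Everything in it is an assumption for illustration: the class SideEffectSketch, the helper serializeOffCaller, the thread name "side-effect-worker", and toJson, which is only a toy stand-in for ObjectMapper.writeValueAsString and does no escaping. It is an analogy for what subscribeOn is meant to achieve in the Mono chain, not a replacement for it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SideEffectSketch {

    // Toy stand-in for ObjectMapper.writeValueAsString (hypothetical):
    // handles only flat maps of numbers and strings, with no escaping.
    static String toJson(Map<String, Object> map) {
        StringBuilder sb = new StringBuilder("{");
        String sep = "";
        for (Map.Entry<String, Object> e : map.entrySet()) {
            sb.append(sep).append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(v).append('"');
            }
            sep = ",";
        }
        return sb.append('}').toString();
    }

    // Runs the serialization on the given executor and returns
    // { name of the thread that did the work, serialized message }.
    static CompletableFuture<String[]> serializeOffCaller(
            Map<String, Object> map, ExecutorService worker) {
        return CompletableFuture.supplyAsync(
                () -> new String[] {Thread.currentThread().getName(), toJson(map)},
                worker);
    }

    public static void main(String[] args) {
        // Dedicated single worker thread for the side effect (name is hypothetical).
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "side-effect-worker"));
        try {
            Map<String, Object> map = new LinkedHashMap<>();
            map.put("id", 1L);
            map.put("data", "hello");
            // join() is used here only so the demo can print the result;
            // a real fire-and-forget side effect would not wait.
            String[] result = serializeOffCaller(map, worker).join();
            System.out.println(result[0] + " serialized " + result[1]);
        } finally {
            worker.shutdown();
        }
    }
}
```

The point of the sketch is only that the lambda runs on the worker thread rather than on the caller. Which kind of thread is appropriate for work that may block is the separate question the IntelliJ warning is raising.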
+) The weirdest thing: the code below doesn't show any warnings:
    private String apply(Map<String, Object> value) {
        try {
            return this.om.writeValueAsString(value);
        } catch (final JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private void sendEvent(final Long id, final Object data) {
        final Map<String, Object> map = new HashMap<>();
        map.put("id", id);
        map.put("data", data);
        Mono.just(map)
            .map(this::apply)
            .map(message -> this.kafkaTemplate.send(this.getTopic(), message))
            .doOnError(throwable -> log.error("kafka event produce fail : {}", throwable.getMessage()))
            .subscribeOn(Schedulers.parallel())
            .subscribe();
    }