Suppose if we have a file, that need to be uploaded as a multi part body, then we can use FileSystemResource from Spring to build up the Multipart body and use it as a payload for the http request.
Close to real example would look something similar to this
// Get the file from resources folder and add as a part to mutipart body
val file = ClassPathResource("upload.txt").file
val multipartBody = MultipartBodyBuilder().apply {
part("file", FileSystemResource(file)).filename("upload-.txt")
}.build()
//Send it using the webclient
WebClient.create("host")
.post()
.uri("path")
.bodyValue(multipartBody)
.retrieve()
.toBodilessEntity()
.block()Note the block() call in the end of WebClient invocation. Since this is made from the non reactive stack, it will block until a response comes. Where, if this is executed in a reactive stack (e.g. WebFlux), then the block() call would probably result in a error. Therefore, it should be handled in a async manner.
With above code we can upload a file, but suppose what we have is a Mono<InputStream>, how shall we upload it?
Lets go though the steps on how we can do the upload in the async manner, if the content is available in an async publisher.
Of course we can block() the publisher, get the InputStream and process it in a traditional way, but it's not an option for the reactive stack.
1. Create a Server
Lets create a SpringBoot server with a POST API that accepts MultipartFile body.
Versions used.
SpringBoot version: 3.2.1
Kotlin version: 1.9.25RestController for the API would look similar to this
@RestController
@RequestMapping("/import")
class ImportController {
@PostMapping(consumes = [MediaType.MULTIPART_FORM_DATA_VALUE])
@ResponseStatus(HttpStatus.CREATED)
fun importFile(@RequestPart("file") file: MultipartFile): ResponseEntity<Void> {
// save file
return ResponseEntity.status(HttpStatus.CREATED).build()
}
}2. Create Client
Versions used.
SpringBoot version: 3.3.5
Kotlin version: 1.9.25Next, we can create a multipart body as a payload to send it for the above created API using the WebClient. For this we can use the asyncPart method of the MultipartBodyBuilder.
Note: There is no support to transform InputStream directly into multipart content, therefore, it has to be converted as array of byte. Note that in kotlin, array of byte is represented by the ByteArray class.
fun uploadFile(contentPublisher: Mono<InputStream>) {
val byteArrayContentPublisher: Mono<ByteArray> = contentPublisher.map { it.readAllBytes() }
val multipartBody = MultipartBodyBuilder().apply {
asyncPart("file", byteArrayContentPublisher, ByteArray::class.java).filename("upload.txt")
}
}Now this multipart body can be used as a payload for the WebClient call.
3. Optimize memory usage
While above given solution might work for files that are smaller in size, it might not work when the file size is larger and if there not enough heap allocated for the application.
Lets try to startup up the client application with the limited heap size of 30 Mib. We can easily do that by parsing the jvm argument -Xmx30m with application startup parameters.
Following is the heap usage obtained through VisualVM program, before attempting to upload a file.
When an attempt to upload a file with the size of ~12Mib, application crashed with OutOfMemoryError.
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:71)
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:391)
at org.springframework.http.client.reactive.JdkClientHttpRequest.toByteBuffer(JdkClientHttpRequest.java:141)
at org.springframework.http.client.reactive.JdkClientHttpRequest$$Lambda/0x00000257b03915f8.apply(Unknown Source)and the heap usage showed a steep increase in the used memory, until it got crashed.
Reason for this behavior is that we read the complete InputStream at once using it.readAllBytes() method call.
Which would simply attempt to read all the bytes into the memory before sending it.
Therefore, reading all bytes once not going to be a better solution and we need to read the file by buffering it, so we don't load up all the file content at once, which would crash the application if there is not enough heap space left for the application.
In order to solve that, we can transform the Mono<InputStream> as lazily fetched Flux<ByteArray> instead of Mono<ByteArray>. With this approach we can read the file content into smaller buffer and send it as soon as it reads into buffer, until all bytes read like this.
fun uploadFile(contentPublisher: Mono<InputStream>) {
val byteArrayStreamContentPublisher: Flux<ByteArray> = contentPublisher.flatMapMany { inputStream ->
Flux.generate { sink ->
val buffer = ByteArray(1024) // read by buffer of 1 Mib
val bytesRead = inputStream.read(buffer)
if (bytesRead != -1) {
sink.next(buffer.copyOf(bytesRead))
} else {
sink.complete()
}
}
}
val multipartBody = MultipartBodyBuilder().apply {
asyncPart("file", byteArrayStreamContentPublisher, ByteArray::class.java).filename("upload.txt")
}
}The beauty of this solution is each chunk read lazily based on consumer invocation through the reactor sink pattern, which mean the next buffer of bytes in line is not read until the current buffer of bytes which are already read streamed down to the server, by eliminating the need of having a free heap size that is equivalent to the file size.
Now lets analyze the heap usage for this
Heap before uploading the file
Now lets upload the same file (~12 Mib) again and observe the heap usage. This time file got uploaded without any heap error.The result shows the latter solution did not consume much heap, since it is read by
1 Mib at a time. With that hopefully we now have a better idea on how we shall upload a async content as a multipart payload.




No comments:
Post a Comment