Monday, January 13, 2025

How to Upload an Async InputStream as a Multipart Body Using Kotlin-flavored SpringBoot

Suppose we have a file that needs to be uploaded as a multipart body. We can use FileSystemResource from Spring to build up the multipart body and use it as the payload for the HTTP request.

A close-to-real example would look something like this

// Get the file from the resources folder and add it as a part of the multipart body
val file = ClassPathResource("upload.txt").file
val multipartBody = MultipartBodyBuilder().apply {
    part("file", FileSystemResource(file)).filename("upload.txt")
}.build()

// Send it using the WebClient
WebClient.create("host")
    .post()
    .uri("path")
    .bodyValue(multipartBody)
    .retrieve()
    .toBodilessEntity()
    .block()

Note the block() call at the end of the WebClient invocation. Since this call is made from a non-reactive stack, it blocks until a response arrives. If this were executed in a reactive stack (e.g. WebFlux), the block() call would likely result in an error; there, the response should be handled in an async manner.

With the above code we can upload a file, but suppose what we have is a Mono<InputStream>; how shall we upload that?

Let's go through the steps for doing the upload in an async manner, when the content is available through an async publisher.

Of course we could block() the publisher, get the InputStream and process it the traditional way, but that is not an option in a reactive stack.

1. Create a Server

Let's create a SpringBoot server with a POST API that accepts a MultipartFile body.

Versions used.

SpringBoot version: 3.2.1
Kotlin version: 1.9.25

The RestController for the API would look similar to this

@RestController
@RequestMapping("/import")
class ImportController {

    @PostMapping(consumes = [MediaType.MULTIPART_FORM_DATA_VALUE])
    fun importFile(@RequestPart("file") file: MultipartFile): ResponseEntity<Void> {

        // save file

        return ResponseEntity.status(HttpStatus.CREATED).build()
    }
}

2. Create Client

Versions used.

SpringBoot version: 3.3.5
Kotlin version: 1.9.25

Next, we can create the multipart body as the payload to send to the above-created API using the WebClient. For this we can use the asyncPart method of MultipartBodyBuilder.

Note: there is no support for transforming an InputStream directly into multipart content, so it has to be converted into a byte array first. In Kotlin, a byte array is represented by the ByteArray class.

fun uploadFile(contentPublisher: Mono<InputStream>) {
    val byteArrayContentPublisher: Mono<ByteArray> = contentPublisher.map { it.readAllBytes() }
    val multipartBody = MultipartBodyBuilder().apply {
        asyncPart("file", byteArrayContentPublisher, ByteArray::class.java).filename("upload.txt")
    }.build()
}

Now this multipart body can be used as a payload for the WebClient call.

3. Optimize memory usage

While the above solution might work for smaller files, it can fail when the file is larger and there is not enough heap allocated for the application.

Let's start up the client application with a limited heap size of 30 MiB. We can easily do that by passing the JVM argument -Xmx30m with the application startup parameters.

Following is the heap usage, obtained through the VisualVM program, before attempting to upload a file.

When an attempt was made to upload a file of ~12 MiB, the application crashed with an OutOfMemoryError.

Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:71)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:391)
	at org.springframework.http.client.reactive.JdkClientHttpRequest.toByteBuffer(JdkClientHttpRequest.java:141)
	at org.springframework.http.client.reactive.JdkClientHttpRequest$$Lambda/0x00000257b03915f8.apply(Unknown Source)

and the heap usage showed a steep increase in used memory until the application crashed.

The reason for this behavior is that we read the complete InputStream at once via the it.readAllBytes() method call, which attempts to load all the bytes into memory before sending them.

Therefore, reading all bytes at once is not a good solution. We need to read the file through a buffer, so we don't load the whole file content at once and crash the application when there is not enough heap space left.

To solve that, we can transform the Mono<InputStream> into a lazily fetched Flux<ByteArray> instead of a Mono<ByteArray>. With this approach we read the file content into a smaller buffer and send each buffer as soon as it is read, until all bytes are read, like this.

fun uploadFile(contentPublisher: Mono<InputStream>) {
    val byteArrayStreamContentPublisher: Flux<ByteArray> = contentPublisher.flatMapMany { inputStream ->
        Flux.generate { sink ->
            val buffer = ByteArray(1024 * 1024) // read by a buffer of 1 MiB
            val bytesRead = inputStream.read(buffer)
            if (bytesRead != -1) {
                sink.next(buffer.copyOf(bytesRead))
            } else {
                inputStream.close()
                sink.complete()
            }
        }
    }
    val multipartBody = MultipartBodyBuilder().apply {
        asyncPart("file", byteArrayStreamContentPublisher, ByteArray::class.java).filename("upload.txt")
    }.build()
}

The beauty of this solution is that each chunk is read lazily, driven by consumer demand through the reactor sink pattern. The next buffer of bytes is not read until the current buffer has been streamed down to the server, eliminating the need for free heap equivalent to the file size.
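The same lazy, chunked-reading idea can be illustrated in plain Kotlin (without Reactor) using a sequence; the helper name chunks and the default bufferSize are illustrative, not part of the solution above.

```kotlin
import java.io.InputStream

// Read up to bufferSize bytes at a time and hand each chunk downstream lazily;
// the next chunk is only read when the consumer asks for it.
fun chunks(inputStream: InputStream, bufferSize: Int = 1024): Sequence<ByteArray> = sequence {
    val buffer = ByteArray(bufferSize)
    while (true) {
        val bytesRead = inputStream.read(buffer)
        if (bytesRead == -1) break
        yield(buffer.copyOf(bytesRead)) // copy, since the buffer is reused
    }
}
```

Like Flux.generate, the sequence builder only produces a chunk per consumer pull, so at most one buffer lives in memory at a time.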

Now let's analyze the heap usage for this.

Heap before uploading the file


Now let's upload the same file (~12 MiB) again and observe the heap usage. This time the file got uploaded without any heap error.

The result shows that the latter solution did not consume much heap, since the file is read 1 MiB at a time.

With that, hopefully we now have a better idea of how to upload async content as a multipart payload.

Thursday, December 26, 2024

Use OpenShift OAuth server as login option for the SpringBoot Application

OpenShift is a Kubernetes cluster with many additional, production-ready features. One such feature is an authorization server, which can be used to authenticate ourselves using the OAuth2 protocol.

If we have an application deployed in the cluster, then we can use the OpenShift OAuth server (authorization server) as a login option for that application.

So, let's go through the steps required to secure our SpringBoot application using the OpenShift OAuth server.

The authorization workflow we are going to use can be better understood by referring to the guide rfc6749#section-4.

Moreover, the SpringBoot project explained in the spring-boot-oauth2 guide can be used as a starting point for configuring the application with the OpenShift OAuth server; we will simply use OpenShift instead of GitHub.

1. Obtain OpenShift OAuth Server Properties

The first thing we require is the OAuth server connection properties. To get them, as explained in the OAuth server metadata documentation, a GET request to the endpoint https://openshift.default.svc/.well-known/oauth-authorization-server should be made from within the cluster.

The API would return a payload similar to this

{
  "issuer": "https://<namespace_route>", 
  "authorization_endpoint": "https://<namespace_route>/oauth/authorize", 
  "token_endpoint": "https://<namespace_route>/oauth/token", 
  "scopes_supported": [ 
    "user:full",
    "user:info",
    "user:check-access",
    "user:list-scoped-projects",
    "user:list-projects"
  ],
  "response_types_supported": [ 
    "code",
    "token"
  ],
  "grant_types_supported": [ 
    "authorization_code",
    "implicit"
  ],
  "code_challenge_methods_supported": [ 
    "plain",
    "S256"
  ]
}

where we will use authorization_endpoint, token_endpoint and, optionally, scopes from the SpringBoot application for authentication.

Note that the OAuth server doesn't provide a user info endpoint, which is required to fetch user information such as username, roles, etc. upon a successful login. Later in this guide an alternative way to fetch this information is described.

2. Register SpringBoot Application as a Client Application

For the OAuth server to allow our application, as a client application, to initiate the login request, the application should be registered as a client in the OAuth server.

For that, OpenShift provides a Kubernetes CRD (custom resource definition), OAuthClient, which should be used to register our client with the OAuth server as described in the registering an additional OAuth client guide.

Pay attention to the following properties while registering a client

  • metadata.name
  • secret
  • redirectURIs

where metadata.name is used as the client-id and secret is used as the client-secret (this should be treated as a password).

The value given for redirectURIs will be used to redirect the request, once the login succeeds on the OAuth server side, back to the client for further processing of the authentication. Therefore it should be a URI supported by the client, in our case SpringBoot.

By default, SpringBoot uses the following redirect template: {baseUrl}/login/oauth2/code/{registrationId}, where the registrationId is what comes under the property spring.security.oauth2.client.registration.

Therefore, if we use openshift as the id, the redirect URI becomes {baseUrl}/login/oauth2/code/openshift. Note: remember to replace {baseUrl} with the actual value.

With all that, for a sample demo app deployed under https://demo-app, an OAuthClient resource would look like this

kind: OAuthClient
apiVersion: oauth.openshift.io/v1
metadata:
  name: demo-app
secret: demo-app-secret
redirectURIs:
  - "https://demo-app/login/oauth2/code/openshift"
grantMethod: prompt

Note the grantMethod: prompt means the user needs to explicitly approve the scope when the login is performed by the OAuth server. Another possible value for that field is auto, in which case no explicit approval is required.

The resource is registered at cluster level; therefore, whoever registers it should have cluster-wide permission to create this resource.

3. Configure SpringBoot Application

Once a client is registered in the OAuth server using the OAuthClient resource, its properties can be used to register an oauth2 client configuration in the SpringBoot app.

For the above OAuthClient configuration, following SpringBoot configuration should be performed. 

spring:
  security:
    oauth2:
      client:
        registration:
          openshift:
            authorization-grant-type: authorization_code
            client-name: Demo App
            client-id: demo-app
            client-secret: demo-app-secret
            provider: openshift
            redirect-uri: https://demo-app/login/oauth2/code/openshift
            scope: user:info
        provider:
          openshift:
            authorization-uri: <openshift-oauth-server-host>/oauth/authorize
            token-uri: <openshift-oauth-server-host>/oauth/token

where authorization-uri and token-uri should be replaced with values from the response we got for the GET request made to the OAuth server's .well-known/oauth-authorization-server endpoint.

With that, we should be able to trigger a login using the path /oauth2/authorization/openshift, which will redirect the login request to the OpenShift OAuth server and, when the login succeeds on the OAuth server side, redirect the request back to the app.

Note that the default template for the login URL is /oauth2/authorization/{registrationId}

4. Fetch User Information

You might have noticed in the last step that, even though the login succeeds on the OAuth server side, after the redirect back to the client app the authentication fails. This is because Spring Security now needs user information to set up an authentication (principal). Therefore, some additional configuration is required, from the Spring Security perspective, to fetch this user information.

As part of the oauth2 workflow, user information is fetched through the URI configured in user-info-uri, with user-name-attribute naming the attribute that holds the username. However, the OpenShift OAuth server doesn't provide a way to fetch this user information; instead we shall use the OpenShift user API to fetch it, using the access token we received from the OAuth server.

OpenShift provides several user APIs; among those, we can use GET /apis/user.openshift.io/v1/users/~. Mind the ~ at the end of the path: it implies fetching the currently logged-in user, for the access token we pass with the GET request.

The API returns a JSON response with the structure explained in the specification, where the username can be obtained from the metadata.name field.
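For illustration, a trimmed response (with a hypothetical username jdoe) would carry the name like this; the actual fields follow the User specification.

```json
{
  "kind": "User",
  "apiVersion": "user.openshift.io/v1",
  "metadata": {
    "name": "jdoe"
  }
}
```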

With that, the SpringBoot application needs two more configs under the openshift provider configuration, those are

spring:
  security:
    oauth2:
      client:
        provider:
          openshift:
            user-info-uri: <openshift-api-host>/apis/user.openshift.io/v1/users/~
            user-name-attribute: username

Note that we have configured username as the field name to look up the username in the response; however, no such field is available, as explained in the specification. The username actually resides in the nested field metadata.name. Therefore, this value should be flattened on the Spring Security side, for it to be able to fetch the value and construct the authentication object.

Spring Security provides a way to flatten fields for easy access through the DefaultOAuth2UserService#setAttributesConverter method.

Customization of the user service in Kotlin would look similar to this.

http {
  oauth2Login {
    userInfoEndpoint {
      userService = DefaultOAuth2UserService().apply {
        setAttributesConverter { _ ->
          Converter { attributes ->
            // copy the attributes, since the original map may be immutable
            val flattened = attributes.toMutableMap()
            val username = (attributes["metadata"] as? Map<*, *>)?.get("name") as? String
            if (username != null) {
              flattened["username"] = username
            }
            flattened
          }
        }
      }
    }
  }
}

Now we have flattened the metadata.name field value into the username field, which can be accessed by Spring Security.

With all that, we should be able to use the OpenShift OAuth server for the authentication of our SpringBoot application upon triggering the login.


Tuesday, October 29, 2024

Create a zip and return it as response from SpringBoot WebFlux reactive API - Kotlin

SpringBoot WebFlux provides a framework with which we can create reactive APIs.

What if we need to return a zip file as a response from such an API? Let's go through the steps required to send a zip file as a response from a reactive REST API.

1. Create Route

We first need to create a GET route which can be used to request a zip file.

@Configuration(proxyBeanMethods = false)
class RouterConfiguration {
    @Bean
    fun route(): RouterFunction<ServerResponse> {
        return RouterFunctions
            .route(GET("/download").and(accept(MediaType.APPLICATION_OCTET_STREAM))) { ServerResponse.ok().build() }
    }
}

This configures a GET route at /download which matches the request only if the caller declares, in its request headers, that it supports receiving an octet stream.

The route, upon invocation, will just return a 200 OK status with an empty response body.

2. Create Controller Function

We now need a function that can build this zip file, so it can be called from the route invocation and returned as a response.

Properly understood, a zip is a series of bytes, which can be represented by a Spring-provided flux of DataBuffer.

Therefore the handler can be defined like this, to return that flux of data buffers as the response body for a request.

@Component
class DownloadHandler() {
    fun createZip(request: ServerRequest): Mono<ServerResponse> {
        return ServerResponse.ok().contentType(MediaType.parseMediaType("application/zip"))
            .body(BodyInserters.fromDataBuffers(Flux.empty()))
    }
}

The method accepts a ServerRequest and builds a ServerResponse, for an empty list of bytes, with content-type zip.

The above-created handler method can now be invoked from the route configuration for request handling.

With that change, the altered route configuration method will look like this.

@Bean
fun route(downloadHandler: DownloadHandler): RouterFunction<ServerResponse> {
  return RouterFunctions
    .route(GET("/download").and(accept(MediaType.APPLICATION_OCTET_STREAM))) { downloadHandler.createZip(it) }
}

3. Generate Zip Content

The last remaining step is building the zip file from a list of files.

Let's assume we have two files, /one/first.txt and /two/second.txt; then the zip content can be built and streamed as DataBuffer.

private fun createZipDataBuffer(dataBufferFactory: DataBufferFactory): Flux<DataBuffer> {
    val paths = listOf(Path("/one/first.txt"), Path("/two/second.txt"))
    // Create a data buffer with an initial capacity of 5 KiB
    val dataBuffer = dataBufferFactory.allocateBuffer(1024 * 5)
    // Create a zip output stream using the data buffer as the underlying output stream
    val zipOutputStream = ZipOutputStream(dataBuffer.asOutputStream())

    return Flux.create { emitter ->
        try {
            zipOutputStream.use {
                paths.forEach { path ->
                    // Use the file name as the zip entry name
                    zipOutputStream.putNextEntry(ZipEntry(path.fileName.toString()))
                    Files.copy(path, zipOutputStream)
                    zipOutputStream.closeEntry()
                }
                zipOutputStream.finish()
                emitter.next(dataBuffer)
                emitter.complete()
            }
        } catch (e: Exception) {
            emitter.error(e)
        }
    }
}

The method requires a DataBufferFactory to create the DataBuffer, sized at 5 KiB initially, and the factory from the request should be used for it.

To represent the zip file, a ZipOutputStream is created with the generated data buffer as its output stream. Upon completion, the data buffer is emitted from the flux, so it can be used to create the response.

With that, this method can now be invoked from the DownloadHandler createZip method like this.

fun createZip(request: ServerRequest): Mono<ServerResponse> {
    return ServerResponse.ok().contentType(MediaType.parseMediaType("application/zip"))
        .body(BodyInserters.fromDataBuffers(
	    // use the bufferFactory from the request
            createZipDataBuffer(request.exchange().response.bufferFactory()))
        )
}

With all that, when a GET request is made to the /download URL, a zip file containing both files will be sent as the response.

That's all; it should now be possible to create zip content as a response from a reactive SpringBoot WebFlux API 👌

Saturday, August 31, 2024

What really is the `Connection prematurely closed BEFORE response` exception from Netty HttpClient?

Have you tried calling an HTTP API using the Netty client? You have probably used it indirectly if you use the SpringBoot WebClient to make such API calls, because the WebClient internally uses Netty's HttpClient to make the calls in a non-blocking way.

The client is designed to keep connections in a connection pool in order to reuse them the next time a request is initiated for the same server. However, rarely, this client might throw a reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response exception.

This most likely means that the connection was closed by the destination server while Netty's HttpClient was initiating a request over that connection.

Suppose the connection is closed by the destination server while it is idle in Netty's connection pool. Then this exception will not occur, provided the destination server properly sends its signal to close the connection. In that case Netty will disregard the idle connection and initiate a new one the next time a request comes for the same destination server.

That is why this exception is difficult to recreate and occurs rarely.

Possible scenario

Following is one possible scenario in which this exception can be observed occasionally.

Assume you have an HTTP API deployed on a Tomcat-based Spring Boot server, and you start making API calls using Netty's HttpClient every 60 seconds.

Then, rarely, you might observe a few PrematureCloseExceptions over time. This is because Tomcat, by default, keeps the connection idle for 60 seconds, and if Tomcat marks it for close due to the 60-second timeout while Netty is trying to use it, the call fails with PrematureCloseException.

The above problem can be fixed by setting the Tomcat connection idle timeout to more than 60 seconds, or Netty's connection idle timeout to less than 60 seconds.

How to set connection idle timeout for Tomcat

The keepAliveTimeout setting, which is set to the value of connectionTimeout by default, is often mistakenly taken as the connection idle timeout to be adjusted. However, it has nothing to do with keeping the connection idle after serving a request.

keepAliveTimeout is used to wait for another read from the same request, before concluding that it is completed.

Once the response is sent and the client completes the request, the Tomcat connection is left in the OPEN_READ status, which means it is ready to start receiving a new request. In that state, it uses connectionTimeout to decide whether it should keep the connection idle or close it.

Therefore, Tomcat's connectionTimeout is the setting that should be changed if you want to change how long a connection is kept idle.
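For a Spring Boot embedded Tomcat, this can be done through application configuration; a sketch assuming the server.tomcat.connection-timeout property (the 90s value is illustrative, chosen to exceed the 60-second client interval):

```yaml
server:
  tomcat:
    # how long Tomcat waits on an idle connection before closing it
    connection-timeout: 90s
```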

Also note that, if the connection is reused by Tomcat (i.e. a new request arrives from the same client within connectionTimeout), any TLS handshake already performed will be skipped the next time, removing the TLS handshake overhead.

How to set connection idle timeout for Netty client

From the pool's perspective, a connection can be set to expire after being idle by using the maxIdleTime property of Netty's ConnectionProvider.
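A sketch of wiring that into a WebClient via Reactor Netty's ConnectionProvider; the pool name "custom-pool" and the 50-second value (chosen to stay under Tomcat's default 60 seconds) are illustrative:

```kotlin
import java.time.Duration
import org.springframework.http.client.reactive.ReactorClientHttpConnector
import org.springframework.web.reactive.function.client.WebClient
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

// Expire pooled connections that stay idle longer than 50 seconds
val provider: ConnectionProvider = ConnectionProvider.builder("custom-pool")
    .maxIdleTime(Duration.ofSeconds(50))
    .build()

// Plug the customized pool into the WebClient
val webClient: WebClient = WebClient.builder()
    .clientConnector(ReactorClientHttpConnector(HttpClient.create(provider)))
    .build()
```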


With that, it should be possible to handle the PrematureCloseException from Netty's HttpClient.