API Streaming Response with Oracle and Java

Data streams are fascinating to work with, and they show up in many places nowadays. What we do not see often is a Java application streaming data out of Oracle. Getting data out of Oracle in a streamed fashion is extremely useful when the expected data set is large. I will share the use case, problem, solution, implementation, and advantages of this approach.

Use Case

A few months ago, I ran into a situation where I needed to fetch close to a million records from an Oracle database. Oracle may not be the best database for such a request, but if you depend on it, you need to make it work.

Consider a scenario where we need to get every productId that satisfies the given criteria (the criteria can change with every request). Once we have the productIds, we need to take some action for each one, say, fetch its quantity.

Problem

Our traditional Java processing would go something like this:

  • Make a database call and get the List of productIds
  • For each productId, get its quantity

Seems straightforward, right? The issue comes when we do this for a million records, with parallel requests and shared DB connections on top. The result object becomes huge, leading to memory issues. The initial caller, who wanted to know each product and its quantity, has to wait until we finish processing all the records. Not a great user experience.

Solution

The first thing that comes to mind is pagination: break the result into chunks and feed the caller a small amount of data at a time. This is not always feasible. And with microservices, we may not want a caller making a thousand calls to satisfy one requirement.
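To put a number on the "thousand calls" concern, here is a minimal sketch of the arithmetic behind pagination. The class and method names (PageMath, callsNeeded, offsetOf) and the page size of 1,000 are hypothetical and chosen only for illustration.

```java
// Hypothetical helper, not part of the original service: it only illustrates
// how many paged requests a caller needs to drain a large result set.
final class PageMath {
    private PageMath() {}

    /** Number of requests needed to read totalRows with the given page size (ceiling division). */
    static long callsNeeded(long totalRows, long pageSize) {
        if (totalRows < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and pageSize > 0");
        }
        return (totalRows + pageSize - 1) / pageSize;
    }

    /** Zero-based row offset at which the given zero-based page starts. */
    static long offsetOf(long pageIndex, long pageSize) {
        return pageIndex * pageSize;
    }

    public static void main(String[] args) {
        // A million rows at an assumed page size of 1,000 rows per call.
        System.out.println(callsNeeded(1_000_000L, 1_000L)); // prints 1000
    }
}
```

Each of those calls is a separate request, and on the database side typically a separate query, which is the overhead that streaming avoids.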

The next thought is: what if we could get the quantity of a product as soon as we know its productId? Hence streaming!

A partial stream will not help much. We need our source (the Oracle database here) to give us data in a streamed manner. If our API can also return a response stream rather than one bulky response, even better. This enables the API caller to process data as soon as it arrives.
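The difference between "collect everything, then respond" and "hand over each row as it is read" can be sketched with plain JDK types. This is only an illustration: CountingSource is a hypothetical stand-in for a database cursor, and the method names are assumptions, not part of the actual service.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

final class StreamVsCollect {
    private StreamVsCollect() {}

    /** Hypothetical row source: yields p1..pN and counts how many rows were read so far. */
    static final class CountingSource implements Iterator<String> {
        private final int total;
        private int pulled;

        CountingSource(int total) { this.total = total; }

        @Override public boolean hasNext() { return pulled < total; }

        @Override public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            pulled++;
            return "p" + pulled;
        }

        int pulled() { return pulled; }
    }

    /** Traditional approach: every row is held in memory before the caller sees anything. */
    static List<String> collectAll(Iterator<String> rows) {
        List<String> all = new ArrayList<>();
        while (rows.hasNext()) {
            all.add(rows.next());
        }
        return all;
    }

    /** Streaming approach: each row goes to the sink as soon as it is read. Returns the row count. */
    static int streamEach(Iterator<String> rows, Consumer<String> sink) {
        int count = 0;
        while (rows.hasNext()) {
            sink.accept(rows.next());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource(3);
        // The sink runs after one row has been read, not after all three.
        streamEach(source, id -> System.out.println(id + " handled after " + source.pulled() + " row(s) read"));
    }
}
```

With streamEach, only one row is in flight at a time, so memory use stays flat no matter how many rows the source produces.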

Implementation

Here we have a REST endpoint (JAX-RS).

Note: There are no changes needed on the Oracle side. This is a Java implementation that uses Oracle as its database.

@Produces("application/octet-stream")
@GET
@Path("/product/ids")
public Object getProductIds(@QueryParam("c1") String condition1, @QueryParam("c2") String condition2, ...);

Oracle JDBC call:

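// Needs java.io.IOException, java.io.OutputStream, java.util.Map,
// and the JAX-RS types StreamingOutput and WebApplicationException.
// `sql` and `namedParameterJdbcTemplate` are fields of the enclosing class.
public StreamingOutput getproductIdStream(Map<String, String> conditions) {
    // Nothing is queried here. JAX-RS calls write() later, when it is ready
    // to send the response body, and the rows are written to that stream.
    return new StreamingOutput() {
        @Override
        public void write(OutputStream os) throws IOException, WebApplicationException {
            namedParameterJdbcTemplate.query(sql, conditions, new ProductResultSetExtractor(os));
        }
    };
}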

ProductResultSetExtractor class:

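import java.io.IOException;
import java.io.OutputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.jdbc.core.ResultSetExtractor;

public class ProductResultSetExtractor implements ResultSetExtractor<Void> {

    // Response stream handed over by JAX-RS; rows are written to it directly.
    private final OutputStream data;

    public ProductResultSetExtractor(final OutputStream data) {
        this.data = data;
    }

    @Override
    public Void extractData(final ResultSet resultSet) {
        final ObjectMapper objectMapper = new ObjectMapper();
        // try-with-resources closes the generator once the whole result set is written.
        try (JsonGenerator jsonGenerator = objectMapper.getFactory().createGenerator(data, JsonEncoding.UTF8)) {
            prepareJson(resultSet, jsonGenerator);
            jsonGenerator.flush();
        } catch (IOException | SQLException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    // Writes one JSON object per row, with one field per column, inside a single JSON array.
    private static void prepareJson(final ResultSet resultSet, final JsonGenerator jsonGenerator)
            throws SQLException, IOException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        jsonGenerator.writeStartArray();
        while (resultSet.next()) {
            jsonGenerator.writeStartObject();
            for (int i = 1; i <= columnCount; i++) {
                jsonGenerator.writeObjectField(metaData.getColumnName(i), resultSet.getObject(i));
            }
            jsonGenerator.writeEndObject();
        }
        jsonGenerator.writeEndArray();
    }
}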
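One detail worth keeping in mind: writers and generators usually buffer their output, so a record reaches the caller only once the buffer fills up or is flushed. The sketch below uses only JDK classes to show the idea of flushing every few records. FlushingArrayWriter, its flushEvery parameter, and the simplified escape helper are assumptions made for this illustration; they are not part of the extractor class, which relies on Jackson.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;

final class FlushingArrayWriter {
    private FlushingArrayWriter() {}

    /** Writes ids as a JSON array of {"productId": ...} objects, flushing every flushEvery records. */
    static void write(Iterator<String> ids, OutputStream out, int flushEvery) throws IOException {
        if (flushEvery <= 0) {
            throw new IllegalArgumentException("flushEvery must be > 0");
        }
        Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
        writer.write('[');
        int written = 0;
        while (ids.hasNext()) {
            if (written > 0) {
                writer.write(',');
            }
            writer.write("{\"productId\":\"" + escape(ids.next()) + "\"}");
            written++;
            if (written % flushEvery == 0) {
                writer.flush(); // push the records written so far toward the caller
            }
        }
        writer.write(']');
        writer.flush(); // make sure the closing bracket is sent as well
    }

    /** Simplified escaping for this sketch: handles only backslash and double quote. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        write(Arrays.asList("p1", "p2", "p3").iterator(), sink, 2);
        System.out.println(new String(sink.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

A small flush interval gives the caller an earlier first record at the cost of more, smaller writes; a larger one trades some latency for throughput.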

This produces the following stream (formatted here for readability):

[
  { "productId": "p1"},
  { "productId": "p2"},
  { "productId": "p3"},
  ...
  { "productId": "pN"}
]

Here the caller can start processing as soon as { "productId": "p1"} is received.
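On the caller side, the response has to be read incrementally for this to pay off. The following is a minimal sketch, using only the JDK, of pulling complete objects out of the array while the rest is still arriving. ArrayElementScanner is a hypothetical name, and the scanner assumes the array contains only JSON objects; in practice a streaming JSON parser would be the more robust choice.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.function.Consumer;

final class ArrayElementScanner {
    private ArrayElementScanner() {}

    /**
     * Reads a JSON array of objects character by character and passes each
     * top-level object to onObject as soon as its closing brace is read.
     * Returns the number of objects delivered.
     */
    static int forEachObject(Reader in, Consumer<String> onObject) throws IOException {
        StringBuilder current = new StringBuilder();
        int depth = 0;            // brace nesting level; 0 means between objects
        int count = 0;
        boolean inString = false; // true while inside a JSON string literal
        boolean escaped = false;  // true right after a backslash inside a string
        int c;
        while ((c = in.read()) != -1) {
            char ch = (char) c;
            if (depth > 0) {
                current.append(ch);
            }
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (ch == '\\') {
                    escaped = true;
                } else if (ch == '"') {
                    inString = false;
                }
                continue; // braces inside strings must not affect depth
            }
            if (ch == '"') {
                inString = true;
            } else if (ch == '{') {
                if (depth == 0) {
                    current.append(ch); // start of a new top-level object
                }
                depth++;
            } else if (ch == '}') {
                depth--;
                if (depth == 0) {
                    onObject.accept(current.toString());
                    current.setLength(0);
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        Reader body = new StringReader("[{\"productId\":\"p1\"},{\"productId\":\"p2\"}]");
        forEachObject(body, obj -> System.out.println("received " + obj));
    }
}
```

Because the callback fires on each closing brace, the caller can look up the quantity for p1 while p2 onward is still on the wire.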

Advantages

Adopting streaming from Oracle has a few advantages when the returned dataset is large.

  • Low memory consumption
  • Early first record. This is extremely useful. The caller does not need to wait until the complete object is prepared.
  • API performance does not degrade much as the response dataset grows (assuming the caller starts processing as soon as the first record is received).

Disadvantage

If the stream breaks, it is difficult to tell how much data was processed or which record the caller last received successfully. This may force the whole dataset to be resent.
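One way a caller might soften this is to remember the last record it handled, so that after a failure it at least knows where it stopped. The sketch below is hypothetical: ResumeTracker is an invented name, and resuming from that point would additionally require the server to return rows in a stable order and to accept a "start after this productId" condition, neither of which is part of the implementation described here.

```java
import java.util.Iterator;

final class ResumeTracker {
    private String lastId;  // last productId fully handled; null until the first record
    private long received;  // number of records handled so far

    /** Call once per record, after the record has been processed successfully. */
    void onRecord(String productId) {
        lastId = productId;
        received++;
    }

    String lastId() { return lastId; }

    long received() { return received; }

    /**
     * Drains the source into the tracker. Returns true if the source ended
     * normally and false if it failed midway (simulating a broken stream).
     */
    static boolean drain(Iterator<String> source, ResumeTracker tracker) {
        try {
            while (source.hasNext()) {
                tracker.onRecord(source.next());
            }
            return true;
        } catch (RuntimeException broken) {
            // The tracker still holds the last record handled before the failure.
            return false;
        }
    }
}
```

Without ordering and a resume condition on the server side, the tracker only tells the caller how far it got; the dataset would still have to be requested again from the start.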

Conclusion

Streaming is not needed everywhere, but when such a use case comes up, it is extremely helpful. The early first record is especially valuable when total time matters less, because you can work on the next set of instructions in parallel.