From ce68a2b2ea471dcfbef86153b31f7599001bbebd Mon Sep 17 00:00:00 2001 From: Tatu Saloranta Date: Mon, 19 Jun 2023 13:25:31 -0700 Subject: [PATCH] Fix #384: change handling of SmileBufferRecycler to be thread-safe (#385) --- release-notes/CREDITS-2.x | 5 ++ release-notes/VERSION-2.x | 5 ++ .../dataformat/smile/SmileBufferRecycler.java | 27 ++----- .../smile/async/ConcurrentAsyncTest.java | 75 +++++++++++++++++++ 4 files changed, 93 insertions(+), 19 deletions(-) create mode 100644 smile/src/test/java/com/fasterxml/jackson/dataformat/smile/async/ConcurrentAsyncTest.java diff --git a/release-notes/CREDITS-2.x b/release-notes/CREDITS-2.x index f8242fbec..ce6311564 100644 --- a/release-notes/CREDITS-2.x +++ b/release-notes/CREDITS-2.x @@ -283,3 +283,8 @@ Kyle Silver (kyle-silver@github) * Reported *379: (avro) `logback-test.xml` in wrong place (avro/src/main/resources) (2.15.2) + +Simon Daudin (@simondaudin) + +* Reported #384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency + (2.15.3) diff --git a/release-notes/VERSION-2.x b/release-notes/VERSION-2.x index 8ab908d70..a4649cbdf 100644 --- a/release-notes/VERSION-2.x +++ b/release-notes/VERSION-2.x @@ -14,6 +14,11 @@ Active maintainers: === Releases === ------------------------------------------------------------------------ +2.15.3 (not yet released) + +#384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency + (reported by Simon D) + 2.15.2 (30-May-2023) #379: (avro) `logback-test.xml` in wrong place (avro/src/main/resources) diff --git a/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileBufferRecycler.java b/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileBufferRecycler.java index 90ae41d69..c3b9b03b4 100644 --- a/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileBufferRecycler.java +++ b/smile/src/main/java/com/fasterxml/jackson/dataformat/smile/SmileBufferRecycler.java @@ -1,5 +1,7 @@ package com.fasterxml.jackson.dataformat.smile; +import java.util.concurrent.atomic.AtomicReference; + /** * Simple helper class used for implementing simple reuse system for Smile-specific * buffers that are used. @@ -12,40 +14,27 @@ public class SmileBufferRecycler public final static int DEFAULT_STRING_VALUE_BUFFER_LENGTH = 64; - protected T[] _seenNamesBuffer; + protected AtomicReference _seenNamesBuffer = new AtomicReference<>(); - protected T[] _seenStringValuesBuffer; + protected AtomicReference _seenStringValuesBuffer = new AtomicReference<>(); public SmileBufferRecycler() { } public T[] allocSeenNamesBuffer() { - // 11-Feb-2011, tatu: Used to alloc here; but due to generics, can't easily any more - T[] result = _seenNamesBuffer; - if (result != null) { - // let's ensure we don't retain it here, unless returned - _seenNamesBuffer = null; - // note: caller must have cleaned it up before returning - } - return result; + return _seenNamesBuffer.getAndSet(null); } public T[] allocSeenStringValuesBuffer() { - // 11-Feb-2011, tatu: Used to alloc here; but due to generics, can't easily any more - T[] result = _seenStringValuesBuffer; - if (result != null) { - _seenStringValuesBuffer = null; - // note: caller must have cleaned it up before returning - } - return result; + return _seenStringValuesBuffer.getAndSet(null); } public void releaseSeenNamesBuffer(T[] buffer) { - _seenNamesBuffer = buffer; + _seenNamesBuffer.set(buffer); } public void releaseSeenStringValuesBuffer(T[] buffer) { - _seenStringValuesBuffer = buffer; + _seenStringValuesBuffer.set(buffer); } } diff --git a/smile/src/test/java/com/fasterxml/jackson/dataformat/smile/async/ConcurrentAsyncTest.java b/smile/src/test/java/com/fasterxml/jackson/dataformat/smile/async/ConcurrentAsyncTest.java new file mode 100644 index 000000000..e7dd448c3 --- /dev/null +++ b/smile/src/test/java/com/fasterxml/jackson/dataformat/smile/async/ConcurrentAsyncTest.java @@ -0,0 +1,75 @@ +package com.fasterxml.jackson.dataformat.smile.async; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import com.fasterxml.jackson.core.*; +import com.fasterxml.jackson.core.async.ByteArrayFeeder; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.*; +import com.fasterxml.jackson.databind.util.TokenBuffer; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; + +// for [dataformats-binary#384] +public class ConcurrentAsyncTest extends AsyncTestBase +{ + public void testConcurrentHandling() throws Exception + { + Map> tags = new HashMap<>(); + for (int i = 0; i < 10; i++) { + Map value = new HashMap<>(); + for (int j = 0; j < 10; j++) { + value.put("key_" + j, "val" + j); + } + tags.put("elt_" + i, value); + } + + JsonFactory jsonFactory = new SmileFactory(); + ObjectMapper objectMapper = new ObjectMapper(); + ObjectWriter objectWriter = objectMapper.writer().with(jsonFactory); + jsonFactory.setCodec(objectMapper); + byte[] json = objectWriter.writeValueAsBytes(tags); + TypeReference>> typeReference = new TypeReference>>() { + }; + + ExecutorService executorService = Executors.newFixedThreadPool(10); + List> futures = new ArrayList<>(); + + // Exact count varies but this seems to be enough to produce the problem + int count = 10_000; + for (int i = 0; i < count; i++) { + JsonParser parser = jsonFactory.createNonBlockingByteArrayParser(); + ByteArrayFeeder inputFeeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder(); + futures.add(CompletableFuture.supplyAsync(() -> { + try { + inputFeeder.feedInput(json, 0, json.length); + @SuppressWarnings("resource") + TokenBuffer tokenBuffer = new TokenBuffer(parser); + while (true) { + JsonToken token = parser.nextToken(); + if (token == JsonToken.NOT_AVAILABLE || token == null) { + break; + } + + tokenBuffer.copyCurrentEvent(parser); + } + return tokenBuffer.asParser(jsonFactory.getCodec()).readValueAs(typeReference); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + inputFeeder.endOfInput(); + parser.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }, executorService)); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get(); + } +}