Skip to content

Commit

Permalink
Fix #384: change handling of SmileBufferRecycler to be thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
cowtowncoder committed Jun 19, 2023
1 parent a87e07a commit b652d5f
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 19 deletions.
5 changes: 5 additions & 0 deletions release-notes/CREDITS-2.x
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,8 @@ Kyle Silver (kyle-silver@github)

* Reported *379: (avro) `logback-test.xml` in wrong place (avro/src/main/resources)
(2.15.2)

Simon Daudin (@simondaudin)

* Reported #384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency
(2.15.3)
5 changes: 5 additions & 0 deletions release-notes/VERSION-2.x
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Active maintainers:
=== Releases ===
------------------------------------------------------------------------

2.15.3 (not yet released)

#384: `Smile` decoding issue with `NonBlockingByteArrayParser`, concurrency
(reported by Simon D)

2.15.2 (30-May-2023)

#379: (avro) `logback-test.xml` in wrong place (avro/src/main/resources)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.fasterxml.jackson.dataformat.smile;

import java.util.concurrent.atomic.AtomicReference;

/**
* Simple helper class used for implementing simple reuse system for Smile-specific
* buffers that are used.
Expand All @@ -12,40 +14,27 @@ public class SmileBufferRecycler<T>

public final static int DEFAULT_STRING_VALUE_BUFFER_LENGTH = 64;

protected T[] _seenNamesBuffer;
protected AtomicReference<T[]> _seenNamesBuffer = new AtomicReference<>();

protected T[] _seenStringValuesBuffer;
protected AtomicReference<T[]> _seenStringValuesBuffer = new AtomicReference<>();

public SmileBufferRecycler() { }

public T[] allocSeenNamesBuffer()
{
// 11-Feb-2011, tatu: Used to alloc here; but due to generics, can't easily any more
T[] result = _seenNamesBuffer;
if (result != null) {
// let's ensure we don't retain it here, unless returned
_seenNamesBuffer = null;
// note: caller must have cleaned it up before returning
}
return result;
return _seenNamesBuffer.getAndSet(null);
}

public T[] allocSeenStringValuesBuffer()
{
// 11-Feb-2011, tatu: Used to alloc here; but due to generics, can't easily any more
T[] result = _seenStringValuesBuffer;
if (result != null) {
_seenStringValuesBuffer = null;
// note: caller must have cleaned it up before returning
}
return result;
return _seenStringValuesBuffer.getAndSet(null);
}

public void releaseSeenNamesBuffer(T[] buffer) {
_seenNamesBuffer = buffer;
_seenNamesBuffer.set(buffer);
}

public void releaseSeenStringValuesBuffer(T[] buffer) {
_seenStringValuesBuffer = buffer;
_seenStringValuesBuffer.set(buffer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.fasterxml.jackson.dataformat.smile.async;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.fasterxml.jackson.core.*;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;

// for [dataformats-binary#384]
public class ConcurrentAsyncTest extends AsyncTestBase
{
public void testConcurrentHandling() throws Exception
{
Map<String, Map<String, String>> tags = new HashMap<>();
for (int i = 0; i < 10; i++) {
Map<String, String> value = new HashMap<>();
for (int j = 0; j < 10; j++) {
value.put("key_" + j, "val" + j);
}
tags.put("elt_" + i, value);
}

JsonFactory jsonFactory = new SmileFactory();
ObjectMapper objectMapper = new ObjectMapper();
ObjectWriter objectWriter = objectMapper.writer().with(jsonFactory);
jsonFactory.setCodec(objectMapper);
byte[] json = objectWriter.writeValueAsBytes(tags);
TypeReference<Map<String, Map<String, String>>> typeReference = new TypeReference<Map<String, Map<String, String>>>() {
};

ExecutorService executorService = Executors.newFixedThreadPool(10);
List<CompletableFuture<?>> futures = new ArrayList<>();

// Exact count varies but this seems to be enough to produce the problem
int count = 10_000;
for (int i = 0; i < count; i++) {
JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
ByteArrayFeeder inputFeeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder();
futures.add(CompletableFuture.supplyAsync(() -> {
try {
inputFeeder.feedInput(json, 0, json.length);
@SuppressWarnings("resource")
TokenBuffer tokenBuffer = new TokenBuffer(parser);
while (true) {
JsonToken token = parser.nextToken();
if (token == JsonToken.NOT_AVAILABLE || token == null) {
break;
}

tokenBuffer.copyCurrentEvent(parser);
}
return tokenBuffer.asParser(jsonFactory.getCodec()).readValueAs(typeReference);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
inputFeeder.endOfInput();
parser.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}, executorService));
}

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get();
}
}

0 comments on commit b652d5f

Please sign in to comment.