Skip to content

Commit

Permalink
error handling and directory and file check improved
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Oct 17, 2024
1 parent 1088679 commit ff8c2f9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class FileReadStream<T> implements ReadStream<T>, Closeable {
private Handler<Throwable> exceptionHandler;
private final InboundBuffer<Buffer> queue;

private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private final int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
private long writtenBytes = 0;

/**
Expand Down Expand Up @@ -159,7 +159,6 @@ private synchronized void doRead(ByteBuffer bb) {
}

private void doRead(Buffer writeBuff, int offset, ByteBuffer buff, long position, Handler<AsyncResult<Buffer>> handler) {

// ReadableByteChannel doesn't have a completion handler, so we wrap it into
// an executeBlocking and use the future there
vertx.executeBlocking(future -> {
Expand Down Expand Up @@ -243,6 +242,10 @@ private void check() {
}
}

public boolean isClosed() {
return this.closed;
}

private void checkContext() {
if (!vertx.getOrCreateContext().equals(context)) {
throw new IllegalStateException("AsyncInputStream must only be used in the context that created it, expected: " + this.context
Expand All @@ -257,7 +260,6 @@ private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
}

private void doClose(Handler<AsyncResult<Void>> handler) {

try {
ch.close();
if (handler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import org.swisspush.reststorage.CollectionResource;
import org.swisspush.reststorage.DocumentResource;
import org.swisspush.reststorage.Resource;
import software.amazon.nio.spi.s3.S3FileSystem;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand All @@ -32,12 +30,11 @@ public class S3FileSystemDirLister {
private static final Logger log = LoggerFactory.getLogger(S3FileSystemDirLister.class);
private final Vertx vertx;
private final String root;
private final S3FileSystem fileSystem;

public S3FileSystemDirLister(Vertx vertx, S3FileSystem fileSystem, String root) {

public S3FileSystemDirLister(Vertx vertx, String root) {
this.vertx = vertx;
this.root = root;
this.fileSystem = fileSystem;
}

public void handleListingRequest(Path path, final int offset, final int count, final Handler<Resource> handler) {
Expand Down Expand Up @@ -119,17 +116,4 @@ private void listDirBlocking(Path path, int offset, int count, Promise<Collectio
log.warn("May we should do something here. I've no idea why old implementation did nothing.");
}
}

private String canonicalizeVirtualPath(String path) {
return canonicalizeRealPath(root + path);
}

private static String canonicalizeRealPath(String path) {
try {
return new File(path).getCanonicalPath();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

}
83 changes: 57 additions & 26 deletions src/main/java/org/swisspush/reststorage/s3/S3FileSystemStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public S3FileSystemStorage(Vertx vertx, RestStorageExceptionFactory exceptionFac
throw new RuntimeException(e);
}

this.fileSystemDirLister = new S3FileSystemDirLister(vertx, fileSystem, root.toString());
this.fileSystemDirLister = new S3FileSystemDirLister(vertx, root.toString());
// Unify format for simpler work.
String tmpRoot;
tmpRoot = root.toString();
Expand All @@ -113,19 +113,30 @@ public void get(String path, String etag, final int offset, final int count, fin
DocumentResource d = new DocumentResource();
try {
final InputStream inputStream = Files.newInputStream(fullFilePath);
final FileReadStream<io.vertx.core.buffer.Buffer> readStream = new FileReadStream<>(vertx, d.length, path, inputStream);
d.length = Files.size(fullFilePath);
log.debug("Successfully opened '{}' which is {} bytes in size.", path, d.length);

final FileReadStream<io.vertx.core.buffer.Buffer> readStream = new FileReadStream<>(vertx, d.length, path, inputStream);
d.readStream = readStream;
d.closeHandler = v -> {
log.debug("Resource got closed. Close file now '{}'", path);

final Runnable cleanUp = () -> {
if (!readStream.isClosed()){
readStream.close();
}
try {
inputStream.close();
} catch (IOException e) {
log.debug("Failed to close input stream", e);
}
};
d.closeHandler = v -> {
log.debug("Resource got closed. Close file now '{}'", path);
cleanUp.run();
};
d.addErrorHandler(event -> {
log.error("Get resource failed.", exceptionFactory.newException("Close file now '" + path + "'", event));
cleanUp.run();
});
} catch (IOException e) {
log.warn("Failed to open '{}' for read", path, e);
d.error = true;
Expand Down Expand Up @@ -195,22 +206,24 @@ private void putFile(final Handler<Resource> handler, final Path fullPath) {

FileWriteStream fileWriteStream = new FileWriteStream(outputStream);
d.writeStream = fileWriteStream;
d.closeHandler = event -> {

final Runnable cleanUp = () -> {

try {
outputStream.close();
fileWriteStream.end();
d.endHandler.handle(null);
} catch (IOException e) {
log.error("Failed to close output stream:", e);
}
fileWriteStream.end();
};

d.closeHandler = event -> {
cleanUp.run();
d.endHandler.handle(null);
};
d.addErrorHandler(err -> {
log.error("Put file failed:", err);
try {
outputStream.close();
} catch (IOException e) {
log.error("Failed to close output stream:", e);
}
cleanUp.run();
});
handler.handle(d);
} catch (IOException e) {
Expand All @@ -221,14 +234,32 @@ private void putFile(final Handler<Resource> handler, final Path fullPath) {
@Override
public void delete(String path, String lockOwner, LockMode lockMode, long lockExpire, boolean confirmCollectionDelete,
boolean deleteRecursive, final Handler<Resource> handler) {
final Path fullPath = canonicalize(path, false);

// at this stage I don't know what of this path point to
final Path fullFilePath = canonicalize(path, false);
final Path fullDirPath = canonicalize(path, true);

Resource resource = new Resource();
boolean deleteRecursiveInFileSystem = true;
if (confirmCollectionDelete && !deleteRecursive) {
deleteRecursiveInFileSystem = false;
}
boolean finalDeleteRecursiveInFileSystem = deleteRecursiveInFileSystem;
if (Files.exists(fullPath, LinkOption.NOFOLLOW_LINKS)) {

// try to check as a file
boolean exists = Files.exists(fullFilePath, LinkOption.NOFOLLOW_LINKS);
Path fullPath = fullFilePath;

if (!exists) {
// not exists, maybe is a directory
exists = Files.exists(fullDirPath, LinkOption.NOFOLLOW_LINKS);
if (exists) {
// so it is a directory
fullPath = fullDirPath;
}
}

if (exists) {
try {
deleteRecursive(fullPath, finalDeleteRecursiveInFileSystem);
deleteEmptyParentDirs(fullPath.getParent());
Expand All @@ -248,6 +279,16 @@ public void delete(String path, String lockOwner, LockMode lockMode, long lockEx
}
}

@Override
public void cleanup(Handler<DocumentResource> handler, String cleanupResourcesAmount) {
// nothing to do here
}

@Override
public void storageExpand(String path, String etag, List<String> subResources, Handler<Resource> handler) {
throw new UnsupportedOperationException("Method 'storageExpand' not supported in S3FileSystemStorage");
}

/**
* Deletes all empty parent directories starting at specified directory.
*
Expand Down Expand Up @@ -296,7 +337,7 @@ private void deleteEmptyParentDirs(Path path) {
} else {
// This case should not happen. At least up to now i've no idea of a valid
// scenario for this one.
log.error("Unexpected error while deleting empty directories.", cause);
log.error("Unexpected error while deleting empty directories.", exceptionFactory.newException(cause));
}
}
}
Expand Down Expand Up @@ -338,18 +379,8 @@ private boolean isDirEmpty(final Path path) {
try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(path)) {
return !dirStream.iterator().hasNext();
} catch (IOException e) {
log.debug("Error to detects is directory empty or not '{}'.", path, e);
log.debug("Error to detects is directory empty or not '{}'.", path, exceptionFactory.newException(e));
return false;
}
}

@Override
public void cleanup(Handler<DocumentResource> handler, String cleanupResourcesAmount) {
// nothing to do here
}

@Override
public void storageExpand(String path, String etag, List<String> subResources, Handler<Resource> handler) {
throw new UnsupportedOperationException("Method 'storageExpand' not supported in S3FileSystemStorage");
}
}

0 comments on commit ff8c2f9

Please sign in to comment.