Skip to content

Commit

Permalink
Add @atomic_output decorator for outputs that get written to in blo…
Browse files Browse the repository at this point in the history
…cks (#157)

* add decorator to swap a tempfile which gets written to in batches

closes #110

* make `@atomic_output` work with args or kwargs

* simplify logic by requiring kwargs
  • Loading branch information
scottstanie authored Nov 7, 2023
1 parent 3c8e702 commit 39a657a
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 0 deletions.
129 changes: 129 additions & 0 deletions src/dolphin/_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import functools
import shutil
import tempfile
from pathlib import Path
from typing import Any, Callable

from dolphin._log import get_log

logger = get_log(__name__)

__all__ = [
"atomic_output",
]


def atomic_output(
output_arg: str = "output_file",
is_dir: bool = False,
use_tmp: bool = False,
) -> Callable:
"""Use a temporary file/directory for the `output_arg` until the function finishes.
Decorator is used on a function which writes to an output file/directory in blocks.
If the function were interrupted, the file/directory would be partially complete.
This decorator replaces the final output name with a temp file/dir, and then
renames the temp file/dir to the final name after the function finishes.
Note that when `is_dir=True`, `output_arg` can be a directory (if multiple files
are being written to). In this case, the entire directory is temporary, and
renamed after the function finishes.
Parameters
----------
output_arg : str, optional
The name of the argument to replace, by default 'output_file'
is_dir : bool, default = False
If `True`, the output argument is a directory, not a file
use_tmp : bool, default = False
If `False`, uses the parent directory of the desired output, with
a random suffix added to the name to distinguish from actual output.
If `True`, uses the `/tmp` directory (or wherever the default is
for the `tempfile` module).
Returns
-------
Callable
The decorated function
Raises
------
FileExistsError
if the file for `output_arg` already exists (if out_dir=False), or
if the directory at `output_arg` exists and is non-empty.
Notes
-----
The output at `output_arg` *must not* exist already, or the decorator will error
(though if `is_dir=True`, it is allowed to be an empty directory).
The function being decorated *must* be called with keyword args for `output_arg`.
"""

def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Extract the output file path
if output_arg in kwargs:
final_out_name = kwargs[output_arg]
else:
raise FileExistsError(
f"Argument {output_arg} not found in function {func.__name__}:"
f" {kwargs}"
)

final_path = Path(final_out_name)
# Make sure the desired final output doesn't already exist
_raise_if_exists(final_path, is_dir=is_dir)
# None means that tempfile will use /tmp
tmp_dir = final_path.parent if not use_tmp else None

# Make the tempfile start the same as the desired output
prefix = final_path.name
if is_dir:
# Create a temporary directory
temp_path = tempfile.mkdtemp(dir=tmp_dir, prefix=prefix)
else:
# Create a temporary file
_, temp_path = tempfile.mkstemp(dir=tmp_dir, prefix=prefix)
logger.debug("Writing to temp file %s instead of %s", temp_path, final_path)

try:
# Replace the output file path with the temp file
# It would be like this if we only allows keyword:
kwargs[output_arg] = temp_path
# Execute the original function
result = func(*args, **kwargs)
# Move the temp file to the final location
logger.debug("Moving %s to %s", temp_path, final_path)
shutil.move(temp_path, final_path)

return result
finally:
logger.debug("Cleaning up temp file %s", temp_path)
# Different cleanup is needed
if is_dir:
shutil.rmtree(temp_path, ignore_errors=True)
else:
Path(temp_path).unlink(missing_ok=True)

return wrapper

return decorator


def _raise_if_exists(final_path: Path, is_dir: bool):
if final_path.exists():
err_msg = f"{final_path} already exists"
if is_dir and final_path.is_dir():
try:
final_path.rmdir()
except OSError as e:
if "Directory not empty" not in e.args[0]:
raise e
else:
raise FileExistsError(err_msg)
else:
raise FileExistsError(err_msg)
97 changes: 97 additions & 0 deletions tests/test_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import threading
import time
from pathlib import Path

from dolphin._decorators import atomic_output


def _long_write(filename, pause: float = 0.2):
"""Simulate a long writing process"""
f = open(filename, "w")
f.write("aaa\n")
time.sleep(pause)
f.write("bbb\n")
f.close()


@atomic_output(output_arg="outname")
def default_write_newname(outname="out.txt"):
_long_write(outname)


@atomic_output(output_arg="outname", use_tmp=True)
def default_write_newname_tmp(outname="out.txt"):
_long_write(outname)


@atomic_output(output_arg="output_dir", is_dir=True)
def default_write_dir(output_dir="some_dir", filename="testfile.txt"):
p = Path(output_dir)
p.mkdir(exist_ok=True, parents=True)
outname = p / filename
_long_write(outname)


@atomic_output(output_arg="output_dir", is_dir=True, use_tmp=True)
def default_write_dir_tmp(output_dir="some_dir", filename="testfile.txt"):
p = Path(output_dir)
p.mkdir(exist_ok=True, parents=True)
outname = p / filename
_long_write(outname)


def test_atomic_output(tmpdir):
with tmpdir.as_cwd():
default_write_newname(outname="out1.txt")
default_write_newname(outname="out2.txt")
for fn in ["out1.txt", "out2.txt"]:
assert Path(fn).exists()


def test_atomic_output_tmp(tmpdir):
with tmpdir.as_cwd():
default_write_newname_tmp(outname="out1.txt")
assert Path("out1.txt").exists()


def test_atomic_output_dir(tmp_path):
out_dir = tmp_path / "out"
filename = "testfile.txt"
out_dir.mkdir()
default_write_dir(output_dir=out_dir, filename=filename)
assert Path(out_dir / filename).exists()


def test_atomic_output_dir_tmp(tmp_path):
out_dir = tmp_path / "out"
filename = "testfile.txt"
out_dir.mkdir()
default_write_dir(output_dir=out_dir, filename=filename)
assert Path(out_dir / filename).exists()


def test_atomic_output_name_swap_file(tmpdir):
with tmpdir.as_cwd():
outname2 = "out3.txt"
t = threading.Thread(target=default_write_newname, kwargs={"outname": outname2})
t.start()
# It should NOT exist, yet
assert not Path(outname2).exists()
time.sleep(0.5)
t.join()
assert Path(outname2).exists()


def test_atomic_output_dir_swap(tmp_path):
# Kick off the writing function in the background
# so we see if a different file was created
# Check it works providing the "args"
out_dir = tmp_path / "out"
out_dir.mkdir()
t = threading.Thread(target=default_write_dir, kwargs={"output_dir": out_dir})
t.start()
# It should NOT exist, yet
assert not Path(out_dir / "testfile.txt").exists()
time.sleep(0.5)
t.join()
assert Path(out_dir / "testfile.txt").exists()

0 comments on commit 39a657a

Please sign in to comment.