Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @atomic_output decorator for outputs that get written to in blocks #157

Merged
merged 3 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions src/dolphin/_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from __future__ import annotations

import errno
import functools
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Callable

from dolphin._log import get_log

# Module-level logger, named after this module.
logger = get_log(__name__)

# Names exported by `from dolphin._decorators import *`.
__all__ = ["atomic_output"]


def atomic_output(
    output_arg: str = "output_file",
    is_dir: bool = False,
    use_tmp: bool = False,
) -> Callable:
    """Use a temporary file/directory for the `output_arg` until the function finishes.

    Decorator is used on a function which writes to an output file/directory in blocks.
    If the function were interrupted, the file/directory would be partially complete.

    This decorator replaces the final output name with a temp file/dir, and then
    renames the temp file/dir to the final name after the function finishes.
    If the decorated function raises, the temporary file/directory is removed
    and nothing is left at the final name.

    Note that when `is_dir=True`, `output_arg` can be a directory (if multiple files
    are being written to). In this case, the entire directory is temporary, and
    renamed after the function finishes.

    Parameters
    ----------
    output_arg : str, optional
        The name of the argument to replace, by default 'output_file'
    is_dir : bool, default = False
        If `True`, the output argument is a directory, not a file
    use_tmp : bool, default = False
        If `False`, uses the parent directory of the desired output, with
        a random suffix added to the name to distinguish from actual output.
        If `True`, uses the `/tmp` directory (or wherever the default is
        for the `tempfile` module).

    Returns
    -------
    Callable
        The decorated function

    Raises
    ------
    FileExistsError
        If the file for `output_arg` already exists (if `is_dir=False`), or
        if the directory at `output_arg` exists and is non-empty.
        Also raised (for backward compatibility) when the decorated function
        is called without `output_arg` as a keyword argument.

    Notes
    -----
    The output at `output_arg` *must not* exist already, or the decorator will error
    (though if `is_dir=True`, it is allowed to be an empty directory, which is
    removed before the function runs).
    The function being decorated *must* be called with keyword args for `output_arg`.
    The decorated function receives the temporary path as a `str`.
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # The output path can only be swapped out if it was given by keyword
            if output_arg not in kwargs:
                # NOTE: the exception type is kept as-is so existing callers
                # that catch FileExistsError continue to work.
                raise FileExistsError(
                    f"Argument {output_arg} not found in function {func.__name__}:"
                    f" {kwargs}"
                )

            final_path = Path(kwargs[output_arg])
            # Make sure the desired final output doesn't already exist
            _raise_if_exists(final_path, is_dir=is_dir)
            # None means that tempfile will use its default location (e.g. /tmp)
            tmp_dir = final_path.parent if not use_tmp else None

            # Make the temp name start the same as the desired output
            prefix = final_path.name
            if is_dir:
                temp_path = tempfile.mkdtemp(dir=tmp_dir, prefix=prefix)
            else:
                fd, temp_path = tempfile.mkstemp(dir=tmp_dir, prefix=prefix)
                # mkstemp hands back an open OS-level descriptor; the decorated
                # function opens the path itself, so close ours to avoid
                # leaking one descriptor per call.
                os.close(fd)
            logger.debug("Writing to temp file %s instead of %s", temp_path, final_path)

            try:
                # Point the decorated function at the temporary location
                kwargs[output_arg] = temp_path
                result = func(*args, **kwargs)
                # Only reached on success: publish the finished output
                logger.debug("Moving %s to %s", temp_path, final_path)
                shutil.move(temp_path, final_path)

                return result
            finally:
                # After a successful move the temp path is already gone, so
                # both cleanups below tolerate a missing path.
                logger.debug("Cleaning up temp file %s", temp_path)
                if is_dir:
                    shutil.rmtree(temp_path, ignore_errors=True)
                else:
                    Path(temp_path).unlink(missing_ok=True)

        return wrapper

    return decorator


def _raise_if_exists(final_path: Path, is_dir: bool):
if final_path.exists():
err_msg = f"{final_path} already exists"
if is_dir and final_path.is_dir():
try:
final_path.rmdir()
except OSError as e:
if "Directory not empty" not in e.args[0]:
raise e
else:
raise FileExistsError(err_msg)
else:
raise FileExistsError(err_msg)
97 changes: 97 additions & 0 deletions tests/test_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import threading
import time
from pathlib import Path

from dolphin._decorators import atomic_output


def _long_write(filename, pause: float = 0.2):
"""Simulate a long writing process"""
f = open(filename, "w")
f.write("aaa\n")
time.sleep(pause)
f.write("bbb\n")
f.close()


@atomic_output(output_arg="outname")
def default_write_newname(outname="out.txt"):
    """Slowly write the sample contents to `outname` via the atomic wrapper.

    The temporary file is created next to the final output (`use_tmp` is
    left at its default of `False`).
    """
    _long_write(filename=outname)


@atomic_output(output_arg="outname", use_tmp=True)
def default_write_newname_tmp(outname="out.txt"):
    """Slowly write the sample contents to `outname` via the atomic wrapper.

    With `use_tmp=True` the temporary file is created in the `tempfile`
    module's default directory instead of beside the final output.
    """
    _long_write(filename=outname)


@atomic_output(output_arg="output_dir", is_dir=True)
def default_write_dir(output_dir="some_dir", filename="testfile.txt"):
    """Slowly write `filename` inside `output_dir`, with the directory made atomic.

    The whole directory is the temporary object here (`is_dir=True`), created
    next to the final location.
    """
    target_dir = Path(output_dir)
    # The directory normally exists already; tolerate that
    target_dir.mkdir(parents=True, exist_ok=True)
    _long_write(target_dir / filename)


@atomic_output(output_arg="output_dir", is_dir=True, use_tmp=True)
def default_write_dir_tmp(output_dir="some_dir", filename="testfile.txt"):
    """Slowly write `filename` inside `output_dir`, with the directory made atomic.

    Same as the non-tmp variant, except the temporary directory lives in the
    `tempfile` module's default location (`use_tmp=True`).
    """
    target_dir = Path(output_dir)
    # The directory normally exists already; tolerate that
    target_dir.mkdir(parents=True, exist_ok=True)
    _long_write(target_dir / filename)


def test_atomic_output(tmpdir):
    """Two consecutive atomic file writes each leave their final file behind."""
    expected_names = ["out1.txt", "out2.txt"]
    with tmpdir.as_cwd():
        default_write_newname(outname=expected_names[0])
        default_write_newname(outname=expected_names[1])
        for name in expected_names:
            assert Path(name).exists()


def test_atomic_output_tmp(tmpdir):
    """An atomic file write staged with `use_tmp=True` ends up at the final name."""
    final_name = "out1.txt"
    with tmpdir.as_cwd():
        default_write_newname_tmp(outname=final_name)
        assert Path(final_name).exists()


def test_atomic_output_dir(tmp_path):
    """An atomic directory write into a pre-existing empty directory succeeds."""
    target_dir = tmp_path / "out"
    # A pre-existing *empty* output directory is allowed by the decorator
    target_dir.mkdir()
    name = "testfile.txt"
    default_write_dir(output_dir=target_dir, filename=name)
    assert (target_dir / name).exists()


def test_atomic_output_dir_tmp(tmp_path):
    """An atomic directory write staged with `use_tmp=True` ends up at the final path."""
    out_dir = tmp_path / "out"
    filename = "testfile.txt"
    # A pre-existing *empty* output directory is allowed by the decorator
    out_dir.mkdir()
    # Use the `use_tmp=True` variant; calling `default_write_dir` here would
    # just duplicate `test_atomic_output_dir`.
    default_write_dir_tmp(output_dir=out_dir, filename=filename)
    assert Path(out_dir / filename).exists()


def test_atomic_output_name_swap_file(tmpdir):
    """The final file name only appears once the background write has finished."""
    final_name = "out3.txt"
    with tmpdir.as_cwd():
        writer = threading.Thread(
            target=default_write_newname,
            kwargs={"outname": final_name},
        )
        writer.start()
        # Mid-write, the output must not yet exist under its final name
        assert not Path(final_name).exists()
        time.sleep(0.5)
        writer.join()
        assert Path(final_name).exists()


def test_atomic_output_dir_swap(tmp_path):
    """The file inside the final directory only appears after the write finishes.

    The writer runs in a background thread so the final location can be
    inspected while the write is still in progress.
    """
    target_dir = tmp_path / "out"
    target_dir.mkdir()
    expected_file = target_dir / "testfile.txt"

    writer = threading.Thread(
        target=default_write_dir,
        kwargs={"output_dir": target_dir},
    )
    writer.start()
    # Mid-write, nothing should be visible at the final location
    assert not Path(expected_file).exists()
    time.sleep(0.5)
    writer.join()
    assert Path(expected_file).exists()