Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dispatch aws plugin): adds support for decompressing signals #5523

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

mvilanova
Copy link
Contributor

Summary of Changes in plugin.py

Imports

  • Added imports for base64 and gzip.
  • Reordered imports for better organization.

New Function

  • Added a new function decompress_json to handle decompression of base64 encoded gzipped JSON strings.
def decompress_json(compressed_str: str) -> str:
    """Decompress a base64 encoded gzipped JSON string."""
    decoded = base64.b64decode(compressed_str)
    decompressed = gzip.decompress(decoded)
    return decompressed.decode("utf-8")

Class AWSSQSSignalConsumerPlugin

  • Updated the description attribute for better readability.
  • Added logic to handle decompression of messages if they are marked as compressed with gzip.
  • Improved logging messages for better clarity and consistency.

Detailed Changes

  • Description Update: Changed description from "Uses sqs to consume signals" to "Uses SQS to consume signals."
  • Message Handling: Added logic to check for a compressed attribute in MessageAttributes and decompress the message body if necessary.
  • Logging Improvements: Enhanced log messages to be more descriptive and consistent.
if message_attributes.get("compressed", {}).get("StringValue") == "gzip":
    # Message is compressed, decompress it
    message_body = decompress_json(message_body)
  • Error Handling: Improved error messages for ValidationError, IntegrityError, and general exceptions to provide more context.
log.warning(
    f"Received a signal instance that does not conform to the `SignalInstanceCreate` structure. Skipping creation: {e}"
)
log.info(
    f"Received a signal instance that already exists in the database. Skipping creation: {signal_instance_in.raw['id']}"
)
log.exception(
    f"Encountered an Integrity error when trying to create a signal instance: {e}"
)
log.exception(f"Unable to create signal instance: {e}")
  • Signal Logging: Updated log message for received signals to include both the signal name and ID.
log.debug(
    f"Received a signal with name {signal_instance.signal.name} and id {signal_instance.signal.id}"
)

These changes enhance the functionality of the AWSSQSSignalConsumerPlugin by adding support for compressed messages and improving the clarity and consistency of log messages.

@mvilanova mvilanova added the enhancement New feature or request label Nov 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants