Skip to content

Commit

Permalink
add 2m delay to receive tasks for deletes
Browse files Browse the repository at this point in the history
for #1361
  • Loading branch information
snarfed committed Nov 29, 2024
1 parent 5d6ef85 commit 20dc814
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 7 deletions.
5 changes: 3 additions & 2 deletions activitypub.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)
from ids import BOT_ACTOR_AP_IDS
from models import fetch_objects, Follower, Object, PROTOCOLS, User
from protocol import activity_id_memcache_key, Protocol
from protocol import activity_id_memcache_key, DELETE_TASK_DELAY, Protocol
import webfinger

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1138,9 +1138,10 @@ def inbox(protocol=None, id=None):
if user and not user.existing:
logger.info(f'Automatically enabled AP server actor {actor_id} for {user.enabled_protocols}')

delay = DELETE_TASK_DELAY if type in ('Delete', 'Undo') else None
return create_task(queue='receive', id=id, as2=activity,
source_protocol=ActivityPub.LABEL, authed_as=authed_as,
received_at=util.now().isoformat())
received_at=util.now().isoformat(), delay=delay)


# protocol in subdomain
Expand Down
5 changes: 4 additions & 1 deletion atproto_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
USER_AGENT,
)
from models import Object, reset_protocol_properties
from protocol import DELETE_TASK_DELAY
from web import Web

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -330,9 +331,11 @@ def _handle(op):
logger.error(f'Unknown action {action} for {op.repo} {op.path}')
return

delay = DELETE_TASK_DELAY if op.action == 'delete' else None
try:
create_task(queue='receive', id=obj_id, source_protocol=ATProto.LABEL,
authed_as=op.repo, received_at=op.time, **record_kwarg)
authed_as=op.repo, received_at=op.time, delay=delay,
**record_kwarg)
# when running locally, comment out above and uncomment this
# logger.info(f'enqueuing receive task for {at_uri}')
except ContextError:
Expand Down
1 change: 1 addition & 0 deletions protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
)

OBJECT_REFRESH_AGE = timedelta(days=30)
DELETE_TASK_DELAY = timedelta(minutes=2)

# require a follow for users on these domains before we deliver anything from
# them other than their profile
Expand Down
18 changes: 16 additions & 2 deletions tests/test_activitypub.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from flask_app import app
from models import Follower, Object, Target
import protocol
from protocol import DELETE_TASK_DELAY
from web import Web

# have to import module, not attrs, to avoid circular import
Expand Down Expand Up @@ -662,13 +663,26 @@ def test_inbox_actor_id_on_opted_out_web_domain(self, mock_create_task, *_):
def test_inbox_create_receive_task(self, mock_create_task, *mocks):
common.RUN_TASKS_INLINE = False

author = self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
resp = self.post('/ap/sharedInbox', json=NOTE)
self.assert_task(mock_create_task, 'receive', id='http://mas.to/note/as2',
source_protocol='activitypub', as2=NOTE,
authed_as=NOTE['actor'],
authed_as=ACTOR['id'],
received_at='2022-01-02T03:04:05+00:00')

@patch('oauth_dropins.webutil.appengine_config.tasks_client.create_task')
def test_inbox_delete_receive_task(self, mock_create_task, *mocks):
common.RUN_TASKS_INLINE = False

self.make_user(ACTOR['id'], cls=ActivityPub, obj_as2=ACTOR)
resp = self.post('/ap/sharedInbox', json=DELETE)
delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds()
self.assert_task(mock_create_task, 'receive', id=DELETE['id'],
source_protocol='activitypub', as2=DELETE,
authed_as=ACTOR['id'],
received_at='2022-01-02T03:04:05+00:00',
eta_seconds=delayed_eta)

def test_inbox_reply_object(self, mock_head, mock_get, mock_post):
self._test_inbox_reply(REPLY_OBJECT, mock_head, mock_get, mock_post)

Expand Down
9 changes: 7 additions & 2 deletions tests/test_atproto_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import common
from models import Object, Target
import protocol
from protocol import DELETE_TASK_DELAY
from .testutil import TestCase
from .test_atproto import DID_DOC
from web import Web
Expand Down Expand Up @@ -490,9 +491,11 @@ def test_delete_post(self, mock_create_task):
'actor': 'did:plc:user',
'object': obj_id,
}
delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds()
self.assert_task(mock_create_task, 'receive', id=delete_id,
our_as1=expected_as1, source_protocol='atproto',
authed_as='did:plc:user', received_at='1900-02-04')
authed_as='did:plc:user', received_at='1900-02-04',
eta_seconds=delayed_eta)

def test_delete_block_is_undo(self, mock_create_task):
commits.put(Op(repo='did:plc:user', action='delete', seq=789,
Expand All @@ -510,9 +513,11 @@ def test_delete_block_is_undo(self, mock_create_task):
'actor': 'did:plc:user',
'object': obj_id,
}
delayed_eta = util.to_utc_timestamp(NOW) + DELETE_TASK_DELAY.total_seconds()
self.assert_task(mock_create_task, 'receive', id=undo_id,
our_as1=expected_as1, source_protocol='atproto',
authed_as='did:plc:user', received_at='1900-02-04')
authed_as='did:plc:user', received_at='1900-02-04',
eta_seconds=delayed_eta)

def test_unsupported_type(self, mock_create_task):
orig_objs = Object.query().count()
Expand Down

0 comments on commit 20dc814

Please sign in to comment.