Blocks (CAR) of firehose commits could be missed #186

Open
emilyhunt opened this issue Nov 10, 2023 · 16 comments
@emilyhunt

As of today, I switched the astronomy feeds' FirehoseSubscribeReposClient to the new BGS at wss://bsky.network/xrpc. I am getting intermittent errors when atproto.CAR.from_bytes is called on certain commits. This happens roughly once every 10 minutes, so only a very small fraction of posts can be responsible; otherwise, the feed is running fine.

This is on Ubuntu 20.04, with Python 3.11.4 and atproto 0.0.30. This is the commit in the GitHub repo where issues started happening.

Stack trace of the error:

[firehose] [2023-11-10 16:43:23] thread '<unnamed>' panicked at src/lib.rs:164:67:
[firehose] [2023-11-10 16:43:23] called `Result::unwrap()` on an `Err` value: Parsing("failed to parse uvarint for header")
[firehose] [2023-11-10 16:43:23] note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
[firehose] [2023-11-10 16:43:23] Process Process-1:
[firehose] [2023-11-10 16:43:23] Traceback (most recent call last):
[firehose] [2023-11-10 16:43:23]   File "/workspace/.heroku/python/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
[firehose] [2023-11-10 16:43:23]     self.run()
[firehose] [2023-11-10 16:43:23]   File "/workspace/.heroku/python/lib/python3.11/multiprocessing/process.py", line 108, in run
[firehose] [2023-11-10 16:43:23]     self._target(*self._args, **self._kwargs)
[firehose] [2023-11-10 16:43:23]   File "/workspace/server/firehose.py", line 99, in worker_main
[firehose] [2023-11-10 16:43:23]     _worker_loop(receiver)
[firehose] [2023-11-10 16:43:23]   File "/workspace/server/firehose.py", line 85, in _worker_loop
[firehose] [2023-11-10 16:43:23]     operations_callback(_get_ops_by_type(commit))
[firehose] [2023-11-10 16:43:23]                         ^^^^^^^^^^^^^^^^^^^^^^^^
[firehose] [2023-11-10 16:43:23]   File "/workspace/server/firehose.py", line 30, in _get_ops_by_type
[firehose] [2023-11-10 16:43:23]     car = CAR.from_bytes(commit.blocks)
[firehose] [2023-11-10 16:43:23]           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[firehose] [2023-11-10 16:43:23]   File "/workspace/.heroku/python/lib/python3.11/site-packages/atproto/car/__init__.py", line 51, in from_bytes
[firehose] [2023-11-10 16:43:23]     header, blocks = libipld.decode_car(data)
[firehose] [2023-11-10 16:43:23]                      ^^^^^^^^^^^^^^^^^^^^^^^^
[firehose] [2023-11-10 16:43:23] pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: Parsing("failed to parse uvarint for header")

I've added more logging on the feed and set RUST_BACKTRACE=1. Will update this issue if I can work out which commits are causing the problem.

@emilyhunt
Author

Full stack trace with RUST_BACKTRACE=1, in case something here helps:

thread '<unnamed>' panicked at src/lib.rs:164:67:
called `Result::unwrap()` on an `Err` value: Parsing("failed to parse uvarint for header")
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/std/src/panicking.rs:595:5
   1: core::panicking::panic_fmt
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/panicking.rs:67:14
   2: core::result::unwrap_failed
             at /rustc/cc66ad468955717ab92600c770da8c1601a4ff33/library/core/src/result.rs:1652:5
   3: libipld::_::__pyfunction_decode_car
   4: pyo3::impl_::trampoline::trampoline
   5: libipld::_::<impl libipld::decode_car::MakeDef>::DEF::trampoline
   6: _PyEval_EvalFrameDefault
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:5096:29
   7: _PyEval_EvalFrame
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/./Include/internal/pycore_ceval.h:73:16
   8: _PyEval_Vector
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:6439:24
   9: _PyVectorcall_Call
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/call.c:245:16
  10: _PyObject_Call
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/call.c:328:16
  11: do_call_core
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:7357:12
  12: _PyEval_EvalFrameDefault
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:5381:22
  13: _PyEval_EvalFrame
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/./Include/internal/pycore_ceval.h:73:16
  14: _PyEval_Vector
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:6439:24
  15: _PyObject_FastCallDictTstate
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/call.c:141:15
  16: _PyObject_Call_Prepend
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/call.c:482:24
  17: slot_tp_init
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/typeobject.c:7863:15
  18: type_call
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/typeobject.c:1112:19
  19: _PyObject_MakeTpCall
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Objects/call.c:214:18
  20: _PyEval_EvalFrameDefault
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:4774:23
  21: _PyEval_EvalFrame
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/./Include/internal/pycore_ceval.h:73:16
  22: _PyEval_Vector
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:6439:24
  23: PyEval_EvalCode
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/ceval.c:1154:21
  24: run_eval_code_obj
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/pythonrun.c:1714:9
  25: run_mod
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/pythonrun.c:1735:19
  26: PyRun_StringFlags
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/pythonrun.c:1605:15
  27: PyRun_SimpleStringFlags
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Python/pythonrun.c:487:9
  28: pymain_run_command
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Modules/main.c:255:11
  29: pymain_run_python
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Modules/main.c:592:21
  30: Py_RunMain
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Modules/main.c:680:5
  31: pymain_main
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Modules/main.c:710:12
  32: Py_BytesMain
             at /tmp/python-build.20230707110648.1880/Python-3.11.4/Modules/main.c:734:12
  33: __libc_start_main
  34: _start

@MarshalX
Owner

Let's isolate from the feed-generator and try to reproduce it with this example: https://github.com/MarshalX/atproto/blob/main/examples/firehose/process_commits.py

I changed line 90 from this:

client = FirehoseSubscribeReposClient(params)

to this:

client = FirehoseSubscribeReposClient(params, 'wss://bsky.network/xrpc')

and ran it locally on a Mac; the error doesn't occur, at least at startup. Could you try it too?

@MarshalX
Owner

MarshalX commented Nov 10, 2023

The error originates in the iroh-car library:

https://github.com/n0-computer/iroh-car/blob/bf4ce988a120d8b74de73421f8fc6c7e10479c18/src/reader.rs#L24-L41

But the main question, for now, is what value of commit.blocks is being passed to CAR.from_bytes. Could you please log it?

@MarshalX
Owner

The example has been running fine for more than 20 minutes, but I was able to reproduce the error by hand. Please double-check whether your code differs from the example I mentioned above. The error happens when an empty byte string is passed for decoding:

from atproto import CAR
CAR.from_bytes(b"")
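For context on the error message: a CAR file begins with an unsigned varint (LEB128) giving the header length, so an empty byte string fails before any header can be read. Below is a minimal sketch of that first parsing step. It assumes nothing about how iroh-car is actually implemented, and `read_uvarint` is a hypothetical helper written only for illustration.

```python
def read_uvarint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """Decode one unsigned LEB128 varint starting at `offset`.

    Returns (value, next_offset). Raises ValueError if the input ends
    before a terminating byte (high bit clear) is seen, which is what
    happens immediately for an empty byte string.
    """
    value = 0
    shift = 0
    for index in range(offset, len(data)):
        byte = data[index]
        value |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if byte & 0x80 == 0:             # high bit clear: last byte
            return value, index + 1
        shift += 7
    raise ValueError("failed to parse uvarint: input ended early")
```

With `b""` the loop body never runs, so the function goes straight to the error, which mirrors the "failed to parse uvarint for header" message above.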

@emilyhunt
Author

I tried running the firehose for a bit and yep, it looks like an empty binary string. These are the arguments in the commit that broke it:

commit.repo: 'did:plc:7ccq5bfdp3jk4acvcnnevds4'
commit.ops: []  # i.e. an empty list
commit.blocks: b''  # empty binary string - same as you

I get the same results when calling CAR.from_bytes(b'').

@MarshalX
Owner

My firehose example is still running fine. I don't know what kind of commit you are receiving :(

@emilyhunt
Author

That's pretty strange!

My code is a little different for handling the firehose. I removed saving the cursor, and it certainly doesn't continually update the client's params (this was back before we had a wonderful rust library to speed up the python sdk.) Could that be the issue?

There's also only one worker process (because of AWS limitations, I can't use multiprocessing.Queue), and commits are passed from the main process to the worker with a multiprocessing.Pipe object instead. I'd be surprised if that's the issue, though, as I bet Queue uses pipes to communicate between processes too...

I'll try running the minimal example linked above (process_commits.py) for a while and see if I can reproduce the issue on my machine. I at least now have a bugfix that stops CAR.from_bytes() getting called on empty strings, so my feed is working for now =)

@emilyhunt
Author

It didn't crash after 150 minutes, so I assume the issue has something to do with my specific code (maybe the cursor stuff? not sure.)

Either way, thanks for the help! ^^

@AurelWu

AurelWu commented Nov 15, 2023

We stumbled upon the same error and for now work around it by just catching the panic (which raises a BaseException and not a normal Exception). This way the malformed message just gets ignored, and so far it has been running stably for 3 days.

try:
    commit = parse_subscribe_repos_message(message)
    # we need to be sure that it's commit message with .blocks inside
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    repo = commit.repo

    car = CAR.from_bytes(commit.blocks)
except Exception as e:
    print(f"Error parsing Commit\n{e}")
    return
except BaseException as e:
    print(f"Error parsing Commit\n{e}")
    return
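The snippet above only helps because of its second clause: the panic surfaces as a BaseException subclass, so `except Exception` alone would not catch it. A minimal sketch of that behaviour follows, with KeyboardInterrupt and SystemExit re-raised so the process can still be stopped. `FakePanic`, `fake_decode`, and `decode_or_skip` are hypothetical stand-ins for illustration, not part of atproto or pyo3.

```python
from typing import Optional


class FakePanic(BaseException):
    """Hypothetical stand-in for pyo3_runtime.PanicException."""


def fake_decode(data: bytes) -> str:
    """Hypothetical decoder that 'panics' on empty input."""
    if not data:
        raise FakePanic("failed to parse uvarint for header")
    return "decoded"


def decode_or_skip(data: bytes) -> Optional[str]:
    """Return the decoded value, or None if decoding failed."""
    try:
        return fake_decode(data)
    except (KeyboardInterrupt, SystemExit):
        # Never swallow shutdown signals.
        raise
    except BaseException as exc:
        # Covers ordinary exceptions and the panic-style BaseException.
        print(f"Error parsing commit: {exc!r}")
        return None
```

Catching BaseException this broadly still hides unrelated failures, so checking for empty input before decoding is usually the narrower fix.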

@MarshalX
Owner

MarshalX commented Nov 15, 2023

We stumbled upon the same error and for now work around it by just catching the panic (which raises a BaseException and not a normal Exception). This way the malformed message just gets ignored, and so far it has been running stably for 3 days.

try:
    commit = parse_subscribe_repos_message(message)
    # we need to be sure that it's commit message with .blocks inside
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return

    repo = commit.repo

    car = CAR.from_bytes(commit.blocks)
except Exception as e:
    print(f"Error parsing Commit\n{e}")
    return
except BaseException as e:
    print(f"Error parsing Commit\n{e}")
    return

Could you try to replace try-catch with "if not commit.blocks: return" pls

@emilyhunt
Author

Could you try to replace try-catch with "if not commit.blocks: return" pls

This is almost exactly what I've been doing (and the feed is stable). It may be a bit safer than having a catch-all for every kind of exception:

if commit.blocks == b'' or len(commit.ops) == 0:
    return operation_by_type

if not commit.blocks: return should work too (actually, that's neater, and I might switch to that...)
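As a rough illustration of the guard discussed above, here is a self-contained sketch that uses a hypothetical `FakeCommit` dataclass in place of the real commit model, with only the fields mentioned in this thread. `has_blocks` is likewise a hypothetical helper name.

```python
from dataclasses import dataclass, field


@dataclass
class FakeCommit:
    """Hypothetical stand-in for the commit model (fields from this thread only)."""
    repo: str
    ops: list = field(default_factory=list)
    blocks: bytes = b""


def has_blocks(commit: FakeCommit) -> bool:
    """True only when there are bytes that could be handed to a CAR decoder."""
    return bool(commit.blocks)


# The commit reported earlier in this thread: no ops and empty blocks.
broken = FakeCommit(repo="did:plc:7ccq5bfdp3jk4acvcnnevds4")
if not has_blocks(broken):
    print("skipping commit with empty blocks")
```

In real code the same check would sit directly before the call to CAR.from_bytes, so the decoder never sees an empty byte string.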

@thieflord06

@emilyhunt Did you have to change anything to not do multiprocessing? I think I may have that same limitation and my firehose has not been acting right in production.

@emilyhunt
Author

@emilyhunt Did you have to change anything to not do multiprocessing? I think I may have that same limitation and my firehose has not been acting right in production.

Do you mean not using a multiprocessing.Queue so that the firehose can run on AWS? If so, then yes: here is the current firehose code I'm running, which just uses a single worker process and communicates between the main process and the worker with a multiprocessing.Pipe. This StackOverflow post has more context on this. As it stands, I don't think the default Python feed generator project can run on AWS at all; the code in firehose.py has to be modified. (It may be worth opening a new issue/discussion to chat about this further, or to suggest a fix.)
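For readers without access to the linked code, a minimal sketch of the single-worker Pipe pattern described above might look like this. It is not the actual feed code; `worker_loop`, `start_worker`, and the `STOP` sentinel are hypothetical names chosen for illustration.

```python
import multiprocessing
from multiprocessing.connection import Connection
from typing import Callable

STOP = None  # sentinel telling the worker to exit its loop


def worker_loop(receiver: Connection, handle: Callable[[bytes], None]) -> None:
    """Read messages from the pipe and process them until STOP arrives."""
    while True:
        message = receiver.recv()
        if message is STOP:
            break
        handle(message)


def start_worker(handle: Callable[[bytes], None]):
    """Start one worker process fed by a one-way pipe.

    Returns (process, sender). `handle` must be picklable (for example a
    module-level function) when the 'spawn' start method is in use.
    """
    receiver, sender = multiprocessing.Pipe(duplex=False)
    process = multiprocessing.Process(
        target=worker_loop, args=(receiver, handle), daemon=True
    )
    process.start()
    return process, sender
```

The main process would call `sender.send(message)` for each firehose message and `sender.send(STOP)` on shutdown. On platforms that use the 'spawn' start method, `start_worker` should be called from under an `if __name__ == "__main__":` guard.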

@MarshalX
Owner

MarshalX commented Dec 2, 2023

Please add an if statement like the one I added in the updated firehose example. The feed generator repo has been updated too. I am closing this, thanks. 42b74d4

@MarshalX MarshalX closed this as completed Dec 2, 2023
@MarshalX
Owner

Hello from 2024. I am not satisfied with this at all. It is getting worse, so I'm going to reopen it. We need to find the real root of the problem and do something about it. I've started investigating and found something here: bluesky-social/atproto#2893

@MarshalX MarshalX reopened this Oct 19, 2024
@MarshalX MarshalX changed the title from "Intermittent error on atproto.CAR.from_bytes when using new BGS firehose: "failed to parse uvarint for header"" to "Blocks (CAR) of firehose commits could be missed" Oct 19, 2024
@MarshalX
Owner

One step closer. My PR has been merged. The investigation continues here: bluesky-social/indigo#780
