msgpack errors when using iter() with intervals between each batch call #121

Open
BurnzZ opened this issue May 4, 2019 · 11 comments

@BurnzZ
Member

BurnzZ commented May 4, 2019

Good Day!

I've encountered this peculiar issue when trying to save memory by processing the items in chunks. Here's a stripped-down version of the code that reproduces the issue:

import pandas as pd

from scrapinghub import ScrapinghubClient

def read_job_items_by_chunk(jobkey, chunk=10000):
    """In order to prevent OOM issues, the jobs' data must be read in
    chunks.

    This will return a generator of pandas DataFrames.
    """

    client = ScrapinghubClient("APIKEY123")

    item_generator = client.get_job(jobkey).items.iter()

    while item_generator:
        yield pd.DataFrame(
            [next(item_generator) for _ in range(chunk)]
        )

for df_chunk in read_job_items_by_chunk('123/123/123'):
    # a small chunk size like 10000 doesn't cause any problems
    pass

for df_chunk in read_job_items_by_chunk('123/123/123', chunk=25000):
    # a big chunk size like 25000 throws errors like the one below
    pass
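
A side note on the reproduction code, separate from the msgpack errors: `while item_generator` never becomes false, and the `next()` call raises `StopIteration` once the items run out. On Python 2.7 that ends the generator silently and drops the last partial chunk; on Python 3.7+ it surfaces as a `RuntimeError` (PEP 479). A minimal sketch of a chunking loop that ends cleanly, assuming any plain iterator as input; `iter_chunks` and `fake_items` are illustrative names, not part of python-scrapinghub:

```python
from itertools import islice


def iter_chunks(iterable, chunk=10000):
    """Yield lists of up to `chunk` items until `iterable` is exhausted."""
    iterator = iter(iterable)
    while True:
        # islice stops quietly at the end of the data instead of raising.
        batch = list(islice(iterator, chunk))
        if not batch:
            return
        yield batch


# Stand-in for client.get_job(jobkey).items.iter() (assumption for the demo).
fake_items = ({"id": i} for i in range(7))
chunks = list(iter_chunks(fake_items, chunk=3))
print([len(c) for c in chunks])  # [3, 3, 1]
```

Each batch could then be wrapped in `pd.DataFrame(batch)` as in the original function.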

Here's the common error it throws:

<omitted stack trace above>

    [next(item_generator) for _ in range(chunk)]
  File "/usr/local/lib/python2.7/site-packages/scrapinghub/client/proxy.py", line 115, in iter
    _path, requests_params, **apiparams
  File "/usr/local/lib/python2.7/site-packages/scrapinghub/hubstorage/serialization.py", line 33, in mpdecode
    for obj in unpacker:
  File "msgpack/_unpacker.pyx", line 459, in msgpack._unpacker.Unpacker.__next__ (msgpack/_unpacker.cpp:459)
  File "msgpack/_unpacker.pyx", line 390, in msgpack._unpacker.Unpacker._unpack (msgpack/_unpacker.cpp:390)
  File "/usr/local/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 67: invalid start byte

Moreover, it throws a different error when using a much bigger chunk size, like 50000:

<omitted stack trace above>

    [next(item_generator) for _ in range(chunk)]
  File "/usr/local/lib/python2.7/site-packages/scrapinghub/client/proxy.py", line 115, in iter
    _path, requests_params, **apiparams
  File "/usr/local/lib/python2.7/site-packages/scrapinghub/hubstorage/serialization.py", line 33, in mpdecode
    for obj in unpacker:
  File "msgpack/_unpacker.pyx", line 459, in msgpack._unpacker.Unpacker.__next__ (msgpack/_unpacker.cpp:459)
  File "msgpack/_unpacker.pyx", line 390, in msgpack._unpacker.Unpacker._unpack (msgpack/_unpacker.cpp:390)
TypeError: unhashable type: 'dict'

I find that the workaround/solution for this would be to have a lower value for chunk. So far, 1000 works great.

This uses scrapy:1.5 stack in Scrapy Cloud.

I'm guessing this might have something to do with the long waiting time while each pandas DataFrame chunk is processed: by the time the next batch of items is iterated, the server might have deallocated the pointer to it or something.

May I ask if there might be a solution for this? A much bigger chunk size would help with the speed of our jobs.

I've marked it as a bug for now, as this is quite unexpected/undocumented behavior.

Cheers!

@BurnzZ BurnzZ changed the title from "Msgpack errors" to "msgpack errors when using iter() with intervals between each batch call" May 4, 2019
@BurnzZ BurnzZ added the bug label May 4, 2019
@vshlapakov
Contributor

Hey Kevin, thanks for the report! Quick question: have you tried the latest version, 2.1.1? Versions before it had a hidden bug that could lead to wrong iteration behavior.

@vshlapakov
Contributor

If yes, there's something else you could try. In the current implementation you try to iterate through all items at once, and assuming the amount of items is huge and processing data takes time - it's possible to reach some timeout. But as you know the desired chunk size beforehand and it's large enough, you could send a request per chunk using pagination parameters and handle the data with pandas as long as you need, something like this:

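def read_job_items_by_chunk(job, chunk=10000):
    processed = 0
    while True:
        # Item keys look like '<job key>/<item index>', so this is the
        # key of the first item that has not been read yet.
        next_key = job.key + '/' + str(processed)
        # One request per chunk; list() consumes the response right away.
        items = list(job.items.iter(count=chunk, start=next_key, meta='_key'))
        yield items
        processed += len(items)
        # A short chunk means the job has no more items.
        if len(items) < chunk:
            break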
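
To see how the start-key pagination above terminates, here is a small self-contained sketch that runs the same loop against a stub. `FakeItems` is a hypothetical stand-in written for this example and only mimics the `count`/`start`/`meta` parameters used in the snippet; it is not the scrapinghub API. As a variation, this version skips an empty final chunk, which the loop above would yield when the item count is an exact multiple of the chunk size.

```python
class FakeItems:
    """Hypothetical stand-in for job.items; not the scrapinghub API."""

    def __init__(self, job_key, total):
        self.job_key = job_key
        self.total = total

    def iter(self, count, start, meta=None):
        # The start key looks like '<job key>/<item index>'.
        offset = int(start.rsplit('/', 1)[1])
        for index in range(offset, min(offset + count, self.total)):
            yield {'_key': '%s/%d' % (self.job_key, index)}


def read_items_by_chunk(items, job_key, chunk):
    processed = 0
    while True:
        start = '%s/%d' % (job_key, processed)
        batch = list(items.iter(count=chunk, start=start, meta='_key'))
        if batch:
            # Variation: do not hand an empty chunk to the caller.
            yield batch
        processed += len(batch)
        if len(batch) < chunk:
            break


items = FakeItems('123/123/123', total=6)
sizes = [len(b) for b in read_items_by_chunk(items, '123/123/123', chunk=3)]
print(sizes)  # [3, 3]
```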

@BurnzZ
Member Author

BurnzZ commented May 19, 2019

Thanks @vshlapakov! The project was using 2.0.3. I'll try to use 2.1.1 to confirm if it indeed fixes up the wrong iteration behavior. I should have some results to verify in some weeks.

Otherwise, I'll try the pagination suggestion you've introduced. Cheers!

@BurnzZ
Member Author

BurnzZ commented Jun 10, 2019

Hi @vshlapakov, reporting in that version 2.1.1 still has this problem in it. I'll now proceed in trying out the pagination behavior you've suggested :)

@vshlapakov
Contributor

Gotcha, thanks for the update! Let me know when you test the approach 👍

@BurnzZ
Member Author

BurnzZ commented Oct 4, 2019

Hi @vshlapakov, I've made a PR in #133 from your suggestion. I think having this convenient method would be really helpful in cases where we're processing a large number of items.

@manycoding, I see that this might also be of use to arche from your issue in scrapinghub/arche#140.

Thanks!

@hermit-crab
Contributor

hermit-crab commented Nov 23, 2019

Would it not be nicer to have it as a default (at some point) behavior in normal .iter()? I imagine people will start using it for no other reason than to avoid connection issues, and everyone ends up with a useless nesting level in their code. Backward compatible alternative:

def iter(..., buffer: Optional[int] = None, in_chunks: bool = False):
    if not buffer:
        ... # proceed as usual
        return

    for chunk in self._list_iter(...):
        if in_chunks:
            # for those actually needing chunks
            yield chunk
        else:
            yield from chunk

This also addresses #135 as it would implicitly enable pagination. Never mind this comment, I've confused the resource being iterated in there.

@BurnzZ
Member Author

BurnzZ commented Nov 30, 2019

I do get where you're coming from @hermit-crab, though I think iter() and iter_by_chunks() will have a different structure of items being returned. iter() will be a generator of items in this case whilst iter_by_chunks() is a generator of list of items.

@hermit-crab
Contributor

hermit-crab commented Nov 30, 2019

Thank you for the reply @BurnzZ. Yes, I understand they will generate different structures. To elaborate on what I mean: I believe this solution creates a situation where you have 2 distinct methods that do roughly the same thing (retrieving a resource of a job), with one of them being clearly preferable over the other despite its slightly different and less commonly needed output format. At that point why would you ever use iter()? If the docs for iter() say that, in case your data is large (TODO define large), you should see iter_by_chunks() to avoid memory and connection issues, then the most reasonable thing for a user to do is to use that method right off the bat, to avoid dealing with issues later when their data may grow. Even if that means an extra nesting level that they won't need.

For instance on the issue mentioned above scrapinghub/shub-workflow#5, the solution will end up being something like in here:

    def _process_job_items(self, scrapername, spider_job):
        first_keyprefix = None
        items_gen = (item for chunk in spider_job.items.iter_in_chunks() for item in chunk)
        # or any other variation of flattening a list
        for item in items_gen:

While I think it would be nicer to just provide a flag to spider_job.items.iter() to enable a chunk read or for it to be enabled implicitly via a flag to the sh client on creation.

That would be similar in style to how pandas does it, allowing io/memory-efficient reads with the chunksize parameter on their read_*() methods. For pandas tho they don't have a choice but to start generating chunks, since DataFrame/Series are more or less immutable objects that you cannot easily stream. In the case of iter() we can abstract the buffer away and provide a flag for cases where it's really needed.

@vshlapakov
Contributor

vshlapakov commented Dec 17, 2019

@hermit-crab That makes sense to me, the methods are very close to each other, the only major difference is the output format, while the list_iter "pagination" logic would be preferable in a default case in the long term. At the same time, I would try avoiding the situation when the public iter method returns a different result type depending on input, but it should be solvable via a common internal helper serving both iter and list_iter.
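
The "common internal helper" idea could look roughly like the sketch below. Everything in it is hypothetical and written only to illustrate the design: `ItemsSketch`, `_fetch_page` and `_iter_pages` are made-up names, and `_fetch_page` stands in for one paginated request. The point is that each public method keeps a single return type while both share the pagination logic.

```python
class ItemsSketch:
    """Hypothetical resource; `_fetch_page` stands in for one paginated request."""

    def __init__(self, data):
        self._data = list(data)

    def _fetch_page(self, start, count):
        return self._data[start:start + count]

    def _iter_pages(self, chunksize):
        # Shared internal helper: one request per page.
        start = 0
        while True:
            page = self._fetch_page(start, chunksize)
            if not page:
                return
            yield page
            if len(page) < chunksize:
                return
            start += len(page)

    def iter(self, chunksize=1000):
        # Always yields single items, paginating behind the scenes.
        for page in self._iter_pages(chunksize):
            for item in page:
                yield item

    def list_iter(self, chunksize=1000):
        # Always yields lists of items.
        return self._iter_pages(chunksize)


resource = ItemsSketch(range(5))
flat = list(resource.iter(chunksize=2))
pages = list(resource.list_iter(chunksize=2))
print(flat)   # [0, 1, 2, 3, 4]
print(pages)  # [[0, 1], [2, 3], [4]]
```

With this split, callers who only want items never see the extra nesting level, and callers who need chunks ask for them explicitly.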

I'm going to close this issue as the original problem is solved, but I'm looking forward to improving it if possible, once we agree on the implementation.

@theumairahmed

theumairahmed commented Aug 15, 2023

Hi @BurnzZ, @hermit-crab, @vshlapakov, @Gallaecio,

I have observed a situation (rare and random) where using iter() with even a low value of count=1000 is causing msgpack errors in this job: https://app.zyte.com/p/435191/873/1150/log?line=35838.

You can see in the urllib3 debug log that it's making the following API call in the backend:
[stderr] 2023-08-13 13:18:29 [urllib3.connectionpool] DEBUG: https://storage.scrapinghub.com:443 "GET /items/435191/897/44?start=435191%2F897%2F44%2F35697000&count=1000&meta=_key HTTP/1.1" 200 None.

It's worth mentioning that even though count=1000, the start value (435191/897/44/35697000) is huge, as the job is processing around 43M items in chunks of 1000. Would converting the iterator to a list help solve this issue? Let me know if I should open a new issue for this or if you need more input from me. Thanks.

@theumairahmed theumairahmed reopened this Aug 16, 2023