Faster INSERT methods #497

Open
JoaoPaes-at-Dynamox opened this issue Sep 26, 2022 · 12 comments
Labels
  • api: bigquery - Issues related to the googleapis/python-bigquery-sqlalchemy API.
  • priority: p3 - Desirable enhancement or fix. May not be included in next release.
  • type: feature request - ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments


JoaoPaes-at-Dynamox commented Sep 26, 2022

Is your feature request related to a problem? Please describe.
I need to work with large numbers of rows in an ETL service and wanted to use the SQLAlchemy ORM backed by the BigQuery dialect to write more Pythonic code, but it is extremely slow to upload more than a few tens of rows.

Describe the solution you'd like
Be able to use ORM-enabled INSERT statements as the SQLAlchemy docs suggest, or at least the soon-to-be-legacy Session.bulk_insert_mappings, with better performance.

Describe alternatives you've considered
I've tried all the methods from the SQLAlchemy performance FAQ, and none was faster than about 900 seconds for as few as 200 rows.

Additional context
While the examples show that methods like bulk_insert_mappings can take around 0.4 seconds for hundreds of thousands of rows, with the BigQuery dialect I've seen 2 h 30 min to upload about 4k rows. I don't expect BigQuery to get anywhere near 0.4 seconds for 100k rows, but something around 5 or 6 minutes does seem possible, since that is roughly the time the BigQuery client from google.cloud.bigquery takes to upload about 90k rows.

@product-auto-label bot added the api: bigquery label on Sep 26, 2022
@JoaoPaes-at-Dynamox
Author

These are the results of a test run using the referenced FAQ:

Python: 3.8.10 (default, Jun 22 2022, 20:18:18)  [GCC 9.4.0]
sqlalchemy v1.4.27 (future=True)

SQLA ORM:
          Total time for 10 records 44.846 secs

SQLA ORM pk given:
          Total time for 10 records 37.176 secs

SQLA ORM-Enabled INSERT:
          Total time for 10 records 42.995 secs

SQLA ORM bulk_save_objects:
          Total time for 10 records 22.663 secs

SQLA ORM bulk_save_objects, return_defaults:
          Total time for 10 records 48.917 secs

SQLA ORM bulk_insert_mappings:
          Total time for 10 records 48.129 secs

SQLA ORM bulk_insert_mappings, return_defaults:
          Total time for 10 records 22.078 secs

SQLA Core:
          Total time for 10 records 48.751 secs

BQ Client:
          Total time for 10 records 0.727 secs

This is the code used to test the different approaches:

# -*- coding: utf-8 -*-

from datetime import datetime
from pprint import pprint
from typing import Dict, List
from uuid import uuid4
import contextlib
import time
import sys

from google.cloud.bigquery import Client, Table
from google.api_core import retry

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy import __version__, Column, DateTime, String, create_engine, insert

def make_str_id() -> str:
    return uuid4().hex

Base = declarative_base()

class Customer(Base):
    __tablename__ = "customer"

    id: str = Column('id', String, primary_key=True, default=make_str_id)
    name: str = Column('name', String)
    created_at: datetime = Column('created_at', DateTime, default=datetime.now)

    __mapper_args__ = {
        "version_id_col": created_at,
        "version_id_generator": datetime.now,
        "eager_defaults": True
    }


@contextlib.contextmanager
def sqlalchemy_session(project: str, future: bool):
    uri = f'bigquery://{project}/sandBox'
    engine = create_engine(uri, future=future)
    Base.metadata.create_all(engine, checkfirst=True)
    session = Session(
        bind=engine, future=future,
        autoflush=False, expire_on_commit=False
    )
    yield session
    session.close()
    Base.metadata.drop_all(engine, checkfirst=True)
    engine.dispose()


def print_result(name, nrows, seconds):
    print(f"{name}:\n{' '*10}Total time for {nrows} records {seconds:.3f} secs\n")


def test_sqlalchemy_orm(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer()
            customer.name = "NAME " + str(i)
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM", n, time.time() - t0)


def test_sqlalchemy_orm_pk_given(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer(id=make_str_id(), name="NAME " + str(i))
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM pk given", n, time.time() - t0)
        
        
def test_sqlalchemy_orm_enabled_insert(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        session.execute(
            insert(Customer),
            [
                {"name": f"NAME{i}"}
                for i in range(n)
            ]
        )
        session.commit()
        print_result("SQLA ORM-Enabled INSERT", n, time.time() - t0)


def test_sqlalchemy_orm_bulk_save_objects(project: str, n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_save_objects(
                [
                    Customer(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_save_objects{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_orm_bulk_insert(project: str, n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_insert_mappings(
                Customer,
                [
                    dict(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_insert_mappings{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_core(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        with session.bind.begin() as conn:
            t0 = time.time()
            conn.execute(
                insert(Customer.__table__),
                [{"name": "NAME " + str(i)} for i in range(n)],
            )
            conn.commit()
            print_result("SQLA Core", n, time.time() - t0)


@contextlib.contextmanager
def bigquery_client(project: str) -> Client:
    schema = [
        {"name": "id",
         "type": "STRING",
         "mode":"REQUIRED"},
        {"name": "name",
         "type": "STRING",
         "mode":"NULLABLE"},
        {"name": "created_at",
         "type": "DATETIME",
         "mode":"NULLABLE"},
    ]
    table_path = f"{project}.sandBox.{Customer.__tablename__}"
    with Client(project) as client:
        customer = Table(table_path, schema)
        client.create_table(customer, exists_ok=True)
        yield client
        client.delete_table(table_path)


def test_bigquery(project: str, n=100000):
    retry_ = retry.Retry((lambda e: True), 0.2, 10., 5, 10.)
    with bigquery_client(project) as client:
        client: Client
        t0 = time.time()
        rows = [
            {"id": make_str_id(),
             "name": "NAME " + str(i),
             "created_at": datetime.now()}
            for i in range(n)
        ]
        table_path = f"{project}.sandBox.{Customer.__tablename__}"
        customer = client.get_table(table_path)
        client.insert_rows(customer, rows, retry=retry_)
        print_result("BQ Client", n, time.time() - t0)


if __name__ == "__main__":
    rows = 10
    _future = True
    project_id = 'YOUR_PROJECT_ID'
    
    print(f"Python: {' '.join(sys.version.splitlines())}")
    print(f"sqlalchemy v{__version__} (future={_future})\n")

    test_sqlalchemy_orm(project_id, rows, _future)
    test_sqlalchemy_orm_pk_given(project_id, rows, _future)
    test_sqlalchemy_orm_enabled_insert(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(project_id, rows, _future, True)
    test_sqlalchemy_orm_bulk_insert(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_insert(project_id, rows, _future, True)
    test_sqlalchemy_core(project_id, rows, _future)
    test_bigquery(project_id, rows)

@meredithslota added the type: feature request label on Oct 4, 2022
@jlynchMicron

I am in the same boat: I have to operate on large datasets and I use SQLAlchemy to format my BigQuery queries. Since I also use pandas when operating on my data locally, I find I have to use a mixture of these functions and modules for the best performance:

Upload:

  • google-cloud-bigquery -> load_table_from_dataframe (uses parquet to upload)
  • pandas-gbq -> to_gbq (uses parquet to upload)

Download:

  • pandas-gbq -> read_gbq (uses "use_bqstorage_api" argument for fast download)

So it appears all these methods (mostly) bypass raw SQL data streams and instead convert data to compressed binary files for bulk data transfer. It would be convenient if all this could be accomplished via python-bigquery-sqlalchemy, but I am not sure whether SQLAlchemy supports underlying dialects using their own "conversion engines" (let's call it that).
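
As a point of reference, here is a minimal sketch of that bulk-transfer path, using load_table_from_dataframe for upload and pandas-gbq's read_gbq (with use_bqstorage_api) for download. The project ID is a placeholder and the sandBox.customer table is borrowed from the benchmark script above; pyarrow must be installed for the Parquet serialization.

import pandas as pd
import pandas_gbq
from google.cloud import bigquery

client = bigquery.Client(project="YOUR_PROJECT_ID")

# Upload: the DataFrame is serialized to Parquet and ingested as a load job,
# instead of being sent row by row through INSERT statements.
df = pd.DataFrame({"id": ["a1", "b2"], "name": ["NAME 0", "NAME 1"]})
load_job = client.load_table_from_dataframe(df, "YOUR_PROJECT_ID.sandBox.customer")
load_job.result()  # wait for the load job to finish

# Download: the BigQuery Storage API streams binary data instead of paging REST results.
df_out = pandas_gbq.read_gbq(
    "SELECT * FROM `YOUR_PROJECT_ID.sandBox.customer`",
    project_id="YOUR_PROJECT_ID",
    use_bqstorage_api=True,
)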

I do see in the documentation for this library that there is a way to supply your own custom BQ client:
https://googleapis.dev/python/sqlalchemy-bigquery/latest/README.html#supplying-your-own-bigquery-client

Maybe there is a way to configure this client to use the faster methods/clients to interact with BigQuery?

@shollyman added the priority: p3 label on Mar 3, 2023
@shollyman

BQ's high throughput ingestion is entirely divorced from the query engine, so mapping it into sqlalchemy already feels unusual. There's work unrelated to sqlalchemy to improve ingestion, but that will take time to propagate into other consumers like this project and other connectors/tools if it makes sense.


firezdog commented Mar 10, 2024

BQ's high throughput ingestion is entirely divorced from the query engine, so mapping it into sqlalchemy already feels unusual. There's work unrelated to sqlalchemy to improve ingestion, but that will take time to propagate into other consumers like this project and other connectors/tools if it makes sense.

What do the first two sentences mean, and what is the logical connection between them?

I am also trying to understand why the throughput is so low. With such bad performance, the library is completely non-viable for most use cases. It's very hard to think of a production use case where averaging one minute to insert 10-20 rows into a table would be acceptable.

@shollyman

The BigQuery Write API supports high throughput ingestion, but it is not exposed as part of the SQL query surface. This is a case of SQLAlchemy not being the right tool for an ingestion-based workload into BigQuery.

@chalmerlowe
Collaborator

I am gonna close this ticket due to workloads, personpower, and priorities.
There are tools available that enable high throughput ingestion.
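
(For example, and only as a minimal sketch rather than a statement of what the maintainers have in mind: a batch load job through the google.cloud.bigquery client bypasses the SQL surface entirely. Project, dataset, and table names below are placeholders.)

from google.cloud import bigquery

client = bigquery.Client(project="YOUR_PROJECT_ID")

rows = [{"id": f"id-{i}", "name": f"NAME {i}"} for i in range(100_000)]

# Batch load job: the rows are uploaded as a single file and ingested server-side,
# rather than bound as parameters of individual INSERT statements.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_json(rows, "YOUR_PROJECT_ID.sandBox.customer", job_config=job_config)
job.result()  # blocks until the load job completes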

@jlynchMicron

@chalmerlowe,

Do you know how hard it would be to implement the dialect options "use_insertmanyvalues" and "use_insertmanyvalues_wo_returning", which were added in SQLAlchemy 2.0 to give native ORM session.add() and session.add_all() support for bulk inserting? I tried manually adding them to my environment and I seem to be getting close to enabling support:

#SA ORM BQ bulk insert patch
from sqlalchemy_bigquery.base import BigQueryDialect
BigQueryDialect.use_insertmanyvalues = True
BigQueryDialect.use_insertmanyvalues_wo_returning = True

Unfortunately I seem to be having problems with parameter binding formatting in the google/cloud/bigquery/dbapi/cursor._format_operation_dict function.

I think this would be a great feature for sqlalchemy-bigquery users: ORM bulk-insert support for nested objects (and non-nested objects, for that matter), without having to use the legacy bulk_insert_mappings and bulk_save_objects methods, which do not support ORM event hooks or nested ORM objects.

Links:

SQLAlchemy 2.0 bulk insert support: https://docs.sqlalchemy.org/en/20/changelog/whatsnew_20.html#optimized-orm-bulk-insert-now-implemented-for-all-backends-other-than-mysql

SQLAlchemy commit enabling the support: sqlalchemy/sqlalchemy@2bcc97d


jlynchMicron commented Nov 13, 2024

For anyone else using the ORM and needing faster INSERT methods, I wrote some using batched DML statements and BQ streaming inserts, by overriding the BigQueryDialect's use of the default Dialect do_execute and do_executemany functions.

@chalmerlowe please consider making this a more formalized option in the BigQueryDialect.

#Hotfix for this issue: https://github.com/googleapis/python-bigquery/issues/2048

import copy
import logging
import re
from typing import List

from google.cloud.bigquery import Client
from google.cloud.bigquery import dbapi as bq_dbapi
from sqlalchemy_bigquery.base import BigQueryDialect

###################################################################################################
#Insert style configuration
###################################################################################################
#NOTE: These all default to False so the stock behavior is unchanged; users can overwrite the values in memory to pick an insert style.
DML = False
STREAMING = False
LOADING = False

###################################################################################################
#Function definitions
###################################################################################################
def do_execute(self, cursor, statement, parameters, context=None):
    #NOTE: Borrowed from MySQL DBAPI: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L157

    #: Regular expression for :meth:`Cursor.executemany`.
    #: executemany only supports simple bulk insert.
    #: You can use it to load large dataset.
    RE_INSERT_VALUES = re.compile(
        r"\s*((?:INSERT|REPLACE)\b\s+INTO\s+`?(\w+)`?\b.+\bVALUES?\s*)"
        + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
        + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
        re.IGNORECASE | re.DOTALL,
    )

    m = RE_INSERT_VALUES.match(statement)

    if m and (STREAMING or LOADING):
        q_prefix = m.group(1) % ()
        q_tbl = m.group(2)
        q_values = m.group(3).rstrip()
        q_postfix = m.group(4) or ""
        assert q_values[0] == "(" and q_values[-1] == ")"

        if STREAMING:
            client:Client = cursor.connection._client
            #bqstorage_client = self.connection._bqstorage_client

            full_tbl_name = f'{client.default_query_job_config.default_dataset.project}.{client.default_query_job_config.default_dataset.dataset_id}.{q_tbl}'
            tbl_ref = client.get_table(full_tbl_name)

            insert_errs = client.insert_rows(tbl_ref, [parameters])
            if insert_errs:
                msg = f'Some rows failed to insert while performing BigQuery streaming inserts: failed_rows:{insert_errs}'
                logging.error(msg)
                raise Exception(msg)

        elif LOADING:
            raise NotImplementedError()

        else:
            pass

    else:
        cursor.execute(statement, parameters)

def do_executemany(self, cursor, statement:str, parameters:List[dict], context=None):
    #NOTE: Borrowed from MySQL DBAPI: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/cursors.py#L157

    #: Regular expression for :meth:`Cursor.executemany`.
    #: executemany only supports simple bulk insert.
    #: You can use it to load large dataset.
    RE_INSERT_VALUES = re.compile(
        r"\s*((?:INSERT|REPLACE)\b\s+INTO\s+`?(\w+)`?\b.+\bVALUES?\s*)"
        + r"(\(\s*(?:%s|%\(.+\)s)\s*(?:,\s*(?:%s|%\(.+\)s)\s*)*\))"
        + r"(\s*(?:ON DUPLICATE.*)?);?\s*\Z",
        re.IGNORECASE | re.DOTALL,
    )

    m = RE_INSERT_VALUES.match(statement)

    if m and (DML or STREAMING or LOADING):
        q_prefix = m.group(1) % ()
        q_tbl = m.group(2)
        q_values = m.group(3).rstrip()
        q_postfix = m.group(4) or ""
        assert q_values[0] == "(" and q_values[-1] == ")"

        if DML:
            param_limit = 10000 #"A GoogleSQL query can have up to 10,000 parameters."
            param_set_size = len(parameters[0])
            batch_step_size = param_limit // param_set_size

            for idx in range(0, len(parameters), batch_step_size):
                batch_params = parameters[idx : idx + batch_step_size]

                new_sql_stmt = copy.copy(q_prefix)
                new_sql_params = {}
                set_idx = 0

                for param_set in batch_params:
                    #formatted_operation, parameter_types = bq_dbapi.cursor._format_operation(statement, param_set)
                    #query_parameters = bq_dbapi._helpers.to_query_parameters(param_set, parameter_types)

                    set_params = {}
                    set_values = copy.copy(q_values)
                    for param_k,param_v in param_set.items():
                        new_param_k = f'{param_k}__{set_idx}'
                        set_params[new_param_k] = param_v
                        set_values = set_values.replace(f'%({param_k}:', f'%({new_param_k}:')

                    new_sql_stmt = f'{new_sql_stmt}{set_values},'
                    new_sql_params.update(set_params)
                    set_idx += 1

                new_sql_stmt = new_sql_stmt[:-1] # remove trailing comma
                new_sql_stmt = f'{new_sql_stmt}{q_postfix}'

                cursor.execute(new_sql_stmt, new_sql_params)
            
        elif STREAMING:
            client:Client = cursor.connection._client
            #bqstorage_client = self.connection._bqstorage_client

            full_tbl_name = f'{client.default_query_job_config.default_dataset.project}.{client.default_query_job_config.default_dataset.dataset_id}.{q_tbl}'
            tbl_ref = client.get_table(full_tbl_name)

            chunk_size = 500 #50k row limit, 500 recommended. https://cloud.google.com/bigquery/quotas#streaming_inserts 
            for idx in range(0, len(parameters), chunk_size):
                chunk = parameters[idx:idx + chunk_size]
                insert_errs = client.insert_rows(tbl_ref, chunk)

                if insert_errs:
                    msg = f'Some rows failed to insert while performing BigQuery streaming inserts: failed_rows:{insert_errs}'
                    logging.error(msg)
                    raise Exception(msg)
            
        elif LOADING:
            raise NotImplementedError()

        else:
            pass
    
    else:
        #Current implementation of this function only supports serial inserting.
        cursor.executemany(statement, parameters)

###################################################################################################
# Overwrite SA dialect function implementations
###################################################################################################
BigQueryDialect.do_execute = do_execute
BigQueryDialect.do_executemany = do_executemany
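
Usage then amounts to importing the patch once and picking an insert style before creating the engine. Below is a minimal, self-contained sketch: the module name bq_insert_patch is an assumption (save the code above under that name), the project ID is a placeholder, and the Customer model mirrors the benchmark script earlier in this thread. Note that the DML/STREAMING table lookup above relies on the client having a default dataset, e.g. via the dataset in the engine URL.

from uuid import uuid4

from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import Session, declarative_base

import bq_insert_patch  # importing the module applies the monkeypatch to BigQueryDialect
bq_insert_patch.DML = True          # batch parameter sets into multi-row INSERT DML statements
# bq_insert_patch.STREAMING = True  # or: route rows through the streaming insert API instead

Base = declarative_base()

class Customer(Base):
    __tablename__ = "customer"

    id = Column(String, primary_key=True, default=lambda: uuid4().hex)
    name = Column(String)

engine = create_engine("bigquery://YOUR_PROJECT_ID/sandBox")
Base.metadata.create_all(engine, checkfirst=True)

with Session(engine) as session:
    # With a client-side PK default, the ORM can batch these rows into executemany(),
    # which the patched do_executemany() above turns into multi-row DML or streaming inserts.
    session.add_all([Customer(name=f"NAME {i}") for i in range(10_000)])
    session.commit()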

@chalmerlowe
Collaborator

@jlynchMicron

Thanks for reaching out.
I appreciate your time and effort.

I will do my best to take a look.
One of the issues with taking on new features is that we need to figure out how to integrate those features into the overall package:

  • Is the new code complete and comprehensive, and does it cover a reasonable set of edge cases?
  • Does the new code meet style guidelines, readability standards, etc.?
  • Does the new code need unit tests?
  • Does the new code need system tests?
  • Do all the features in the new code work in all the versions of Python we support, or do we need to remove newer language features or add workarounds to ensure backward compatibility?

Not saying that this code doesn't meet guidelines, etc. I have not had a chance to take a good look at it.

You probably already know this; I just want to ensure you understand why something that seems as simple as "add this snippet of new code" takes time and effort, especially balanced against my existing TODO list.

I hope to be able to sit and look at this in depth, soon.
Will keep you posted and will reopen this issue.

@jlynchMicron

Hi @chalmerlowe

Yeah, I completely understand all the overhead involved in onboarding new code features. I figured I would share the patch that met my needs and could be a blueprint for solving other people's issues as well. Thanks!

Collaborator

chalmerlowe commented Nov 20, 2024

@jlynchMicron I read through a ton of discussion on this, including comments by you, zzzeek, and others across multiple python-bigquery-* repositories. Gonna try to digest all this content and see what makes sense going forward.

@jlynchMicron

@chalmerlowe, sounds good and thanks! Yeah, the DML batching patch should arguably be made at the python-bigquery cursor level (see googleapis/python-bigquery#2048), but I do think it would be nice to be able to select an insert "style" at the sqlalchemy-bigquery level.
