I would like to do an upsert using the "new" functionality added in PostgreSQL 9.5, using SQLAlchemy Core. Although it is implemented, I'm confused by the syntax and can't adapt it to my needs. Here is a sample of what I would like to be able to do:

from sqlalchemy import Column, Integer, MetaData, Table, bindparam, create_engine
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
class User(Base):
    __tablename__ = 'test'
    a_id = Column('id',Integer, primary_key=True)
    a = Column("a",Integer)

engine = create_engine('postgresql://name:password@localhost/test')
User().metadata.create_all(engine)
meta = MetaData(engine)
meta.reflect()
table = Table('test', meta, autoload=True)
conn = engine.connect()

from sqlalchemy.dialects.postgresql import insert as psql_insert
stmt = psql_insert(table).values({
    table.c['id']: bindparam('id'),
    table.c['a']: bindparam('a'),
})
stmt = stmt.on_conflict_do_update(
    index_elements=[table.c['id']],
    set_={'a': bindparam('a')},
)
list_of_dictionary = [{'id':1, 'a':1, }, {'id':2, 'a':2,}]
conn.execute(stmt, list_of_dictionary)

I basically want to insert rows in bulk, and if an id is already taken, I want to update that row with the value I was trying to insert. However, SQLAlchemy throws this error:

CompileError: bindparam() name 'a' is reserved for automatic usage in the VALUES or SET clause of this insert/update statement.   Please use a name other than column name when using bindparam() with insert() or update() (for example, 'b_a').

While this is a known issue (see https://groups.google.com/forum/#!topic/sqlalchemy/VwiUlF1cz_o), I haven't found a proper answer that doesn't require modifying either the keys of list_of_dictionary or the names of the columns.

I want to know whether there is a way to construct stmt so that it behaves consistently regardless of whether the keys of list_of_dictionary are the column names of the target table (my code runs without error when they are not).

2 Answers

This does the trick for me:

import warnings

from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.inspection import inspect

def upsert(engine, schema, table_name, records=[]):

    metadata = MetaData(schema=schema)
    metadata.bind = engine

    table = Table(table_name, metadata, schema=schema, autoload=True)

    # get list of fields making up primary key
    primary_keys = [key.name for key in inspect(table).primary_key]

    # assemble base statement
    stmt = postgresql.insert(table).values(records)

    # define dict of non-primary keys for updating
    update_dict = {
        c.name: c
        for c in stmt.excluded
        if not c.primary_key
    }

    # cover the case where every column in the table is part of the primary key,
    # in which case the upsert is identical to 'on conflict do nothing'.
    if update_dict == {}:
        warnings.warn('no updateable columns found for table')
        # we still want to insert without errors
        # NOTE: insert_ignore is a helper that is not shown in this answer
        insert_ignore(table_name, records)
        return None


    # assemble new statement with 'on conflict do update' clause
    update_stmt = stmt.on_conflict_do_update(
        index_elements=primary_keys,
        set_=update_dict,
    )

    # execute
    with engine.connect() as conn:
        result = conn.execute(update_stmt)
        return result
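
To check what this kind of statement turns into without touching a database, something like the sketch below can help. It rebuilds the question's table by hand (the name test_table and the hard-coded columns are my assumptions, not part of the answer) and compiles the statement against the PostgreSQL dialect. Note that the record keys are the column names, which is the case that tripped up bindparam().

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Assumed stand-in for the reflected table: primary key "id" plus one column "a".
test_table = Table(
    "test",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("a", Integer),
)

# Keys are identical to the column names.
records = [{"id": 1, "a": 1}, {"id": 2, "a": 2}]

stmt = postgresql.insert(test_table).values(records)
stmt = stmt.on_conflict_do_update(
    index_elements=["id"],
    # stmt.excluded refers to the row that was proposed for insertion.
    set_={c.name: c for c in stmt.excluded if not c.primary_key},
)

# Compile against the PostgreSQL dialect to inspect the SQL; nothing is executed.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The printed statement should end with an ON CONFLICT (id) DO UPDATE clause that sets a from excluded.a, so no bindparam() names are needed at all.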

6 Comments

I didn't know about stmt.excluded, which is what I needed. On the other hand, I have no idea why you are excluding primary keys; set_={c.name: c for c in stmt.excluded} seems to work as intended (I don't mind "updating" a primary key, since it is by definition the same value).
Oh, that is a good point. It would make the code at least a bit more elegant.
Does this work with execute(query)? query is never defined anywhere. Should it be execute(update_stmt)?
Yes, that's exactly right. I've updated the code block to reflect that.
What is insert_ignore? I don't see that function in your imports. Also, what is records? Is it a list, or a list of dicts?
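
The answer never shows insert_ignore, so the following is only a guess at what such a helper might look like, built on on_conflict_do_nothing(). The function name build_insert_ignore, the pairs table, and the choice to return the statement instead of executing it are all my assumptions. It also treats records as a list of dicts keyed by column name, which is what .values() accepts.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql


def build_insert_ignore(table, records):
    """Hypothetical stand-in for insert_ignore: skip rows that would conflict."""
    primary_keys = [col.name for col in table.primary_key.columns]
    return (
        postgresql.insert(table)
        .values(records)
        .on_conflict_do_nothing(index_elements=primary_keys)
    )


# Assumed example table in which every column belongs to the primary key.
pairs = Table(
    "pairs",
    MetaData(),
    Column("x", Integer, primary_key=True),
    Column("y", Integer, primary_key=True),
)

# records is a list of dicts, one dict per row.
records = [{"x": 1, "y": 2}, {"x": 3, "y": 4}]

# Compile only; pass the statement to conn.execute() to actually run it.
ignore_sql = str(
    build_insert_ignore(pairs, records).compile(dialect=postgresql.dialect())
)
print(ignore_sql)
```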

For anyone looking for an ORM solution, the following worked for me:

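from typing import Any, Dict, Union

from sqlalchemy.orm import DeclarativeMeta, scoped_session, sessionmaker

def upsert(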
    sa_sessionmaker: Union[sessionmaker, scoped_session],
    model: DeclarativeMeta,
    get_values: Dict[str, Any],
    update_values: Dict[str, Any],
) -> Any:
    """Upserts (updates if exists, else inserts) a SQLAlchemy model object.

    Note that get_values must uniquely identify a single model object (row) for this
    function to work.

    Args:
        sa_sessionmaker: SQLAlchemy sessionmaker to connect to the database.
        model: Model declarative metadata.
        get_values: Arguments used to try to retrieve an existing object.
        update_values: Desired attributes for the object fetched via get_values, 
            or the new object if nothing was fetched.

    Returns:
        Model object subject to upsert.
    """
    with sa_sessionmaker() as session:
        instance = session.query(model).filter_by(**get_values).one_or_none()
        if instance:
            for attr, new_val in update_values.items():
                setattr(instance, attr, new_val)
        else:
            create_kwargs = get_values | update_values
            session.add(model(**create_kwargs))
        session.commit()
        instance = session.query(model).filter_by(**get_values).one_or_none()
    return instance

A few remarks:

  • If the primary key of the object is known, using Session.merge() is likely a better alternative than the function above. In that sense, the function above assumes that the primary key is not known (and hence not part of get_values).
  • sa_sessionmaker is a factory for Session objects (see the docs).
  • model takes a SQLAlchemy declarative model class (i.e., a "table"; see the docs).
  • Python >= 3.9 required for the implementation above. If your environment requires a previous version of Python, replace create_kwargs = get_values | update_values with create_kwargs = {**get_values, **update_values}
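
For the Session.merge() case mentioned in the first remark, a minimal sketch could look like this. The User model mirrors the question's table; the in-memory SQLite engine is used here only to keep the example self-contained and is not what the question targets. Keep in mind that merge() also looks the row up before deciding between INSERT and UPDATE, so it is not an atomic database-side upsert either.

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import Session

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "test"
    id = Column(Integer, primary_key=True)
    a = Column(Integer)


# In-memory SQLite, an assumption made only so the sketch runs anywhere.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

session = Session(engine)
session.merge(User(id=1, a=1))  # no row with id=1 yet, so this inserts
session.commit()
session.merge(User(id=1, a=5))  # row exists, so this updates it
session.commit()

rows = [(u.id, u.a) for u in session.query(User).order_by(User.id)]
session.close()
print(rows)
```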

1 Comment

This has a race condition because it handles the get_or_create in Python instead of in the database. You're going to get sqlalchemy.exc.IntegrityError raised eventually with this code.
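
One way to push the check into the database while still starting from an ORM model is to build the PostgreSQL insert against the model's __table__. The sketch below is only an illustration: build_upsert and the User model are names I made up, and the statement is compiled, not executed. With a real session it would be run via session.execute(stmt).

```python
from sqlalchemy import Column, Integer
from sqlalchemy.dialects import postgresql

try:
    from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+
except ImportError:
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "test"
    id = Column(Integer, primary_key=True)
    a = Column(Integer)


def build_upsert(model, values):
    """Hypothetical helper: INSERT ... ON CONFLICT DO UPDATE for an ORM model."""
    table = model.__table__
    stmt = postgresql.insert(table).values(values)
    # Update every non-primary-key column from the proposed (excluded) row.
    update_cols = {
        c.name: stmt.excluded[c.name]
        for c in table.columns
        if not c.primary_key
    }
    return stmt.on_conflict_do_update(
        index_elements=[c.name for c in table.primary_key.columns],
        set_=update_cols,
    )


# Compile only, to inspect the SQL that PostgreSQL would receive.
orm_sql = str(
    build_upsert(User, {"id": 1, "a": 5}).compile(dialect=postgresql.dialect())
)
print(orm_sql)
```

Because the conflict is resolved inside a single statement, two concurrent callers should not hit the IntegrityError described in the comment above.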
