Python 使用 Alembic 更改 Enum 字段
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/14845203/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Altering an Enum field using Alembic
提问by bboe
How can I add an element to an Enum field in an alembic migration when using a version of PostgreSQL older than 9.1 (which adds the ALTER TYPE for enums)? ThisSO question explains the direct process, but I'm not quite sure how best to translate that using alembic.
使用早于 9.1 的 PostgreSQL 版本(为枚举添加 ALTER TYPE)时,如何在 alembic 迁移中向 Enum 字段添加元素?这个SO question解释了直接过程,但我不太确定如何最好地使用alembic来翻译它。
This is what I have:
这就是我所拥有的:
new_type = sa.Enum('nonexistent_executable', 'output_limit_exceeded',
'signal', 'success', 'timed_out', name='status')
old_type = sa.Enum('nonexistent_executable', 'signal', 'success', 'timed_out',
name='status')
tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))
def upgrade():
op.alter_column('testcaseresult', u'status', type_=new_type,
existing_type=old_type)
def downgrade():
op.execute(tcr.update().where(tcr.c.status==u'output_limit_exceeded')
.values(status='timed_out'))
op.alter_column('testcaseresult', u'status', type_=old_type,
existing_type=new_type)
The above unfortunately only produces ALTER TABLE testcaseresult ALTER COLUMN status TYPE statusupon upgrade, which essentially does nothing.
不幸的是,上面只ALTER TABLE testcaseresult ALTER COLUMN status TYPE status在升级时产生,基本上什么都不做。
采纳答案by bboe
I decided to try to follow the postgres approachas directly as possible and came up with the following migration.
我决定尝试尽可能直接地遵循postgres 方法并提出以下迁移。
from alembic import op
import sqlalchemy as sa
old_options = ('nonexistent_executable', 'signal', 'success', 'timed_out')
new_options = sorted(old_options + ('output_limit_exceeded',))
old_type = sa.Enum(*old_options, name='status')
new_type = sa.Enum(*new_options, name='status')
tmp_type = sa.Enum(*new_options, name='_status')
tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))
def upgrade():
# Create a tempoary "_status" type, convert and drop the "old" type
tmp_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE _status'
' USING status::text::_status')
old_type.drop(op.get_bind(), checkfirst=False)
# Create and convert to the "new" status type
new_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE status'
' USING status::text::status')
tmp_type.drop(op.get_bind(), checkfirst=False)
def downgrade():
# Convert 'output_limit_exceeded' status into 'timed_out'
op.execute(tcr.update().where(tcr.c.status==u'output_limit_exceeded')
.values(status='timed_out'))
# Create a tempoary "_status" type, convert and drop the "new" type
tmp_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE _status'
' USING status::text::_status')
new_type.drop(op.get_bind(), checkfirst=False)
# Create and convert to the "old" status type
old_type.create(op.get_bind(), checkfirst=False)
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status TYPE status'
' USING status::text::status')
tmp_type.drop(op.get_bind(), checkfirst=False)
It appears that alembic has no direct support for the USINGstatement in its alter_tablemethod.
似乎 alembicUSING对其alter_table方法中的语句没有直接支持。
回答by thisfred
In straight SQL, this would work for Postgres, if the order of the things in your enum doesn't need to be exactly as above:
在直接 SQL 中,如果您的枚举中的事物的顺序不需要完全如上,这将适用于 Postgres:
ALTER TYPE status ADD value 'output_limit_exceeded' after 'timed_out';
回答by Rob Young
As of Postgres 9.1 adding a new value to an enum can be done with the ALTER TYPEstatement. This is complicated by the fact that it cannot be done in a transaction. However this can be worked around by committing alembic's transaction see here.
从 Postgres 9.1 开始,可以使用该ALTER TYPE语句为枚举添加新值。由于它不能在事务中完成,因此这很复杂。但是,这可以通过提交 alembic 的交易来解决,请参见此处。
I actually had problems using the older, more verbose, solution because Postgres could not automatically convert the default for the column.
我实际上在使用旧的、更详细的解决方案时遇到了问题,因为 Postgres 无法自动转换列的默认值。
回答by JelteF
I used a bit simpler approach with less steps than the accepted answer, which I based this on. In this example I will pretend the enum in question is called 'status_enum', because in the accepted answer the use of 'status' for both the column and enum confused me.
我使用了一种比接受的答案更简单的方法,步骤更少,我以此为基础。在这个例子中,我将假设有问题的枚举称为“status_enum”,因为在接受的答案中,列和枚举都使用“status”让我感到困惑。
from alembic import op
import sqlalchemy as sa
name = 'status_enum'
tmp_name = 'tmp_' + name
old_options = ('nonexistent_executable', 'signal', 'success', 'timed_out')
new_options = sorted(old_options + ('output_limit_exceeded',))
new_type = sa.Enum(*new_options, name=name)
old_type = sa.Enum(*old_options, name=name)
tcr = sa.sql.table('testcaseresult',
sa.Column('status', new_type, nullable=False))
def upgrade():
op.execute('ALTER TYPE ' + name + ' RENAME TO ' + tmp_name)
new_type.create(op.get_bind())
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status ' +
'TYPE ' + name + ' USING status::text::' + name)
op.execute('DROP TYPE ' + tmp_name)
def downgrade():
# Convert 'output_limit_exceeded' status into 'timed_out'
op.execute(tcr.update().where(tcr.c.status=='output_limit_exceeded')
.values(status='timed_out'))
op.execute('ALTER TYPE ' + name + ' RENAME TO ' + tmp_name)
old_type.create(op.get_bind())
op.execute('ALTER TABLE testcaseresult ALTER COLUMN status ' +
'TYPE ' + name + ' USING status::text::' + name)
op.execute('DROP TYPE ' + tmp_name)
回答by Jeyfel Brandauer
I had the same issue trying to migrate a column type to another. I use the following requirements:
我在尝试将列类型迁移到另一个时遇到了同样的问题。我使用以下要求:
Alembic==0.9.4
SQLAlchemy==1.1.12
You can provide the argument postgresql_usingas a kwarg of alembic.op.alter_column.
您可以将参数postgresql_using作为alembic.op.alter_column.
from alembic import op
import sqlalchemy as types
op.alter_column(
table_name='my_table',
column_name='my_column',
type_=types.NewType,
# allows to use postgresql USING
postgresql_using="my_column::PostgesEquivalentOfNewType",
)
I hope it can help.
我希望它能有所帮助。
回答by Aditya
回答by Californian
I needed to move data while migrating types, including deleting some old types, so I figured I'd write up a more general way of doing this based on the (awesome) accepted answer (https://stackoverflow.com/a/14845740/629272). Hopefully this helps someone else in the same boat!
我需要在迁移类型时移动数据,包括删除一些旧类型,所以我想我会根据(很棒的)接受的答案写出一种更通用的方法来做到这一点(https://stackoverflow.com/a/14845740 /629272)。希望这可以帮助其他人在同一条船上!
# This migration will move data from one column to two others based on the type
# for a given row, and modify the type of each row.
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
revision = '000000000001'
down_revision = '000000000000'
branch_labels = None
depends_on = None
# This set of options makes up the old type.
example_types_old = (
'EXAMPLE_A',
'EXAMPLE_B',
'EXAMPLE_C',
)
example_type_enum_old = postgresql.ENUM(*example_types_old, name='exampletype')
# This set of options makes up the new type.
example_types_new = (
'EXAMPLE_C',
'EXAMPLE_D',
'EXAMPLE_E',
)
example_type_enum_new = postgresql.ENUM(*example_types_new, name='exampletype')
# This set of options includes everything from the old and new types.
example_types_tmp = set(example_types_old + example_types_new)
example_type_enum_tmp = postgresql.ENUM(*example_types_tmp, name='_exampletype')
# This is a table view from which we can select and update as necessary. This
# only needs to include the relevant columns which are in either the old or new
# version of the table.
examples_view = sa.Table(
# Use the name of the actual table so it is modified in the upgrade and
# downgrade.
'examples',
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
# Use the _tmp type so all types are usable.
sa.Column('example_type', example_type_enum_tmp),
# This is a column from which the data will be migrated, after which the
# column will be removed.
sa.Column('example_old_column', sa.Integer),
# This is a column to which data from the old column will be added if the
# type is EXAMPLE_A.
sa.Column('example_new_column_a', sa.Integer),
# This is a column to which data from the old column will be added if the
# type is EXAMPLE_B.
sa.Column('example_new_column_b', sa.Integer),
)
def upgrade():
connection = op.get_bind()
# Add the new column to which data will be migrated.
example_new_column_a = sa.Column(
'example_new_column_a',
sa.Integer,
nullable=True
)
op.add_column('examples', example_new_column_a)
# Add the new column to which data will be migrated.
example_new_column_b = sa.Column(
'example_new_column_b',
sa.Integer,
nullable=True
)
op.add_column('examples', example_new_column_b)
# Create the temporary enum and change the example_type column to use the
# temporary enum.
# The USING statement automatically maps the old enum to the temporary one.
example_type_enum_tmp.create(connection, checkfirst=False)
# Change to the temporary type and map from the old type to the temporary
# one.
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE _exampletype
USING example_type::text::_exampletype
''')
# Move data from example_old_column to example_new_column_a and change its
# type to EXAMPLE_D if the type is EXAMPLE_A.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_A'
).values(
example_type='EXAMPLE_D',
example_new_column_a=examples_view.c.example_old_column,
)
)
# Move data from example_old_column to example_new_column_b and change its
# type to EXAMPLE_E if the type is EXAMPLE_B.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_B'
).values(
example_type='EXAMPLE_E',
example_new_column_b=examples_view.c.example_old_column,
)
)
# Move any remaining data from example_old_column to example_new_column_a
# and keep its type as EXAMPLE_C.
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_C'
).values(
example_type='EXAMPLE_C',
example_new_column_a=examples_view.c.example_old_column,
)
)
# Delete the old enum now that the data with the old types have been moved.
example_type_enum_old.drop(connection, checkfirst=False)
# Create the new enum and change the example_type column to use the new
# enum.
# The USING statement automatically maps the temporary enum to the new one.
example_type_enum_new.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE exampletype
USING example_type::text::exampletype
''')
# Delete the temporary enum.
example_type_enum_tmp.drop(connection, checkfirst=False)
# Remove the old column.
op.drop_column('examples', 'example_old_column')
# The downgrade just performs the opposite of all the upgrade operations but in
# reverse.
def downgrade():
connection = op.get_bind()
example_old_column = sa.Column(
'example_old_column',
sa.Integer,
nullable=True
)
op.add_column('examples', example_old_column)
example_type_enum_tmp.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE _exampletype
USING example_type::text::_exampletype
''')
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_C'
).values(
example_type='EXAMPLE_C',
example_old_column=examples_view.c.example_new_column_b,
)
)
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_E'
).values(
example_type='EXAMPLE_B',
example_old_column=examples_view.c.example_new_column_b,
)
)
connection.execute(
examples_view.update().where(
examples_view.c.example_type == 'EXAMPLE_D'
).values(
example_type='EXAMPLE_A',
example_old_column=examples_view.c.example_new_column_a,
)
)
example_type_enum_old.create(connection, checkfirst=False)
op.execute('''
ALTER TABLE examples
ALTER COLUMN example_type
TYPE exampletype
USING example_type::text::exampletype
''')
example_type_enum_tmp.drop(connection, checkfirst=False)
op.drop_column('examples', 'example_new_column_b')
op.drop_column('examples', 'example_new_column_a')
回答by Ivorius
Since I got conversion errors and problems with default values, I wrote an even more generalised answer based on the accepted one:
由于我遇到了默认值的转换错误和问题,我根据接受的答案写了一个更笼统的答案:
def replace_enum_values(
name: str,
old: [str],
new: [str],
modify: [(str, str, str)]
):
"""
Replaces an enum's list of values.
Args:
name: Name of the enum
new: New list of values
old: Old list of values
modify: List of tuples of table name
and column to modify (which actively use the enum).
Assumes each column has a default val.
"""
connection = op.get_bind()
tmp_name = "{}_tmp".format(name)
# Rename old type
op.execute(
"ALTER TYPE {} RENAME TO {};"
.format(name, tmp_name)
)
# Create new type
lsl = sa.Enum(*new, name=name)
lsl.create(connection)
# Replace all usages
for (table, column) in modify:
# Get default to re-set later
default_typed = connection.execute(
"SELECT column_default "
"FROM information_schema.columns "
"WHERE table_name='{table}' "
"AND column_name='{column}';"
.format(table=table, column=column)
).first()[0] # type: str
# Is bracketed already
default = default_typed[:default_typed.index("::")]
# Set all now invalid values to default
connection.execute(
"UPDATE {table} "
"SET {column}={default} "
"WHERE {column} NOT IN {allowed};"
.format(
table=table,
column=column,
# Invalid: What isn't contained in both new and old
# Can't just remove what's not in new because we get
# a type error
allowed=tuple(set(old).intersection(set(new))),
default=default
)
)
op.execute(
"ALTER TABLE {table} "
# Default needs to be dropped first
"ALTER COLUMN {column} DROP DEFAULT,"
# Replace the tpye
"ALTER COLUMN {column} TYPE {enum_name} USING {column}::text::{enum_name},"
# Reset default
"ALTER COLUMN {column} SET DEFAULT {default};"
.format(
table=table,
column=column,
enum_name=name,
default=default
)
)
# Remove old type
op.execute("DROP TYPE {};".format(tmp_name))
This can be called from upgrade / downgrade as such:
这可以从升级/降级调用,如下所示:
replace_enum_values(
name='enum_name',
new=["A", "B"],
old=["A", "C"],
modify=[('some_table', 'some_column')]
)
All invalidated values will be set to server_default.
所有无效的值都将设置为 server_default。
回答by Swaleh Matongwa
Found another handy method
找到了另一个方便的方法
op.execute('ALTER TYPE enum_type ADD VALUE new_value')
op.execute('ALTER TYPE enum_type ADD VALUE new_value BEFORE old_value')
op.execute('ALTER TYPE enum_type ADD VALUE new_value AFTER old_value')

