Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow/migrations/versions/e3a246e0dc1_current_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def upgrade():
if 'xcom' not in tables:
op.create_table(
'xcom',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(length=512), nullable=True),
sa.Column('value', sa.PickleType(), nullable=True),
sa.Column(
Expand All @@ -235,7 +234,7 @@ def upgrade():
sa.Column('execution_date', sa.DateTime(), nullable=False),
sa.Column('task_id', sa.String(length=250), nullable=False),
sa.Column('dag_id', sa.String(length=250), nullable=False),
sa.PrimaryKeyConstraint('id')
sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date', 'key')
)


Expand Down
15 changes: 0 additions & 15 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,18 +473,6 @@ def refresh_from_db(self, session=None, lock_for_update=False, refresh_executor_
else:
self.state = None

@provide_session
def clear_xcom_data(self, session=None):
"""
Clears all XCom data from the database for the task instance
"""
session.query(XCom).filter(
XCom.dag_id == self.dag_id,
XCom.task_id == self.task_id,
XCom.execution_date == self.execution_date
).delete()
session.commit()

@property
def key(self):
"""
Expand Down Expand Up @@ -909,9 +897,6 @@ def signal_handler(signum, frame):
raise AirflowException("Task received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)

# Don't clear Xcom until the task is certain to execute
self.clear_xcom_data()

start_time = time.time()

self.render_templates(context=context)
Expand Down
18 changes: 6 additions & 12 deletions airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import json
import pickle

from sqlalchemy import Column, Index, Integer, LargeBinary, String, and_
from sqlalchemy import Column, Index, LargeBinary, String, and_
from sqlalchemy.orm import reconstructor

from airflow.configuration import conf
Expand All @@ -43,16 +43,14 @@ class XCom(Base, LoggingMixin):
"""
__tablename__ = "xcom"

id = Column(Integer, primary_key=True)
key = Column(String(512))
key = Column(String(512), primary_key=True, nullable=False)
value = Column(LargeBinary)
timestamp = Column(
UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
timestamp = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
execution_date = Column(UtcDateTime, primary_key=True, nullable=False)

# source information
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
task_id = Column(String(ID_LEN), primary_key=True, nullable=False)
dag_id = Column(String(ID_LEN), primary_key=True, nullable=False)

__table_args__ = (
Index('idx_xcom_dag_task_date', dag_id, task_id, execution_date, unique=False),
Copy link
Contributor

@dstandish dstandish Oct 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Fokko couple things / questions

  1. it seems we don't need this index anymore, now that there is a primary key with the same columns
  2. i could be wrong (i am new to alembic and sqlalchemy) but aren't you missing a migration script for users who are upgrading?
  3. should we add PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date', 'key') here to clarify order of index columns? Although it seems that table structure is entirely managed by alembic, and these table args have no effect, we should probably be consistent (and if i understand correctly, if not specified, primary key cols are ordered as they appear in table def).

Copy link
Contributor Author

@Fokko Fokko Oct 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're right. The index has been replaced by the PrimaryKey.

  • For sqlite, the only way of making these kinds of changes is to drop and recreate the table, because sqlite does not have any support for altering columns.
  • The table is actually more or less the same, only the ID column is removed (which wasn't being used).
  • I looked at other __table_args__, and I don't see a PrimaryKeyConstraint being defined. I think it would make sense to add this.

Copy link
Contributor

@dstandish dstandish Oct 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking at this PR mainly to learn how to make a DB change, in case I ever want to propose something involving a DB schema change. But I noticed these things and wanted to surface them in case something was missed.

Clarification of observations

  • I noticed that the only migration changes in this PR are in the current_schema migration script, and they are only applied if the table does not exist. This indicated to me that the change would not be applied on upgrades, whether on sqlite or any other database.
  • I tried doing a new install (on sqlite) and both index and primary key were present.
  • I tried doing upgrade (on sqlite) and the id column was not dropped.

sqlite migration issues

Re sqlite migrations, this Stack Overflow post indicates that table alters on sqlite are supported with alembic > 0.7.0 using batch mode, where alembic handles creating the new table and copying the data. I wonder whether that is relevant here.

alembic revision issues

I tried running alembic revision locally and encountered two issues. The first was a typing import resolution error, due to the new airflow/typing.py module being in the same directory where we try to run alembic. The second was the alembic error "FAILED: Multiple heads are present". Are migrations handled by the release manager, so that we don't deal with them in PRs?

This blog post suggests adding unit test for detecting multiple revision heads. Maybe that's a good idea.

upgrades
Are we meant to be able to upgrade to 2.0 from 1.10.X? I ask because I tried a fresh install of 1.10.6, then checked out master and ran airflow db upgrade, and got an error. Is this something that we should be testing for?

Sorry, I know these questions venture somewhat out of the scope of this PR...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is fully backward compatible. The only change is getting rid of the id column, because that one wasn't used anywhere in the code. The index on there has been replaced by the primary key. Because this is such a small change, I thought it would be okay to apply it only to new installations.

The merging of the alembic heads should have been fixed here: #6362. Please check if you're on the latest master. The unit test sounds like an excellent idea.

Regarding the upgrade, that should work. Could you share the error?

I'll work on a migration script: https://jira.apache.org/jira/browse/AIRFLOW-5767

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the error message:

~/code/airflow master ⇡
v1-10 ❯ airflow db upgrade
/Users/dstandish/code/airflow/airflow/models/dagbag.py:21: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
DB: sqlite:////Users/dstandish/airflow/airflow.db
[2019-10-26 15:23:49,454] {db.py:318} INFO - Creating tables
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
Traceback (most recent call last):
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 162, in _catch_revision_errors
    yield
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 364, in _upgrade_revs
    revs = list(revs)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 819, in _iterate_revisions
    select_for_downgrade and requested_lowers
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 814, in <genexpr>
    rev.revision
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 746, in _iterate_related_revisions
    ", ".join(r.revision for r in overlaps),
alembic.script.revision.RevisionError: Requested revision a56c9515abdc overlaps with other requested revisions 004c1210f153, 74effc47d867

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/dstandish/.virtualenvs/v1-10/bin/airflow", line 7, in <module>
    exec(compile(f.read(), __file__, 'exec'))
  File "/Users/dstandish/code/airflow/airflow/bin/airflow", line 39, in <module>
    args.func(args)
  File "/Users/dstandish/code/airflow/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/Users/dstandish/code/airflow/airflow/bin/cli.py", line 1236, in upgradedb
    db.upgradedb()
  File "/Users/dstandish/code/airflow/airflow/utils/db.py", line 326, in upgradedb
    command.upgrade(config, 'heads')
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/command.py", line 298, in upgrade
    script.run_env()
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 489, in run_env
    util.load_python_file(self.dir, "env.py")
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/util/pyfiles.py", line 98, in load_python_file
    module = load_module_py(module_id, path)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/util/compat.py", line 173, in load_module_py
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/dstandish/code/airflow/airflow/migrations/env.py", line 103, in <module>
    run_migrations_online()
  File "/Users/dstandish/code/airflow/airflow/migrations/env.py", line 97, in run_migrations_online
    context.run_migrations()
  File "<string>", line 8, in run_migrations
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/runtime/environment.py", line 846, in run_migrations
    self.get_context().run_migrations(**kw)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/runtime/migration.py", line 507, in run_migrations
    for step in self._migrations_fn(heads, self):
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/command.py", line 287, in upgrade
    return script._upgrade_revs(revision, rev)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 369, in _upgrade_revs
    for script in reversed(list(revs))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 196, in _catch_revision_errors
    compat.raise_from_cause(util.CommandError(err.args[0]))
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/util/compat.py", line 297, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=exc_value)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/util/compat.py", line 290, in reraise
    raise value.with_traceback(tb)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 162, in _catch_revision_errors
    yield
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/base.py", line 364, in _upgrade_revs
    revs = list(revs)
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 819, in _iterate_revisions
    select_for_downgrade and requested_lowers
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 814, in <genexpr>
    rev.revision
  File "/Users/dstandish/.virtualenvs/v1-10/lib/python3.7/site-packages/alembic/script/revision.py", line 746, in _iterate_related_revisions
    ", ".join(r.revision for r in overlaps),
alembic.util.exc.CommandError: Requested revision a56c9515abdc overlaps with other requested revisions 004c1210f153, 74effc47d867

To reproduce here's what I did:

git pull upstream master # ensure we have latest
git checkout v-10-stable # 1.10.6rc2
rm ~/airflow/airflow.db
airflow initdb
git checkout master
airflow db upgrade

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out @dstandish. This is indeed an issue. Could you create a ticket for this? And include the stack trace and how to reproduce this? That would be very helpful. Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK created issue here & assigned to you for now

Expand Down Expand Up @@ -99,8 +97,6 @@ def set(

:return: None
"""
session.expunge_all()

value = XCom.serialize_value(value)

# remove any duplicate XComs
Expand All @@ -110,8 +106,6 @@ def set(
cls.task_id == task_id,
cls.dag_id == dag_id).delete()

session.commit()

# insert new XCom
session.add(XCom(
key=key,
Expand Down
3 changes: 0 additions & 3 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,9 +900,6 @@ def test_xcom_pull_after_success(self):
# execute, even if dependencies are ignored
ti.run(ignore_all_deps=True, mark_success=True)
self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), value)
# Xcom IS finally cleared once task has executed
ti.run(ignore_all_deps=True)
self.assertEqual(ti.xcom_pull(task_ids='test_xcom', key=key), None)

def test_xcom_pull_different_execution_date(self):
"""
Expand Down