Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions airflow/api_connexion/endpoints/connection_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.

import os

from flask import request
from marshmallow import ValidationError
from sqlalchemy import func
Expand All @@ -27,10 +29,13 @@
ConnectionCollection,
connection_collection_schema,
connection_schema,
connection_test_schema,
)
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.security import permissions
from airflow.utils.session import provide_session
from airflow.utils.strings import get_random_string


@security.requires_access([(permissions.ACTION_CAN_DELETE, permissions.RESOURCE_CONNECTION)])
Expand Down Expand Up @@ -129,3 +134,27 @@ def post_connection(session):
session.commit()
return connection_schema.dump(connection)
raise AlreadyExists(detail=f"Connection already exist. ID: {conn_id}")


@security.requires_access([(permissions.ACTION_CAN_CREATE, permissions.RESOURCE_CONNECTION)])
def test_connection():
"""
To test a connection, this method first creates an in-memory dummy conn_id & exports that to an
env var, as some hook classes tries to find out the conn from their __init__ method & errors out
if not found. It also deletes the conn id env variable after the test.
"""
body = request.json
dummy_conn_id = get_random_string()
conn_env_var = f'{CONN_ENV_PREFIX}{dummy_conn_id.upper()}'
try:
data = connection_schema.load(body)
data['conn_id'] = dummy_conn_id
conn = Connection(**data)
os.environ[conn_env_var] = conn.get_uri()
status, message = conn.test_connection()
return connection_test_schema.dump({"status": status, "message": message})
except ValidationError as err:
raise BadRequest(detail=str(err.messages))
finally:
if conn_env_var in os.environ:
del os.environ[conn_env_var]
39 changes: 39 additions & 0 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,34 @@ paths:
'404':
$ref: '#/components/responses/NotFound'

/connections/test:
post:
summary: Test a connection
x-openapi-router-controller: airflow.api_connexion.endpoints.connection_endpoint
operationId: test_connection
tags: [Connection]
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/Connection'
responses:
'200':
description: Success.
content:
application/json:
schema:
$ref: '#/components/schemas/ConnectionTest'
'400':
$ref: '#/components/responses/BadRequest'
'401':
$ref: '#/components/responses/Unauthenticated'
'403':
$ref: '#/components/responses/PermissionDenied'
'404':
$ref: '#/components/responses/NotFound'

/dags:
get:
summary: List DAGs
Expand Down Expand Up @@ -1739,6 +1767,17 @@ components:
nullable: true
description: Other values that cannot be put into another field, e.g. RSA keys.

ConnectionTest:
description: Connection test results.
type: object
properties:
status:
type: boolean
description: The status of the request.
message:
type: string
description: The success or failure message of the request.

DAG:
type: object
description: DAG
Expand Down
9 changes: 9 additions & 0 deletions airflow/api_connexion/schemas/connection_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Meta:

connection_id = auto_field('conn_id', required=True)
conn_type = auto_field(required=True)
description = auto_field()
host = auto_field()
login = auto_field()
schema = auto_field()
Expand Down Expand Up @@ -60,6 +61,14 @@ class ConnectionCollectionSchema(Schema):
total_entries = fields.Int()


class ConnectionTestSchema(Schema):
"""connection Test Schema"""

status = fields.Boolean(required=True)
message = fields.String(required=True)


connection_schema = ConnectionSchema()
connection_collection_item_schema = ConnectionCollectionItemSchema()
connection_collection_schema = ConnectionCollectionSchema()
connection_test_schema = ConnectionTestSchema()
16 changes: 16 additions & 0 deletions airflow/hooks/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,3 +350,19 @@ def bulk_load(self, table, tmp_file):
:type tmp_file: str
"""
raise NotImplementedError()

def test_connection(self):
"""Tests the connection by executing a select 1 query"""
status, message = False, ''
try:
with closing(self.get_conn()) as conn:
with closing(conn.cursor()) as cur:
cur.execute("select 1")
if cur.fetchone():
status = True
message = 'Connection successfully tested'
except Exception as e: # noqa pylint: disable=broad-except
status = False
message = str(e)

return status, message
16 changes: 16 additions & 0 deletions airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,22 @@ def debug_info(self):
self.extra_dejson,
)

def test_connection(self):
"""Calls out get_hook method and executes test_connection method on that."""
status, message = False, ''
try:
hook = self.get_hook()
if getattr(hook, 'test_connection', False):
status, message = hook.test_connection()
else:
message = (
f"Hook {hook.__class__.__name__} doesn't implement or inherit test_connection method"
)
except Exception as e: # noqa pylint: disable=broad-except
message = str(e)

return status, message

@property
def extra_dejson(self) -> Dict:
"""Returns the extra property by deserializing json."""
Expand Down
1 change: 1 addition & 0 deletions airflow/www/extensions/init_jinja_globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def prepare_jinja_globals():
'airflow_version': airflow_version,
'git_version': git_version,
'k8s_or_k8scelery_executor': IS_K8S_OR_K8SCELERY_EXECUTOR,
'rest_api_enabled': conf.get('api', 'auth_backend') != 'airflow.api.auth.backend.deny_all',
}

if 'analytics_tool' in conf.getsection('webserver'):
Expand Down
77 changes: 77 additions & 0 deletions airflow/www/static/js/connection_form.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
* Created by janomar on 23/07/15.
*/

import getMetaValue from './meta_value';

const restApiEnabled = getMetaValue('rest_api_enabled') === 'True';

function decode(str) {
return new DOMParser().parseFromString(str, "text/html").documentElement.textContent
}
Expand Down Expand Up @@ -68,6 +72,18 @@ $(document).ready(function () {
const controlsContainer = getControlsContainer();
const connTypesToControlsMap = getConnTypesToControlsMap();

// Create a test connection button & insert it right next to the save (submit) button
const testConnBtn = $('<button id="test-connection" type="button" class="btn btn-sm btn-primary" ' +
'style="margin-left: 3px; pointer-events: all">Test\n <i class="fa fa-rocket"></i></button>');

if (!restApiEnabled) {
$(testConnBtn).addClass('disabled')
.attr('title', 'Airflow REST APIs have been disabled. '
+ 'See api->auth_backend section of the Airflow configuration.');
}

$(testConnBtn).insertAfter($('form#model_form div.well.well-sm button:submit'));

/**
* Changes the connection type.
* @param {string} connType The connection type to change to.
Expand Down Expand Up @@ -140,6 +156,67 @@ $(document).ready(function () {
}
}

/**
* Produces JSON stringified data from a html form data
*
* @param {string} selector Jquery from selector string.
* @returns {string} Form data as a JSON string
*/
function getSerializedFormData(selector) {
const outObj = {};
const inArray = $(selector).serializeArray();

$.each(inArray, function () {
if (this.name === 'conn_id') {
outObj.connection_id = this.value;
} else if (this.value !== '' && this.name !== 'csrf_token') {
outObj[this.name] = this.value;
}
});

return JSON.stringify(outObj);
}

/**
* Displays the Flask style alert on UI via JS
*
* @param {boolean} status - true for success, false for error
* @param {string} message - The text message to show in alert box
*/
function displayAlert(status, message) {
const alertClass = status ? 'alert-success' : 'alert-error';
let alertBox = $('.container .row .alert');
if (alertBox.length) {
alertBox.removeClass('alert-success').removeClass('alert-error');
alertBox.addClass(alertClass);
alertBox.text(message);
alertBox.show();
} else {
alertBox = $('<div class="alert ' + alertClass + '">\n' +
'<button type="button" class="close" data-dismiss="alert">×</button>\n' + message + '</div>');

$('.container .row').prepend(alertBox).show();
}
}

// Bind click event to Test Connection button & perform an AJAX call via REST API
$('#test-connection').on('click', (e) => {
e.preventDefault();
$.ajax({
url: '/api/v1/connections/test',
type: 'post',
contentType: 'application/json',
dataType: 'json',
data: getSerializedFormData('form#model_form'),
success(data) {
displayAlert(data.status, data.message);
},
error(jq, err, msg) {
displayAlert(false, msg);
},
});
});

const connTypeElem = document.getElementById('conn_type');
$(connTypeElem).on('change', (e) => {
connType = e.target.value;
Expand Down
5 changes: 5 additions & 0 deletions airflow/www/templates/airflow/conn_create.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

{% extends 'appbuilder/general/model/add.html' %}

{% block head_css %}
{{ super() }}
<meta name="rest_api_enabled" content="{{ rest_api_enabled }}">
{% endblock %}

{% block tail %}
{{ super() }}
<script src="{{ url_for_asset('connectionForm.js') }}"></script>
Expand Down
5 changes: 5 additions & 0 deletions airflow/www/templates/airflow/conn_edit.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

{% extends 'appbuilder/general/model/edit.html' %}

{% block head_css %}
{{ super() }}
<meta name="rest_api_enabled" content="{{ rest_api_enabled }}">
{% endblock %}

{% block tail %}
{{ super() }}
<script src="{{ url_for_asset(filename='connectionForm.js') }}"></script>
Expand Down
22 changes: 22 additions & 0 deletions docs/apache-airflow/howto/connection.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ variables.
See the :doc:`Connections Concepts </concepts/connections>` documentation for
more information.

.. _creating_connection_ui:

Creating a Connection with the UI
---------------------------------

Expand All @@ -48,6 +50,8 @@ to create a new connection.
belonging to the different connection types.
4. Click the ``Save`` button to create the connection.

.. _editing_connection_ui:

Editing a Connection with the UI
--------------------------------

Expand Down Expand Up @@ -360,6 +364,24 @@ In addition to retrieving connections from environment variables or the metastor
an secrets backend to retrieve connections. For more details see :doc:`/security/secrets/secrets-backend/index`.


Test Connections
----------------

Airflow Web UI & API allows to test connections. The test connection feature can be used from
:ref:`create <creating_connection_ui>` or :ref:`edit <editing_connection_ui>` connection page, or through calling
:doc:`Connections REST API </stable-rest-api-ref/>`.

To test a connection Airflow calls out the ``test_connection`` method from the associated hook class and reports the
results of it. It may happen that the connection type does not have any associated hook or the hook doesn't have the
``test_connection`` method implementation, in either case the error message will throw the proper error message.

One important point to note is that the connections will be tested from the webserver only, so this feature is
subject to network egress rules setup for your webserver. Also, if webserver & worker machines have different libs or
provider packages installed then the test results might differ.

Last caveat is that this feature won't be available for the connections coming out of the secrets backends.


Custom connection types
-----------------------

Expand Down
Loading