Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@

from __future__ import annotations

import json
import logging
import os
from typing import Any

import pytest
from confluent_kafka import Producer

from airflow.models import Connection

# Import Operator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

from tests_common.test_utils.config import conf_vars

log = logging.getLogger(__name__)


Expand All @@ -52,32 +50,27 @@ def _basic_message_tester(message, test=None) -> Any:


@pytest.mark.integration("kafka")
@conf_vars(
{
(
"connections",
"operator.consumer.test.integration.test_1",
): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_1&enable.auto.commit=False&auto.offset.reset=beginning",
(
"connections",
"operator.consumer.test.integration.test_2",
): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_2&enable.auto.commit=False&auto.offset.reset=beginning",
(
"connections",
"operator.consumer.test.integration.test_3",
): "kafka://broker:29092?socket.timeout.ms=10&bootstrap.servers=broker:29092&group.id=operator.consumer.test.integration.test_3&enable.auto.commit=False&auto.offset.reset=beginning",
}
)
class TestConsumeFromTopic:
"""
test ConsumeFromTopicOperator
"""

def setup_method(self):
    """Register one Kafka connection per test via AIRFLOW_CONN_* env vars.

    Three connections are created (test_1..test_3), each with its own
    consumer group id so the tests do not share offsets.
    """
    for idx in (1, 2, 3):
        conn_id = f"operator.consumer.test.integration.test_{idx}"
        connection = Connection(
            conn_id=conn_id,
            conn_type="kafka",
            extra=json.dumps(
                {
                    "socket.timeout.ms": 10,
                    "bootstrap.servers": "broker:29092",
                    # group id matches the connection id, one group per test
                    "group.id": conn_id,
                    "enable.auto.commit": False,
                    "auto.offset.reset": "beginning",
                }
            ),
        )
        # Publish the connection through the environment (the same mechanism
        # create_connection_without_db uses) so hooks can resolve it.
        os.environ[f"AIRFLOW_CONN_{conn_id.upper()}"] = connection.get_uri()

def test_consumer_operator_test_1(self):
"""test consumer works with string import"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import json
import logging
import os

import pytest
from confluent_kafka import Consumer

from airflow.models import Connection
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

from tests_common.test_utils.config import conf_vars

log = logging.getLogger(__name__)


Expand All @@ -35,33 +35,23 @@ def _producer_function():


@pytest.mark.integration("kafka")
@conf_vars(
{
(
"connections",
"kafka_default_test_1",
): "kafka://broker:29092?socket.timeout.ms=10&message.timeout.ms=10&group.id=operator.producer.test.integration.test_1",
(
"connections",
"kafka_default_test_2",
): "kafka://broker:29092?socket.timeout.ms=10&message.timeout.ms=10&group.id=operator.producer.test.integration.test_2",
}
)
class TestProduceToTopic:
"""
test ProduceToTopicOperator
"""

def setup_method(self):
    """Expose per-test Kafka connections through AIRFLOW_CONN_* env vars.

    Two connections (kafka_default_test_1/2) are created, each pointing at
    the integration broker with its own producer group id.
    """
    for idx in (1, 2):
        connection = Connection(
            conn_id=f"kafka_default_test_{idx}",
            conn_type="kafka",
            extra=json.dumps(
                {
                    "socket.timeout.ms": 10,
                    "message.timeout.ms": 10,
                    "bootstrap.servers": "broker:29092",
                    "group.id": f"operator.producer.test.integration.test_{idx}",
                }
            ),
        )
        # Set environment variable directly (like create_connection_without_db does)
        os.environ[f"AIRFLOW_CONN_{connection.conn_id.upper()}"] = connection.get_uri()

def test_producer_operator_test_1(self):
GROUP = "operator.producer.test.integration.test_1"
TOPIC = "operator.producer.test.integration.test_1"
Expand Down