Skip to content
This repository was archived by the owner on Nov 14, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: Tests
on: [ push ]
on: [push]

jobs:
pytest:
Expand All @@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.8']
python-version: ["3.8"]

steps:
- uses: actions/checkout@v2
Expand All @@ -34,14 +34,14 @@ jobs:
with:
repository: "autoreduction/autoreduce-workspace"

- name: Run unit tests
run: >-
RUNNING_VIA_PYTEST=true pytest --ignore=autoreduce_qp/systemtests --cov=autoreduce_qp --cov-report=xml -v autoreduce_qp

- name: Run system tests
run: |
RUNNING_VIA_PYTEST=true pytest autoreduce_qp/systemtests --cov-append -v

- name: Run unit tests
run: >-
RUNNING_VIA_PYTEST=true pytest --ignore=autoreduce_qp/systemtests --cov=autoreduce_qp --cov-report=xml -v autoreduce_qp

- uses: codecov/codecov-action@v2
with:
files: ./coverage.xml
Expand All @@ -66,7 +66,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.6', '3.8']
python-version: ["3.6", "3.8"]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -95,7 +95,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.8']
python-version: ["3.8"]

steps:
- uses: actions/checkout@v2
Expand All @@ -119,4 +119,3 @@ jobs:
- uses: autoreduction/autoreduce-actions/code_inspection@main
with:
package_name: autoreduce_qp

29 changes: 9 additions & 20 deletions autoreduce_qp/queue_processor/confluent_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
from pydantic import ValidationError
from confluent_kafka import DeserializingConsumer, KafkaException
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.admin import AdminClient, NewTopic
from autoreduce_utils.clients.connection_exception import ConnectionException
from autoreduce_utils.message.message import Message
from autoreduce_utils.clients.producer import Publisher
from autoreduce_utils.clients.kafka_utils import kafka_config_from_env
from autoreduce_qp.queue_processor.handle_message import HandleMessage

TRANSACTIONS_TOPIC = os.getenv('KAFKA_TOPIC')
KAFKA_BROKER_URL = os.getenv("KAFKA_BROKER_URL")
GROUP_ID = 'mygroup'
GROUP_ID = 'data_ready-group'


class Consumer(threading.Thread):
Expand All @@ -26,16 +26,6 @@ def __init__(self, consumer=None):
self.logger = logging.getLogger(__package__)
self.logger.debug("Initializing the consumer")

kafka_broker = {'bootstrap.servers': KAFKA_BROKER_URL}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics:
# Create the topic
self.logger.info("Creating the topic '%s'", TRANSACTIONS_TOPIC)
new_topic = NewTopic(TRANSACTIONS_TOPIC, num_partitions=1, replication_factor=1)
admin_client.create_topics([new_topic])

self.consumer = consumer
self.message_handler = HandleMessage()
self._stop_event = threading.Event()
Expand All @@ -49,14 +39,13 @@ def __init__(self, consumer=None):
try:
self.logger.debug("Getting the kafka consumer")

config = {
'bootstrap.servers': KAFKA_BROKER_URL,
'group.id': GROUP_ID,
'auto.offset.reset': "earliest",
"on_commit": self.on_commit,
'key.deserializer': StringDeserializer('utf_8'),
'value.deserializer': StringDeserializer('utf_8')
}
config = kafka_config_from_env()

config['key.deserializer'] = StringDeserializer('utf_8')
config['value.deserializer'] = StringDeserializer('utf_8')
config['on_commit'] = self.on_commit
config['group.id'] = GROUP_ID
config['auto.offset.reset'] = 'earliest'
self.consumer = DeserializingConsumer(config)
except KafkaException as err:
self.logger.error("Could not initialize the consumer: %s", err)
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

setup(
name="autoreduce_qp",
version="22.0.0.dev35", # when updating the version here make sure to also update qp_mantid_python36.D
version="22.0.0.dev36", # when updating the version here make sure to also update qp_mantid_python36.D
description="ISIS Autoreduction queue processor",
author="ISIS Autoreduction Team",
url="https://github.com/autoreduction/autoreduce/",
install_requires=[
"autoreduce_db==22.0.0.dev34", "Django>=3.2.10", "fire==0.4.0", "plotly==5.3.1", "kaleido==0.2.1", "stomp.py",
"docker==5.0.3", "confluent-kafka==1.8.2"
"autoreduce_db==22.0.0.dev35", "Django>=3.2.10", "fire==0.4.0", "plotly==5.3.1", "kaleido==0.2.1", "stomp.py",
"docker==5.0.3", "confluent-kafka"
],
packages=find_packages(),
entry_points={
Expand Down