Skip to content

Commit 5d1510c

Browse files
authored
Merge pull request mtconnect#481 from mtconnect/350_xpath_for_mqtt_service_2
350 xpath for mqtt service 2
2 parents ef4c359 + f4f9869 commit 5d1510c

File tree

5 files changed

+75
-14
lines changed

5 files changed

+75
-14
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# The version number.
22
set(AGENT_VERSION_MAJOR 2)
3-
set(AGENT_VERSION_MINOR 3)
3+
set(AGENT_VERSION_MINOR 4)
44
set(AGENT_VERSION_PATCH 0)
5-
set(AGENT_VERSION_BUILD 16)
5+
set(AGENT_VERSION_BUILD 1)
66
set(AGENT_VERSION_RC "")
77

88
# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent

README.md

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,10 @@ Configuration Parameters
589589
understand the internal workings of the agent.
590590

591591
*Default*: 1000
592+
593+
* `CreateUniqueIds`: Changes all the ids in each element to a UUID that will be unique across devices. This is used for merging devices from multiple sources.
594+
595+
*Default*: `false`
592596

593597
* `Devices` - The XML file to load that specifies the devices and is
594598
supplied as the result of a probe request. If the key is not found
@@ -604,23 +608,35 @@ Configuration Parameters
604608
* `JsonVersion` - JSON Printer format. Old format: 1, new format: 2
605609

606610
*Default*: 2
611+
612+
* `LogStreams` - Debugging flag to log the streamed data to a file. Logs to a file named: `Stream_` + timestamp + `.log` in the current working directory. This is only for the Rest Sink.
607613

608-
* `SchemaVersion` - Change the schema version to a different version number.
609-
610-
*Default*: 2.0
614+
*Default*: `false`
611615

612616
* `MaxAssets` - The maximum number of assets the agent can hold in its buffer. The
613617
number is the actual count, not an exponent.
614618

615619
*Default*: 1024
620+
621+
* `MaxCachedFileSize` - The maximum size of a raw file to cache in memory.
616622

617-
* `MonitorConfigFiles` - Monitor agent.cfg and Devices.xml files and restart agent if they change.
623+
*Default*: 20 kb
624+
625+
* `MinCompressFileSize` - The file size where we begin compressing raw files sent to the client.
618626

619-
*Default*: false
627+
*Default*: 100 kb
620628

621629
* `MinimumConfigReloadAge` - The minimum age of a config file before an agent reload is triggered (seconds).
622630

623-
*Default*: 15
631+
*Default*: 15 seconds
632+
633+
* `MonitorConfigFiles` - Monitor agent.cfg and Devices.xml files and restart agent if they change.
634+
635+
*Default*: false
636+
637+
* `MonitorInterval` - The interval between checks if the agent.cfg or Device.xml files have changed.
638+
639+
*Default*: 10 seconds
624640

625641
* `Pretty` - Pretty print the output with indententation
626642

@@ -630,6 +646,14 @@ Configuration Parameters
630646
process id of the daemon. This is not supported in Windows.
631647

632648
*Default*: agent.pid
649+
650+
* `SchemaVersion` - Change the schema version to a different version number.
651+
652+
*Default*: 2.0
653+
654+
* `Sender` - The value for the sender header attribute.
655+
656+
*Default*: Local machine name
633657

634658
* `ServiceName` - Changes the service name when installing or removing
635659
the service. This allows multiple agents to run as services on the same machine.
@@ -638,7 +662,11 @@ Configuration Parameters
638662

639663
* `SuppressIPAddress` - Suppress the Adapter IP Address and port when creating the Agent Device ids and names. This applies to all adapters.
640664

641-
*Default*: false
665+
*Default*: `false`
666+
667+
* `VersionDeviceXml` - Create a new versioned file every time the Device.xml file changes from an external source.
668+
669+
*Default*: `false`
642670

643671
* `WorkerThreads` - The number of operating system threads dedicated to the Agent
644672

@@ -867,6 +895,10 @@ Sinks {
867895
* `MqttQOS`: - For the MQTT Sinks, sets the Quality of Service. Must be one of `at_least_once`, `at_most_once`, `exactly_once`.
868896

869897
*Default*: `at_least_once`
898+
899+
* `MqttXPath`: - The xpath filter to apply to all current and samples published to MQTT. If the XPath is invalid, it will fall back to publishing all data items.
900+
901+
*Default*: All data items
870902

871903
### Adapter Configuration Items ###
872904

src/mtconnect/configuration/config_options.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ namespace mtconnect {
104104
DECLARE_CONFIGURATION(MqttPassword);
105105
DECLARE_CONFIGURATION(MqttMaxTopicDepth);
106106
DECLARE_CONFIGURATION(MqttLastWillTopic);
107+
DECLARE_CONFIGURATION(MqttXPath);
107108
///@}
108109

109110
/// @name Adapter Configuration

src/mtconnect/sink/mqtt_sink/mqtt2_service.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ namespace mtconnect {
7070
{configuration::MqttUserName, string()},
7171
{configuration::MqttPassword, string()},
7272
{configuration::MqttPort, int()},
73+
{configuration::MqttXPath, string()},
7374
{configuration::MqttRetain, bool()},
7475
{configuration::MqttQOS, string()},
7576
{configuration::MqttHost, string()}});
@@ -229,7 +230,7 @@ namespace mtconnect {
229230
auto seq = publishCurrent(boost::system::error_code {});
230231
for (auto &dev : m_sinkContract->getDevices())
231232
{
232-
FilterSet filterSet = filterForDevice(dev);
233+
FilterSet filterSet { filterForDevice(dev) };
233234
auto sampler =
234235
make_shared<AsyncSample>(m_strand, m_sinkContract->getCircularBuffer(),
235236
std::move(filterSet), m_sampleInterval, 600s, m_client, dev);

src/mtconnect/sink/mqtt_sink/mqtt2_service.hpp

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,35 @@ namespace mtconnect {
128128
auto pos = m_filters.emplace(*(device->getUuid()), FilterSet());
129129
filter = pos.first;
130130
auto &set = filter->second;
131-
for (const auto &wdi : device->getDeviceDataItems())
131+
132+
auto xpath = GetOption<string>(m_options, configuration::MqttXPath);
133+
if (xpath)
134+
{
135+
try
136+
{
137+
m_sinkContract->getDataItemsForPath(device, xpath, set, nullopt);
138+
}
139+
catch (exception &e)
140+
{
141+
LOG(warning) << "MqttService: Invalid xpath '" << *xpath <<
142+
"', defaulting to all data items";
143+
}
144+
145+
if (set.empty())
146+
{
147+
LOG(warning) << "MqttService: Invalid xpath '" << *xpath <<
148+
"', defaulting to all data items";
149+
}
150+
}
151+
152+
if (set.empty())
132153
{
133-
const auto di = wdi.lock();
134-
if (di)
135-
set.insert(di->getId());
154+
for (const auto &wdi : device->getDeviceDataItems())
155+
{
156+
const auto di = wdi.lock();
157+
if (di)
158+
set.insert(di->getId());
159+
}
136160
}
137161
}
138162
return filter->second;
@@ -208,6 +232,9 @@ namespace mtconnect {
208232

209233
bool m_retain {true};
210234
MqttClient::QOS m_qos {MqttClient::QOS::at_least_once};
235+
236+
// For XPath
237+
211238
};
212239
} // namespace mqtt_sink
213240
} // namespace sink

0 commit comments

Comments
 (0)