Skip to content

Commit

Permalink
Merge branch 'tensorturtle-ftms-dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
zacharyedwardbull committed Feb 4, 2023
2 parents 428ca92 + 80f2e06 commit 64a0c70
Show file tree
Hide file tree
Showing 9 changed files with 926 additions and 11 deletions.
25 changes: 14 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@ I have tested it with (tested protocols in brackets):
- an Elite Sterzo Smart steering plate (STERZO)
- a pair of Garmin Vector 3 power meter pedals (CPS)
- a Garmin RVR315 rear view radar (RDR)
- a S3+ Speed/Cadence sensor (CSCS)
- a Magene S3+ Speed/Cadence sensor (CSCS)
- a Elite Suito-T smart trainer (CPS, CSCS, FTMS)

Please let me know if you have used it with another device, and I will add it to the list.

## Supported protocols

Protocol name | Fully supported | Partially supported | Not supported
--- | --- | --- | ---
Battery Service (BAS)| | ✓ |
Cycling Speed and Cadence Service (CSCS) | | ✓ |
Cycling Power Service (CPS) | | ✓ |
Elite Sterzo Steering Service (STERZO)| | ✓ |
FiTness Machine Service (FTMS) | | |✓
Heart Rate Service (HRS)| | ✓ |
Tacx Trainer Control (ANT+ FE-C over BLE) | | ✓ |
Rear View Radar (RDR) | | ✓ |
| Protocol name | Fully supported | Partially supported |
|-------------------------------------------|-----------------|---------------------|
| Battery Service (BAS) | | |
| Cycling Speed and Cadence Service (CSCS) | | |
| Cycling Power Service (CPS) | | |
| Elite Sterzo Steering Service (STERZO) | | |
| FiTness Machine Service (FTMS) | ||
| Heart Rate Service (HRS) | | |
| Tacx Trainer Control (ANT+ FE-C over BLE) | | |
| Rear View Radar (RDR) | | |

## Installation
Clone this repo and then run the following command from the root directory
Expand All @@ -56,6 +57,8 @@ If you would like to add support for another cycling related protocol, that woul
* Tacx Trainer Control documentation: https://github.com/jedla22/BleTrainerControl/blob/master/How-to%20FE-C%20over%20BLE%20v1_0_0.pdf
* Reverse engineering Sterzo Smart: https://www.youtube.com/watch?v=BPVFjz5zD4g
* Reverse engineering Garmin RVR315 (radar): https://forums.garmin.com/developer/connect-iq/f/discussion/240452/bluetooth-profile-for-garmin-varia-rtl515#pifragment-1298=3
* Bluetooth FTMS Specifications: https://www.bluetooth.org/DocMan/handlers/DownloadDoc.ashx?doc_id=423422
* Huawei HMSCore Bluetooth LE documentation (FTMS): https://developer.huawei.com/consumer/en/doc/development/HMSCore-Guides/ibd-0000001051005923

## Projects using pycycling
* cycling-cadence-display: https://github.com/cboddy/cycling-cadence-display
131 changes: 131 additions & 0 deletions examples/fitness_machine_service_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import asyncio
from bleak import BleakClient
from pycycling.fitness_machine_service import FitnessMachineService

async def run(address):
async with BleakClient(address, timeout=10) as client:
ftms = FitnessMachineService(client)

# Print all 'read' characteristics

supported_resistance_level_range = (
await ftms.get_supported_resistance_level_range()
)

print("Supported resistance level range:")
print(supported_resistance_level_range)
print()

max_resistance = supported_resistance_level_range.maximum_resistance

supported_power_range = await ftms.get_supported_power_range()

print("Supported power range:")
print(supported_power_range)
print()

max_power = supported_power_range.maximum_power

fitness_machine_feature = await ftms.get_fitness_machine_feature()

print("Fitness machine feature:")
print(fitness_machine_feature)
print()

def print_indoor_bike_data(data):
print("Received indoor bike data:")
print(data)
print()

# Start receiving and printing 'notify' characteristics
ftms.set_indoor_bike_data_handler(print_indoor_bike_data)
await ftms.enable_indoor_bike_data_notify()

def print_fitness_machine_status(data):
print("Received fitness machine status:")
print("\t" + str(data))
print()

ftms.set_fitness_machine_status_handler(print_fitness_machine_status)
await ftms.enable_fitness_machine_status_notify()

def print_training_status(data):
print("Received training status:")
print(data)
print()

ftms.set_training_status_handler(print_training_status)
await ftms.enable_training_status_notify()

# Write to 'write' characteristics
# IMPORTANT: Before being able to write, the client (this script) must

# 1. Start receiving 'indicate' notifications from the control point characteristic
def print_control_point_response(message):
print("Received control point response:")
print(message)
print()

ftms.set_control_point_response_handler(print_control_point_response)
await ftms.enable_control_point_indicate()
# 2. 'write' a request to control the fitness machine
await ftms.request_control()
# 3. (recommended) 'write' a reset command
await ftms.reset()

print(
"Setting target resistance level to 25 percent of maximum resistance level..."
)
await ftms.set_target_resistance_level(max_resistance * 0.25)

await asyncio.sleep(5)

print(
"Increasing target resistance level to 50 percent of maximum resistance level..."
)
await ftms.set_target_resistance_level(max_resistance * 0.5)

await asyncio.sleep(5)

# Reset target resistance level
print("Resetting target resistance level...")

await ftms.reset()

power_level = 4 / 100 * max_power
print(f"Increasing target power to 4 percent of maximum power ({power_level}W).")
print("The trainer will automatically adjust resistance based on your leg speed.")
print("Try pedaling above {power_level}W to feel decreasing resistance, and vice versa.")
await ftms.set_target_power(power_level)

await asyncio.sleep(30)

# Reset
print("Resetting target power...")
await ftms.reset()


if __name__ == "__main__":
import os

os.environ["PYTHONASYNCIODEBUG"] = str(1)

device_address = "DEVICE_ADDRESS HERE"
loop = asyncio.get_event_loop()
loop.run_until_complete(run(device_address))

# Unresolved intermittent bug when writing to the control point characteristic:
# bleak.exc.BleakDBusError: [org.bluez.Error.Failed] Operation failed with ATT error: 0x80 (Unknown code)
#
# To work around this, just retry on error, like:
#
# import bleak
# while True:
# try:
# loop.run_until_complete(run(device_address))
# except bleak.exc.BleakDBusError as e:
# print("BleakDBusError, retrying...")
# print(e)
# continue
# except KeyboardInterrupt:
# break
218 changes: 218 additions & 0 deletions pycycling/fitness_machine_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
"""
Interact with a Fitness Machine Service (FTMS) Bluetooth LE device.
Example
=======
This example demonstrates all cycling-related functionalities of a FTMS indoor cycling device.
Please see also information on :ref:`obtaining the Bluetooth address of your device <obtaining_device_address>`.
First, it prints all 'read' characteristics:
+ Supported resistance level range
+ Supported power range
+ Fitness machine features
Then, it starts 'notify' characteristics, which stream data from the device:
+ Indoor bike data (speed, cadence, distance, resistance level, power, time)
Finally, it modifies 'write' characteristics with some time in between:
+ Resistance level
+ Target power (automatically adjusts resistance level based on cadence to maintain same power)
.. literalinclude:: ../examples/fitness_machine_service_example.py
"""
from collections import namedtuple

from pycycling.ftms_parsers import (
parse_fitness_machine_status,
parse_indoor_bike_data,
parse_fitness_machine_feature,
parse_training_status,
parse_control_point_response,
form_ftms_control_command,
FTMSControlPointOpCode,
FitnessMachineFeature,
)

# read: Supported Resistance Level Range
ftms_supported_resistance_level_range_characteristic_id = (
"00002ad6-0000-1000-8000-00805f9b34fb"
)
# read: Supported Power Range
ftms_supported_power_range_characteristic_id = "00002ad8-0000-1000-8000-00805f9b34fb"
# (read): Fitness Machine Feature
ftms_fitness_machine_feature_characteristic_id = "00002acc-0000-1000-8000-00805f9b34fb"
# notify: Indoor Bike Data
ftms_indoor_bike_data_characteristic_id = "00002ad2-0000-1000-8000-00805f9b34fb"
# notify: Fitness Machine Status
ftms_fitness_machine_status_characteristic_id = "00002ad3-0000-1000-8000-00805f9b34fb"
# notify: Training Status
ftms_training_status_characteristic_id = "00002ad3-0000-1000-8000-00805f9b34fb"
# (write, indicate): Fitness Machine Control Point
ftms_fitness_machine_control_point_characteristic_id = (
"00002ad9-0000-1000-8000-00805f9b34fb"
)

SupportedResistanceLevelRange = namedtuple(
"SupportedResistanceLevelRange",
["minimum_resistance", "maximum_resistance", "minimum_increment"],
)


def _parse_supported_resistance_level_range(message: bytearray) -> SupportedResistanceLevelRange:
minimum_resistance = int.from_bytes(message[0:2], "little")
maximum_resistance = int.from_bytes(message[2:4], "little")
minimum_increment = int.from_bytes(message[4:6], "little")
return SupportedResistanceLevelRange(
minimum_resistance, maximum_resistance, minimum_increment
)


SupportedPowerRange = namedtuple(
"SupportedPowerRange",
["minimum_power", "maximum_power", "minimum_increment"],
)


def _parse_supported_power_range(message: bytearray) -> SupportedPowerRange:
minimum_power = int.from_bytes(message[0:2], "little")
maximum_power = int.from_bytes(message[2:4], "little")
minimum_increment = int.from_bytes(message[4:6], "little")
return SupportedPowerRange(minimum_power, maximum_power, minimum_increment)


class FitnessMachineService:
def __init__(self, client):
self._client = client
self._control_point_response_callback = None
self._indoor_bike_data_callback = None
self._fitness_machine_status_callback = None
self._training_status_callback = None

# === READ Characteristics ===
async def get_supported_resistance_level_range(self) -> SupportedResistanceLevelRange:
message = await self._client.read_gatt_char(
ftms_supported_resistance_level_range_characteristic_id
)
return _parse_supported_resistance_level_range(message)

async def get_supported_power_range(self) -> SupportedPowerRange:
message = await self._client.read_gatt_char(
ftms_supported_power_range_characteristic_id
)
return _parse_supported_power_range(message)

async def get_fitness_machine_feature(self) -> FitnessMachineFeature:
message = await self._client.read_gatt_char(
ftms_fitness_machine_feature_characteristic_id
)
return parse_fitness_machine_feature(message)

# === NOTIFY Characteristics ===
# ====== Indoor Bike Data ======
async def enable_indoor_bike_data_notify(self) -> None:
await self._client.start_notify(
ftms_indoor_bike_data_characteristic_id,
self._indoor_bike_data_notification_handler,
)

async def disable_indoor_bike_data_notify(self):
await self._client.stop_notify(ftms_indoor_bike_data_characteristic_id)

def set_indoor_bike_data_handler(self, callback):
self._indoor_bike_data_callback = callback

def _indoor_bike_data_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._indoor_bike_data_callback is not None:
self._indoor_bike_data_callback(parse_indoor_bike_data(data))

# ====== Fitness Machine Status ======
async def enable_fitness_machine_status_notify(self) -> None:
await self._client.start_notify(
ftms_fitness_machine_status_characteristic_id,
self._fitness_machine_status_notification_handler,
)

async def disable_fitness_machine_status_notify(self):
await self._client.stop_notify(ftms_fitness_machine_status_characteristic_id)

def set_fitness_machine_status_handler(self, callback):
self._fitness_machine_status_callback = callback

def _fitness_machine_status_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._fitness_machine_status_callback is not None:
self._fitness_machine_status_callback(parse_fitness_machine_status(data))

# ====== Training Status ======
async def enable_training_status_notify(self) -> None:
await self._client.start_notify(
ftms_training_status_characteristic_id,
self._training_status_notification_handler,
)

async def disable_training_status_notify(self):
await self._client.stop_notify(ftms_training_status_characteristic_id)

def set_training_status_handler(self, callback):
self._training_status_callback = callback

def _training_status_notification_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._training_status_callback is not None:
self._training_status_callback(parse_training_status(data))

# === WRITE/INDICATE Characteristics ===
# ====== Fitness Machine Control Point ======
async def enable_control_point_indicate(self) -> None:
await self._client.start_notify(
ftms_fitness_machine_control_point_characteristic_id,
self._control_point_response_handler,
)

async def disable_control_point_indicate(self):
await self._client.stop_notify(
ftms_fitness_machine_control_point_characteristic_id
)

def set_control_point_response_handler(self, callback):
self._control_point_response_callback = callback

def _control_point_response_handler(
self, sender, data
): # pylint: disable=unused-argument
if self._control_point_response_callback is not None:
self._control_point_response_callback(parse_control_point_response(data))

async def request_control(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.REQUEST_CONTROL)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, False
)

async def reset(self) -> None:
message = form_ftms_control_command(FTMSControlPointOpCode.RESET)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, False
)

async def set_target_resistance_level(self, level: int) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_RESISTANCE_LEVEL, int(level)
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, False
)

async def set_target_power(self, power: int) -> None:
message = form_ftms_control_command(
FTMSControlPointOpCode.SET_TARGET_POWER, int(power)
)
await self._client.write_gatt_char(
ftms_fitness_machine_control_point_characteristic_id, message, False
)
Loading

0 comments on commit 64a0c70

Please sign in to comment.