Source code for bleak.backends.bluezdbus.utils

import sys
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    if sys.platform != "linux":
        assert False, "This backend is only available on Linux"

import os
import re
from typing import Optional

from dbus_fast.auth import AuthExternal
from dbus_fast.constants import MessageType
from dbus_fast.message import Message

from bleak.backends.bluezdbus import defs
from bleak.exc import (
    BleakDBusError,
    BleakError,
    BleakGATTProtocolError,
    BleakGATTProtocolErrorCode,
)


[docs] def assert_reply(reply: Message) -> None: """Checks that a D-Bus message is a valid reply. Raises: BleakDBusError: if the message type is ``MessageType.ERROR`` AssertionError: if the message type is not ``MessageType.METHOD_RETURN`` """ if reply.message_type == MessageType.ERROR: assert reply.error_name raise BleakDBusError(reply.error_name, reply.body) assert reply.message_type == MessageType.METHOD_RETURN
[docs] def assert_gatt_reply(reply: Message, start_notify: bool = False) -> None: """ Checks that a D-Bus message is a valid reply. Like :func:`assert_reply`, but has special handling for GATT protocol errors. Args: reply: The D-Bus message to check. start_notify: Whether this reply is for a StartNotify call. Raises: BleakGATTProtocolError: for specific GATT protocol errors. BleakDBusError: if the message type is ``MessageType.ERROR`` AssertionError: if the message type is not ``MessageType.METHOD_RETURN`` """ # BlueZ has specific errors for some GATT protocol errors, so we # have to turn them back into the generic BleakGATTProtocolError # with the correct code. See create_gatt_dbus_error() in BlueZ source. if reply.error_name == defs.BLUEZ_ERROR_NOT_PERMITTED: # Same error is used for both read and write not permitted, so we have # to use the message to discriminate. if reply.body and reply.body[0] == "Read not permitted": raise BleakGATTProtocolError(BleakGATTProtocolErrorCode.READ_NOT_PERMITTED) if reply.body and reply.body[0] == "Write not permitted": raise BleakGATTProtocolError(BleakGATTProtocolErrorCode.WRITE_NOT_PERMITTED) # REVISIT: could also be "Not paired" which could be any of: # INSUFFICIENT_AUTHENTICATION, INSUFFICIENT_ENCRYPTION, or # INSUFFICIENT_ENCRYPTION_KEY_SIZE # "StartNotify" will return BLUEZ_ERROR_NOT_SUPPORTED if the characteristic # does not support notifications before even trying, so it is not a GATT # error in this case. if not start_notify and reply.error_name == defs.BLUEZ_ERROR_NOT_SUPPORTED: raise BleakGATTProtocolError(BleakGATTProtocolErrorCode.REQUEST_NOT_SUPPORTED) if reply.error_name == defs.BLUEZ_ERROR_NOT_AUTHORIZED: raise BleakGATTProtocolError( BleakGATTProtocolErrorCode.INSUFFICIENT_AUTHORIZATION ) if reply.error_name == defs.BLUEZ_ERROR_INVALID_ARGUMENT: if reply.body and reply.body[0] == "Invalid offset": raise BleakGATTProtocolError(BleakGATTProtocolErrorCode.INVALID_OFFSET) if reply.body and reply.body[0] == "Invalid Length": raise BleakGATTProtocolError( BleakGATTProtocolErrorCode.INVALID_ATTRIBUTE_VALUE_LENGTH ) if reply.error_name == defs.BLUEZ_ERROR_IMPROPERLY_CONFIGURED: raise BleakGATTProtocolError( BleakGATTProtocolErrorCode.CCCD_IMPROPERLY_CONFIGURED ) if ( reply.error_name == defs.BLUEZ_ERROR_FAILED and reply.body and ( # Unfortunately, BlueZ makes us scrape the string to get the # error code in this case. match := re.match( r"^Operation failed with ATT error: (0x[0-9a-fA-F]+)", reply.body[0], ) ) ): raise BleakGATTProtocolError( BleakGATTProtocolErrorCode(int(match.group(1), 16)) ) assert_reply(reply)
[docs] def extract_service_handle_from_path(path: str) -> int: try: return int(path[-4:], 16) except Exception as e: raise BleakError(f"Could not parse service handle from path: {path}") from e
[docs] def device_path_from_characteristic_path(characteristic_path: str) -> str: """ Scrape the device path from a D-Bus characteristic path. Args: characteristic_path: The D-Bus object path of the characteristic. Returns: A D-Bus object path of the device. """ # /org/bluez/hci1/dev_FA_23_9D_AA_45_46/service000c/char000d return characteristic_path[:-21]
[docs] def get_dbus_authenticator() -> Optional[AuthExternal]: uid = None try: uid = int(os.environ.get("BLEAK_DBUS_AUTH_UID", "")) except ValueError: pass auth = None if uid is not None: auth = AuthExternal(uid=uid) return auth