# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
import websockets
import json
from colors import color

from bumble.core import AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import (
    Descriptor,
    Service,
    Characteristic,
    CharacteristicValue,
    GATT_DEVICE_INFORMATION_SERVICE,
    GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
    GATT_DEVICE_BATTERY_SERVICE,
    GATT_BATTERY_LEVEL_CHARACTERISTIC,
    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
    GATT_REPORT_CHARACTERISTIC,
    GATT_REPORT_MAP_CHARACTERISTIC,
    GATT_PROTOCOL_MODE_CHARACTERISTIC,
    GATT_HID_INFORMATION_CHARACTERISTIC,
    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
    GATT_REPORT_REFERENCE_DESCRIPTOR
)

# -----------------------------------------------------------------------------

# Protocol Modes
HID_BOOT_PROTOCOL   = 0x00
HID_REPORT_PROTOCOL = 0x01

# Report Types
HID_INPUT_REPORT   = 0x01
HID_OUTPUT_REPORT  = 0x02
HID_FEATURE_REPORT = 0x03

# Report Map
HID_KEYBOARD_REPORT_MAP = bytes([
    0x05, 0x01,  # Usage Page (Generic Desktop Ctrls)
    0x09, 0x06,  # Usage (Keyboard)
    0xA1, 0x01,  # Collection (Application)
    0x85, 0x01,  # . Report ID (1)
    0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
    0x19, 0xE0,  # . Usage Minimum (0xE0)
    0x29, 0xE7,  # . Usage Maximum (0xE7)
    0x15, 0x00,  # . Logical Minimum (0)
    0x25, 0x01,  # . Logical Maximum (1)
    0x75, 0x01,  # . Report Size (1)
    0x95, 0x08,  # . Report Count (8)
    0x81, 0x02,  # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
    0x95, 0x01,  # . Report Count (1)
    0x75, 0x08,  # . Report Size (8)
    0x81, 0x01,  # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
    0x95, 0x06,  # . Report Count (6)
    0x75, 0x08,  # . Report Size (8)
    0x15, 0x00,  # . Logical Minimum (0x00)
    0x25, 0x94,  # . Logical Maximum (0x94)
    0x05, 0x07,  # . Usage Page (Kbrd/Keypad)
    0x19, 0x00,  # . Usage Minimum (0x00)
    0x29, 0x94,  # . Usage Maximum (0x94)
    0x81, 0x00,  # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
    0x95, 0x05,  # . Report Count (5)
    0x75, 0x01,  # . Report Size (1)
    0x05, 0x08,  # . Usage Page (LEDs)
    0x19, 0x01,  # . Usage Minimum (Num Lock)
    0x29, 0x05,  # . Usage Maximum (Kana)
    0x91, 0x02,  # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
    0x95, 0x01,  # . Report Count (1)
    0x75, 0x03,  # . Report Size (3)
    0x91, 0x01,  # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
    0xC0         # End Collection
])


# -----------------------------------------------------------------------------
class ServerListener(Device.Listener, Connection.Listener):
    """Listener that logs connections to, and disconnections from, a device."""

    def __init__(self, device):
        self.device = device

    @AsyncRunner.run_in_task()
    async def on_connection(self, connection):
        """Log the new connection and register this object as its listener."""
        print(f'=== Connected to {connection}')
        connection.listener = self

    @AsyncRunner.run_in_task()
    async def on_disconnection(self, reason):
        """Log the reason given for the disconnection."""
        print(f'### Disconnected, reason={reason}')


# -----------------------------------------------------------------------------
def on_hid_control_point_write(connection, value):
    """Print the value written to the HID Control Point characteristic."""
    print(f'Control Point Write: {value}')


# -----------------------------------------------------------------------------
def on_report(characteristic, value):
    """Print a received report as hex, labelled with `characteristic`."""
    print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)


# -----------------------------------------------------------------------------
def _find_characteristic(peer, uuid, name):
    """Return the first discovered characteristic with `uuid`, or None.

    When none has been discovered, an error naming the characteristic (`name`)
    is printed in red before returning None.
    """
    characteristics = peer.get_characteristics_by_uuid(uuid)
    if not characteristics:
        print(color(f'!!! No {name} characteristic', 'red'))
        return None
    return characteristics[0]


# -----------------------------------------------------------------------------
async def keyboard_host(device, peer_address):
    """Run as a keyboard host.

    Connects to and pairs with `peer_address`, discovers the HID service and
    its characteristics, subscribes to every Report characteristic that
    supports notifications, prints the Protocol Mode, HID Information and
    Report Map values, then waits until the task is cancelled.

    Returns early (after printing an error) if the HID service or any of the
    expected characteristics is missing.
    """
    await device.power_on()
    connection = await device.connect(peer_address)
    await connection.pair()
    peer = Peer(connection)

    await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
    hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
    if not hid_services:
        print(color('!!! No HID service', 'red'))
        return
    await peer.discover_characteristics()

    protocol_mode_characteristic = _find_characteristic(
        peer, GATT_PROTOCOL_MODE_CHARACTERISTIC, 'Protocol Mode'
    )
    if protocol_mode_characteristic is None:
        return

    hid_information_characteristic = _find_characteristic(
        peer, GATT_HID_INFORMATION_CHARACTERISTIC, 'HID Information'
    )
    if hid_information_characteristic is None:
        return

    report_map_characteristic = _find_characteristic(
        peer, GATT_REPORT_MAP_CHARACTERISTIC, 'Report Map'
    )
    if report_map_characteristic is None:
        return

    # The control point is only checked for presence; it is not used further.
    if _find_characteristic(peer, GATT_HID_CONTROL_POINT_CHARACTERISTIC, 'Control Point') is None:
        return

    report_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_CHARACTERISTIC)
    if not report_characteristics:
        print(color('!!! No Report characteristic', 'red'))
        return

    for i, characteristic in enumerate(report_characteristics):
        print(color('REPORT:', 'yellow'), characteristic)
        if characteristic.properties & Characteristic.NOTIFY:
            await peer.discover_descriptors(characteristic)
            report_reference_descriptor = characteristic.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR)
            if report_reference_descriptor:
                report_reference = await peer.read_value(report_reference_descriptor)
                print(color(' Report Reference:', 'blue'), report_reference.hex())
            else:
                report_reference = bytes([0, 0])

            # The label is bound as a default argument so that each callback
            # keeps its own value rather than the last one of the loop.
            await peer.subscribe(
                characteristic,
                lambda value, param=f'[{i}] {report_reference.hex()}': on_report(param, value)
            )

    protocol_mode = await peer.read_value(protocol_mode_characteristic)
    print(f'Protocol Mode: {protocol_mode.hex()}')
    hid_information = await peer.read_value(hid_information_characteristic)
    print(f'HID Information: {hid_information.hex()}')
    report_map = await peer.read_value(report_map_characteristic)
    print(f'Report Map: {report_map.hex()}')

    # Wait on a future that is never completed, i.e. until cancelled
    await asyncio.get_running_loop().create_future()


# -----------------------------------------------------------------------------
async def keyboard_device(device, command):
    """Run as a keyboard device.

    Registers the Device Information, HID and Battery services on `device`,
    advertises, then produces key reports: from a local websocket server when
    `command` is 'web', otherwise by repeatedly typing a canned message.
    """
    # Create an 'input report' characteristic to send keyboard reports to the host
    input_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0, 0, 0, 0, 0, 0, 0, 0]),
        [
            Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]))
        ]
    )

    # Create an 'output report' characteristic to receive keyboard reports from the host
    output_report_characteristic = Characteristic(
        GATT_REPORT_CHARACTERISTIC,
        Characteristic.READ | Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([0]),
        [
            Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]))
        ]
    )

    # Add the services to the GATT server
    device.add_services([
        Service(
            GATT_DEVICE_INFORMATION_SERVICE,
            [
                Characteristic(
                    GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
                    Characteristic.READ,
                    Characteristic.READABLE,
                    'Bumble'
                )
            ]
        ),
        Service(
            GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
            [
                Characteristic(
                    GATT_PROTOCOL_MODE_CHARACTERISTIC,
                    Characteristic.READ,
                    Characteristic.READABLE,
                    bytes([HID_REPORT_PROTOCOL])
                ),
                Characteristic(
                    GATT_HID_INFORMATION_CHARACTERISTIC,
                    Characteristic.READ,
                    Characteristic.READABLE,
                    bytes([0x11, 0x01, 0x00, 0x03])  # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable
                ),
                Characteristic(
                    GATT_HID_CONTROL_POINT_CHARACTERISTIC,
                    Characteristic.WRITE_WITHOUT_RESPONSE,
                    Characteristic.WRITEABLE,
                    CharacteristicValue(write=on_hid_control_point_write)
                ),
                Characteristic(
                    GATT_REPORT_MAP_CHARACTERISTIC,
                    Characteristic.READ,
                    Characteristic.READABLE,
                    HID_KEYBOARD_REPORT_MAP
                ),
                input_report_characteristic,
                output_report_characteristic
            ]
        ),
        Service(
            GATT_DEVICE_BATTERY_SERVICE,
            [
                Characteristic(
                    GATT_BATTERY_LEVEL_CHARACTERISTIC,
                    Characteristic.READ,
                    Characteristic.READABLE,
                    bytes([100])
                )
            ]
        )
    ])

    # Debug print
    for attribute in device.gatt_server.attributes:
        print(attribute)

    # NOTE(review): the text between "struct.pack('" and "= ord('a')" was lost
    # from this file (everything between a '<' and the next '>' was stripped).
    # The appearance/flags values, the listener attachment, the power-on and
    # advertising calls and the head of the websocket handler below are a
    # reconstruction -- confirm them against the upstream example.

    # Set the advertising data
    device.advertising_data = bytes(
        AdvertisingData([
            (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
            (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)),
            (AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),  # presumably the 'Keyboard' appearance
            (AdvertisingData.FLAGS, bytes([0x05]))
        ])
    )

    # Attach a listener
    device.listener = ServerListener(device)

    # Go!
    await device.power_on()
    await device.start_advertising(auto_restart=True)

    async def press_key(hid_code):
        # 8-byte report: modifiers, reserved, then up to 6 key codes
        input_report_characteristic.value = bytes([0, 0, hid_code, 0, 0, 0, 0, 0])
        await device.notify_subscribers(input_report_characteristic)

    async def release_keys():
        input_report_characteristic.value = bytes.fromhex('0000000000000000')
        await device.notify_subscribers(input_report_characteristic)

    if command == 'web':
        # Start a Websocket server to receive events from a web page.
        # `path` is optional so the handler works whether the websockets
        # library calls it with one or two arguments.
        async def serve(websocket, path=None):
            try:
                # The loop ends when recv() raises because the peer closed
                while True:
                    message = await websocket.recv()
                    print('Received: ', str(message))

                    parsed = json.loads(message)
                    message_type = parsed['type']
                    if message_type == 'keydown':
                        # Only deal with keys a to z for now
                        key = parsed['key']
                        if len(key) == 1:
                            code = ord(key)
                            if ord('a') <= code <= ord('z'):
                                hid_code = 0x04 + code - ord('a')
                                await press_key(hid_code)
                    elif message_type == 'keyup':
                        await release_keys()
            except websockets.exceptions.ConnectionClosedOK:
                pass

        await websockets.serve(serve, 'localhost', 8989)
        await asyncio.get_running_loop().create_future()
    else:
        message = bytes('hello', 'ascii')
        while True:
            for letter in message:
                await asyncio.sleep(3.0)

                # Keypress for the letter
                keycode = 0x04 + letter - 0x61
                await press_key(keycode)

                # Key release
                await release_keys()


# -----------------------------------------------------------------------------
def _print_usage():
    """Print the command line help."""
    print('Usage: python keyboard.py <device-config> <transport-spec> <command>')
    print('  where <command> is one of:')
    print('  connect <address> (run a keyboard host, connecting to a keyboard)')
    print('  web (run a keyboard with keypress input from a web page, see keyboard.html)')
    print('  sim (run a keyboard simulation, emitting a canned sequence of keystrokes)')
    print('example: python keyboard.py keyboard.json usb:0 sim')
    print('example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5')


# -----------------------------------------------------------------------------
async def main():
    """Parse the command line and run as a keyboard host or keyboard device."""
    if len(sys.argv) < 4:
        _print_usage()
        return

    command = sys.argv[3]

    # Validate before opening the transport: 'connect' needs a peer address,
    # and an unknown command would otherwise be silently ignored.
    if command not in {'connect', 'sim', 'web'}:
        _print_usage()
        return
    if command == 'connect' and len(sys.argv) < 5:
        _print_usage()
        return

    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        # Create a device to manage the host
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)

        if command == 'connect':
            # Run as a Keyboard host
            await keyboard_host(device, sys.argv[4])
        else:
            # Run as a keyboard device
            await keyboard_device(device, command)


# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())