360 lines
14 KiB
Python
360 lines
14 KiB
Python
# Copyright 2021-2022 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# https://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Imports
|
|
# -----------------------------------------------------------------------------
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
import logging
|
|
import struct
|
|
import websockets
|
|
import json
|
|
from colors import color
|
|
|
|
from bumble.core import AdvertisingData
|
|
from bumble.device import Device, Connection, Peer
|
|
from bumble.utils import AsyncRunner
|
|
from bumble.transport import open_transport_or_link
|
|
from bumble.gatt import (
|
|
Descriptor,
|
|
Service,
|
|
Characteristic,
|
|
CharacteristicValue,
|
|
GATT_DEVICE_INFORMATION_SERVICE,
|
|
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
|
|
GATT_DEVICE_BATTERY_SERVICE,
|
|
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
|
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
|
GATT_REPORT_CHARACTERISTIC,
|
|
GATT_REPORT_MAP_CHARACTERISTIC,
|
|
GATT_PROTOCOL_MODE_CHARACTERISTIC,
|
|
GATT_HID_INFORMATION_CHARACTERISTIC,
|
|
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
|
|
GATT_REPORT_REFERENCE_DESCRIPTOR
|
|
)
|
|
|
|
# -----------------------------------------------------------------------------
|
|
|
|
# Protocol Modes
|
|
HID_BOOT_PROTOCOL = 0x00
|
|
HID_REPORT_PROTOCOL = 0x01
|
|
|
|
# Report Types
|
|
HID_INPUT_REPORT = 0x01
|
|
HID_OUTPUT_REPORT = 0x02
|
|
HID_FEATURE_REPORT = 0x03
|
|
|
|
# Report Map
|
|
HID_KEYBOARD_REPORT_MAP = bytes([
|
|
0x05, 0x01, # Usage Page (Generic Desktop Ctrls)
|
|
0x09, 0x06, # Usage (Keyboard)
|
|
0xA1, 0x01, # Collection (Application)
|
|
0x85, 0x01, # . Report ID (1)
|
|
0x05, 0x07, # . Usage Page (Kbrd/Keypad)
|
|
0x19, 0xE0, # . Usage Minimum (0xE0)
|
|
0x29, 0xE7, # . Usage Maximum (0xE7)
|
|
0x15, 0x00, # . Logical Minimum (0)
|
|
0x25, 0x01, # . Logical Maximum (1)
|
|
0x75, 0x01, # . Report Size (1)
|
|
0x95, 0x08, # . Report Count (8)
|
|
0x81, 0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
|
0x95, 0x01, # . Report Count (1)
|
|
0x75, 0x08, # . Report Size (8)
|
|
0x81, 0x01, # . Input (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
|
0x95, 0x06, # . Report Count (6)
|
|
0x75, 0x08, # . Report Size (8)
|
|
0x15, 0x00, # . Logical Minimum (0x00)
|
|
0x25, 0x94, # . Logical Maximum (0x94)
|
|
0x05, 0x07, # . Usage Page (Kbrd/Keypad)
|
|
0x19, 0x00, # . Usage Minimum (0x00)
|
|
0x29, 0x94, # . Usage Maximum (0x94)
|
|
0x81, 0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
|
|
0x95, 0x05, # . Report Count (5)
|
|
0x75, 0x01, # . Report Size (1)
|
|
0x05, 0x08, # . Usage Page (LEDs)
|
|
0x19, 0x01, # . Usage Minimum (Num Lock)
|
|
0x29, 0x05, # . Usage Maximum (Kana)
|
|
0x91, 0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
|
|
0x95, 0x01, # . Report Count (1)
|
|
0x75, 0x03, # . Report Size (3)
|
|
0x91, 0x01, # . Output (Const,Array,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
|
|
0xC0 # End Collection
|
|
])
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
class ServerListener(Device.Listener, Connection.Listener):
|
|
def __init__(self, device):
|
|
self.device = device
|
|
|
|
@AsyncRunner.run_in_task()
|
|
async def on_connection(self, connection):
|
|
print(f'=== Connected to {connection}')
|
|
connection.listener = self
|
|
|
|
@AsyncRunner.run_in_task()
|
|
async def on_disconnection(self, reason):
|
|
print(f'### Disconnected, reason={reason}')
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def on_hid_control_point_write(connection, value):
|
|
print(f'Control Point Write: {value}')
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
def on_report(characteristic, value):
|
|
print(color('Report:', 'cyan'), value.hex(), 'from', characteristic)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def keyboard_host(device, peer_address):
|
|
await device.power_on()
|
|
connection = await device.connect(peer_address)
|
|
await connection.pair()
|
|
peer = Peer(connection)
|
|
await peer.discover_service(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
|
|
hid_services = peer.get_services_by_uuid(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)
|
|
if not hid_services:
|
|
print(color('!!! No HID service', 'red'))
|
|
return
|
|
await peer.discover_characteristics()
|
|
|
|
protocol_mode_characteristics = peer.get_characteristics_by_uuid(GATT_PROTOCOL_MODE_CHARACTERISTIC)
|
|
if not protocol_mode_characteristics:
|
|
print(color('!!! No Protocol Mode characteristic', 'red'))
|
|
return
|
|
protocol_mode_characteristic = protocol_mode_characteristics[0]
|
|
|
|
hid_information_characteristics = peer.get_characteristics_by_uuid(GATT_HID_INFORMATION_CHARACTERISTIC)
|
|
if not hid_information_characteristics:
|
|
print(color('!!! No HID Information characteristic', 'red'))
|
|
return
|
|
hid_information_characteristic = hid_information_characteristics[0]
|
|
|
|
report_map_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_MAP_CHARACTERISTIC)
|
|
if not report_map_characteristics:
|
|
print(color('!!! No Report Map characteristic', 'red'))
|
|
return
|
|
report_map_characteristic = report_map_characteristics[0]
|
|
|
|
control_point_characteristics = peer.get_characteristics_by_uuid(GATT_HID_CONTROL_POINT_CHARACTERISTIC)
|
|
if not control_point_characteristics:
|
|
print(color('!!! No Control Point characteristic', 'red'))
|
|
return
|
|
# control_point_characteristic = control_point_characteristics[0]
|
|
|
|
report_characteristics = peer.get_characteristics_by_uuid(GATT_REPORT_CHARACTERISTIC)
|
|
if not report_characteristics:
|
|
print(color('!!! No Report characteristic', 'red'))
|
|
return
|
|
for i, characteristic in enumerate(report_characteristics):
|
|
print(color('REPORT:', 'yellow'), characteristic)
|
|
if characteristic.properties & Characteristic.NOTIFY:
|
|
await peer.discover_descriptors(characteristic)
|
|
report_reference_descriptor = characteristic.get_descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR)
|
|
if report_reference_descriptor:
|
|
report_reference = await peer.read_value(report_reference_descriptor)
|
|
print(color(' Report Reference:', 'blue'), report_reference.hex())
|
|
else:
|
|
report_reference = bytes([0, 0])
|
|
await peer.subscribe(characteristic, lambda value, param=f'[{i}] {report_reference.hex()}': on_report(param, value))
|
|
|
|
protocol_mode = await peer.read_value(protocol_mode_characteristic)
|
|
print(f'Protocol Mode: {protocol_mode.hex()}')
|
|
hid_information = await peer.read_value(hid_information_characteristic)
|
|
print(f'HID Information: {hid_information.hex()}')
|
|
report_map = await peer.read_value(report_map_characteristic)
|
|
print(f'Report Map: {report_map.hex()}')
|
|
|
|
await asyncio.get_running_loop().create_future()
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def keyboard_device(device, command):
|
|
# Create an 'input report' characteristic to send keyboard reports to the host
|
|
input_report_characteristic = Characteristic(
|
|
GATT_REPORT_CHARACTERISTIC,
|
|
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
|
Characteristic.READABLE | Characteristic.WRITEABLE,
|
|
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
|
|
[
|
|
Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_INPUT_REPORT]))
|
|
]
|
|
)
|
|
|
|
# Create an 'output report' characteristic to receive keyboard reports from the host
|
|
output_report_characteristic = Characteristic(
|
|
GATT_REPORT_CHARACTERISTIC,
|
|
Characteristic.READ | Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
|
|
Characteristic.READABLE | Characteristic.WRITEABLE,
|
|
bytes([0]),
|
|
[
|
|
Descriptor(GATT_REPORT_REFERENCE_DESCRIPTOR, Descriptor.READABLE, bytes([0x01, HID_OUTPUT_REPORT]))
|
|
]
|
|
)
|
|
|
|
# Add the services to the GATT sever
|
|
device.add_services([
|
|
Service(
|
|
GATT_DEVICE_INFORMATION_SERVICE,
|
|
[
|
|
Characteristic(
|
|
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
|
Characteristic.READ,
|
|
Characteristic.READABLE,
|
|
'Bumble'
|
|
)
|
|
]
|
|
),
|
|
Service(
|
|
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE,
|
|
[
|
|
Characteristic(
|
|
GATT_PROTOCOL_MODE_CHARACTERISTIC,
|
|
Characteristic.READ,
|
|
Characteristic.READABLE,
|
|
bytes([HID_REPORT_PROTOCOL])
|
|
),
|
|
Characteristic(
|
|
GATT_HID_INFORMATION_CHARACTERISTIC,
|
|
Characteristic.READ,
|
|
Characteristic.READABLE,
|
|
bytes([0x11, 0x01, 0x00, 0x03]) # bcdHID=1.1, bCountryCode=0x00, Flags=RemoteWake|NormallyConnectable
|
|
),
|
|
Characteristic(
|
|
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
|
|
Characteristic.WRITE_WITHOUT_RESPONSE,
|
|
Characteristic.WRITEABLE,
|
|
CharacteristicValue(write=on_hid_control_point_write)
|
|
),
|
|
Characteristic(
|
|
GATT_REPORT_MAP_CHARACTERISTIC,
|
|
Characteristic.READ,
|
|
Characteristic.READABLE,
|
|
HID_KEYBOARD_REPORT_MAP
|
|
),
|
|
input_report_characteristic,
|
|
output_report_characteristic
|
|
]
|
|
),
|
|
Service(
|
|
GATT_DEVICE_BATTERY_SERVICE,
|
|
[
|
|
Characteristic(
|
|
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
|
Characteristic.READ,
|
|
Characteristic.READABLE,
|
|
bytes([100])
|
|
)
|
|
]
|
|
)
|
|
])
|
|
|
|
# Debug print
|
|
for attribute in device.gatt_server.attributes:
|
|
print(attribute)
|
|
|
|
# Set the advertising data
|
|
device.advertising_data = bytes(
|
|
AdvertisingData([
|
|
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Keyboard', 'utf-8')),
|
|
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
|
|
bytes(GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE)),
|
|
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x03C1)),
|
|
(AdvertisingData.FLAGS, bytes([0x05]))
|
|
])
|
|
)
|
|
|
|
# Attach a listener
|
|
device.listener = ServerListener(device)
|
|
|
|
# Go!
|
|
await device.power_on()
|
|
await device.start_advertising(auto_restart=True)
|
|
|
|
if command == 'web':
|
|
# Start a Websocket server to receive events from a web page
|
|
async def serve(websocket, path):
|
|
while True:
|
|
try:
|
|
message = await websocket.recv()
|
|
print('Received: ', str(message))
|
|
|
|
parsed = json.loads(message)
|
|
message_type = parsed['type']
|
|
if message_type == 'keydown':
|
|
# Only deal with keys a to z for now
|
|
key = parsed['key']
|
|
if len(key) == 1:
|
|
code = ord(key)
|
|
if code >= ord('a') and code <= ord('z'):
|
|
hid_code = 0x04 + code - ord('a')
|
|
input_report_characteristic.value = bytes([0, 0, hid_code, 0, 0, 0, 0, 0])
|
|
await device.notify_subscribers(input_report_characteristic)
|
|
elif message_type == 'keyup':
|
|
input_report_characteristic.value = bytes.fromhex('0000000000000000')
|
|
await device.notify_subscribers(input_report_characteristic)
|
|
|
|
except websockets.exceptions.ConnectionClosedOK:
|
|
pass
|
|
await websockets.serve(serve, 'localhost', 8989)
|
|
await asyncio.get_event_loop().create_future()
|
|
else:
|
|
message = bytes('hello', 'ascii')
|
|
while True:
|
|
for letter in message:
|
|
await asyncio.sleep(3.0)
|
|
|
|
# Keypress for the letter
|
|
keycode = 0x04 + letter - 0x61
|
|
input_report_characteristic.value = bytes([0, 0, keycode, 0, 0, 0, 0, 0])
|
|
await device.notify_subscribers(input_report_characteristic)
|
|
|
|
# Key release
|
|
input_report_characteristic.value = bytes.fromhex('0000000000000000')
|
|
await device.notify_subscribers(input_report_characteristic)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
async def main():
|
|
if len(sys.argv) < 4:
|
|
print('Usage: python keyboard.py <device-config> <transport-spec> <command>')
|
|
print(' where <command> is one of:')
|
|
print(' connect <address> (run a keyboard host, connecting to a keyboard)')
|
|
print(' web (run a keyboard with keypress input from a web page, see keyboard.html')
|
|
print(' sim (run a keyboard simulation, emitting a canned sequence of keystrokes')
|
|
print('example: python keyboard.py keyboard.json usb:0 sim')
|
|
print('example: python keyboard.py keyboard.json usb:0 connect A0:A1:A2:A3:A4:A5')
|
|
return
|
|
|
|
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
|
# Create a device to manage the host
|
|
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
|
|
|
command = sys.argv[3]
|
|
if command == 'connect':
|
|
# Run as a Keyboard host
|
|
await keyboard_host(device, sys.argv[4])
|
|
elif command in {'sim', 'web'}:
|
|
# Run as a keyboard device
|
|
await keyboard_device(device, command)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
|
|
asyncio.run(main())
|