# Copyright 2021-2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ----------------------------------------------------------------------------- # Imports # ----------------------------------------------------------------------------- import struct import click from colors import color from bumble import hci from bumble.transport.common import PacketReader from bumble.helpers import PacketTracer # ----------------------------------------------------------------------------- class SnoopPacketReader: ''' Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not exactly the same...) ''' DATALINK_H1 = 1001 DATALINK_H4 = 1002 DATALINK_BSCP = 1003 DATALINK_H5 = 1004 def __init__(self, source): self.source = source # Read the header identification_pattern = source.read(8) if identification_pattern.hex().lower() != '6274736e6f6f7000': raise ValueError('not a valid snoop file, unexpected identification pattern') (self.version_number, self.data_link_type) = struct.unpack('>II', source.read(8)) if self.data_link_type != self.DATALINK_H4 and self.data_link_type != self.DATALINK_H1: raise ValueError(f'datalink type {self.data_link_type} not supported') def next_packet(self): # Read the record header header = self.source.read(24) if len(header) < 24: return (0, None) ( original_length, included_length, packet_flags, cumulative_drops, timestamp_seconds, timestamp_microsecond ) = struct.unpack('>IIIIII', header) # Abort on truncated packets if original_length != included_length: return (0, None) if self.data_link_type == self.DATALINK_H1: # The packet is un-encapsulated, look at the flags to figure out its type if packet_flags & 1: # Controller -> Host if packet_flags & 2: packet_type = hci.HCI_EVENT_PACKET else: packet_type = hci.HCI_ACL_DATA_PACKET else: # Host -> Controller if packet_flags & 2: packet_type = hci.HCI_COMMAND_PACKET else: packet_type = hci.HCI_ACL_DATA_PACKET return (packet_flags & 1, bytes([packet_type]) + self.source.read(included_length)) else: return (packet_flags & 1, self.source.read(included_length)) # ----------------------------------------------------------------------------- # Main # ----------------------------------------------------------------------------- @click.command() @click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file') @click.argument('filename') def show(format, filename): input = open(filename, 'rb') if format == 'h4': packet_reader = PacketReader(input) def read_next_packet(): (0, packet_reader.next_packet()) else: packet_reader = SnoopPacketReader(input) read_next_packet = packet_reader.next_packet tracer = PacketTracer(emit_message=print) while True: try: (direction, packet) = read_next_packet() if packet is None: break tracer.trace(hci.HCI_Packet.from_bytes(packet), direction) except Exception as error: print(color(f'!!! {error}', 'red')) pass # ----------------------------------------------------------------------------- if __name__ == '__main__': show()