238 lines
6.6 KiB
Python
238 lines
6.6 KiB
Python
#
|
|
# Copyright 2015 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
"""Script for sending data to a port.
|
|
|
|
This script provides a simple shell interface for sending data at run-time to a
|
|
port.
|
|
|
|
Usage:
|
|
1. Choose a port to use. Use 'adb forward tcp:<port>
|
|
tcp:<port>' to forward the port to the device.
|
|
2. In a separate shell, build and push the test vendor library to the device
|
|
using the script mentioned in option A (i.e. without the --test-channel flag
|
|
set).
|
|
3. Once logcat has started, turn Bluetooth on from the device.
|
|
4. Run this program, in the shell from step 1, the port, also from step 1,
|
|
as arguments.
|
|
|
|
scapy is the tool we use to build packets in Python.
|
|
|
|
>>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
|
|
Raw(load='\x01')
|
|
>>> print(d)
|
|
<HCI_Hdr type=Command |<HCI_Command_Hdr opcode=0x1004 |<Raw load='\x01'
|
|
|>>>
|
|
>>> raw(d)
|
|
'\x01\x04\x10\x01\x01'
|
|
>>> hexdump(d)
|
|
0000 0104100101 .....
|
|
|
|
|
|
>>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
|
|
L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
|
|
>>> pkt
|
|
<HCI_Hdr type=ACL Data |<HCI_ACL_Hdr handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
|
|
len=6 cid=control |<L2CAP_CmdHdr code=info_req id=2 len=2 |<L2CAP_InfoReq
|
|
type=FEAT_MASK |>>>>>
|
|
>>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
|
|
len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
|
|
id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
|
|
>>> raw(pkt)
|
|
'\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
|
|
>>> hexdump(pkt)
|
|
0000 0202200A00060001000A0202000200 .. ............
|
|
|
|
|
|
"""
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
import binascii
|
|
import cmd
|
|
import random
|
|
import socket
|
|
import string
|
|
import struct
|
|
import sys
|
|
from scapy.all import *
|
|
""" Add some more SCAPY stuff"""
|
|
|
|
|
|
class HCI_Cmd_Connect(Packet):
|
|
name = "Connect"
|
|
fields_desc = [
|
|
ByteEnumField("filter", 0, {0: "address"}),
|
|
LEShortField("packet_type", 8),
|
|
ByteEnumField("page_scan_repetition_mode", 0, {
|
|
0: "R0",
|
|
1: "R1",
|
|
2: "R2"
|
|
}),
|
|
ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}),
|
|
LEShortField("clock_offset", 0),
|
|
ByteEnumField("allow_role_switch", 0, {
|
|
0: "false",
|
|
1: "true"
|
|
}),
|
|
]
|
|
|
|
|
|
class HCI_Cmd_Inquiry(Packet):
|
|
name = "Inquiry"
|
|
fields_desc = [
|
|
XByteField("LAP0", 0),
|
|
XByteField("LAP1", 0x8B),
|
|
XByteField("LAP2", 0x9E),
|
|
ByteField("length", 1),
|
|
ByteField("max_responses", 0),
|
|
]
|
|
|
|
|
|
""" END SCAPY stuff"""
|
|
|
|
|
|
class Connection(object):
|
|
"""Simple wrapper class for a socket object.
|
|
|
|
Attributes:
|
|
socket: The underlying socket created for the specified address and port.
|
|
"""
|
|
|
|
def __init__(self, port):
|
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self._socket.connect(("localhost", port))
|
|
|
|
def close(self):
|
|
self._socket.close()
|
|
|
|
def send(self, data):
|
|
self._socket.sendall(data)
|
|
|
|
def receive(self, size):
|
|
return self._socket.recv(size)
|
|
|
|
|
|
class RawPort(object):
|
|
"""Converts outgoing packets to binary and sends them.
|
|
|
|
Attributes:
|
|
connection: The connection to the HCI port.
|
|
"""
|
|
|
|
def __init__(self, port):
|
|
self._connection = Connection(port)
|
|
|
|
def close(self):
|
|
self._connection.close()
|
|
|
|
def send_binary(self, args):
|
|
joined_args = "".join(arg for arg in args)
|
|
print(joined_args)
|
|
packet = binascii.a2b_hex(joined_args)
|
|
self._connection.send(packet)
|
|
|
|
def receive_response(self):
|
|
ready_to_read, ready_to_write, in_error = \
|
|
select(
|
|
[ self._connection._socket ],
|
|
[ ],
|
|
[ self._connection._socket ],
|
|
1.5)
|
|
if len(in_error) > 0:
|
|
print("Error")
|
|
return False
|
|
if len(ready_to_read) > 0:
|
|
print("Ready to Read")
|
|
type_str = self._connection.receive(512)
|
|
print(len(type_str))
|
|
print(type_str)
|
|
return type_str
|
|
print("Returning false at the end")
|
|
return False
|
|
|
|
|
|
class RawPortShell(cmd.Cmd):
|
|
"""Shell for sending binary data to a port.
|
|
|
|
"""
|
|
|
|
def __init__(self, raw_port):
|
|
cmd.Cmd.__init__(self)
|
|
self._raw_port = raw_port
|
|
|
|
def do_send(self, args):
|
|
"""Arguments: dev_type_str Add a new device of type dev_type_str.
|
|
|
|
"""
|
|
self._raw_port.send_binary(args.split())
|
|
|
|
def do_scan(self, args):
|
|
"""Arguments: timeout (seconds) Print the scan responses from reachable devices
|
|
|
|
"""
|
|
self._raw_port.send_binary(args.split())
|
|
|
|
def do_quit(self, args):
|
|
"""Arguments: None.
|
|
|
|
Exits.
|
|
"""
|
|
self._raw_port.close()
|
|
print("Goodbye.")
|
|
return True
|
|
|
|
def do_help(self, args):
|
|
"""Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
|
|
|
|
"""
|
|
if (len(args) == 0):
|
|
cmd.Cmd.do_help(self, args)
|
|
|
|
def postcmd(self, stop, line):
|
|
"""Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue
|
|
|
|
"""
|
|
if stop:
|
|
return True
|
|
response = self._raw_port.receive_response()
|
|
print(response)
|
|
return False
|
|
|
|
|
|
def main(argv):
|
|
if len(argv) != 2:
|
|
print("Usage: python raw_port.py [port]")
|
|
return
|
|
try:
|
|
port = int(argv[1])
|
|
except ValueError:
|
|
print("Error parsing port.")
|
|
else:
|
|
try:
|
|
raw_port = RawPort(port)
|
|
except (socket.error, e):
|
|
print("Error connecting to socket: %s" % e)
|
|
except:
|
|
print("Error creating (check arguments).")
|
|
else:
|
|
raw_port_shell = RawPortShell(raw_port)
|
|
raw_port_shell.prompt = "$ "
|
|
raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" + 'Type \'help\' for more information.')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(sys.argv)
|