226 lines
5.8 KiB
Python
226 lines
5.8 KiB
Python
"""A simple script to backfill tko_task_references table with throttling."""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import collections
|
|
import contextlib
|
|
import logging
|
|
import time
|
|
|
|
import MySQLdb
|
|
|
|
|
|
class BackfillException(Exception):
    """Error raised by this script when the backfill cannot continue."""
|
|
|
|
|
|
def _parse_args(argv=None):
    """Parse and post-process the command line options of this script.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), argparse falls back to sys.argv[1:].

    Returns:
        An argparse.Namespace with the attributes host, user, password,
        dryrun, num_iterations, batch_size and sleep_seconds. In dryrun
        mode num_iterations is set to 5 when it was not given (or is 0).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--host', required=True, help='mysql server host')
    parser.add_argument('--user', required=True, help='mysql server user')
    parser.add_argument('--password', required=True,
                        help='mysql server password')
    parser.add_argument('--dryrun', action='store_true', default=False)
    parser.add_argument(
        '--num-iterations',
        default=None,
        type=int,
        help='If set, total number of iterations. Default is no limit.',
    )
    parser.add_argument(
        '--batch-size',
        default=1000,
        # Without an explicit type a value given on the command line would
        # be a str while the default is an int.
        type=int,
        help='Number of tko_jobs rows to read in one iteration',
    )
    parser.add_argument(
        '--sleep-seconds',
        type=int,
        default=1,
        help='Time to sleep between iterations',
    )

    args = parser.parse_args(argv)
    # A dry run is capped so that it cannot loop over the whole table.
    if args.dryrun and not args.num_iterations:
        logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.')
        args.num_iterations = 5
    return args
|
|
|
|
|
|
|
|
@contextlib.contextmanager
def _mysql_connection(args):
    """Yield a MySQLdb connection that has chromeos_autotest_db selected.

    Args:
        args: Object with the attributes user, host and password used to
            connect to the mysql server.

    Yields:
        The open connection. It is closed when the with block exits,
        whether normally or through an exception.
    """
    conn = MySQLdb.connect(user=args.user, host=args.host,
                           passwd=args.password)
    try:
        # Selecting the database happens inside the try block so that the
        # connection is closed even when the USE statement itself fails.
        with _mysql_cursor(conn) as c:
            c.execute('USE chromeos_autotest_db;')
        yield conn
    finally:
        conn.close()
|
|
|
|
|
|
@contextlib.contextmanager
def _autocommit(conn):
    """Commit the transaction on success, roll it back on any error.

    Args:
        conn: Connection object providing commit() and rollback().

    Yields:
        The same connection object.

    Raises:
        Whatever the body of the with block raised. The transaction is
        rolled back first and the exception is then propagated.
    """
    try:
        yield conn
    except BaseException:
        conn.rollback()
        # A generator-based context manager suppresses an exception that it
        # catches and does not re-raise; propagate it so that failed
        # statements are not silently ignored by the caller.
        raise
    else:
        conn.commit()
|
|
|
|
|
|
@contextlib.contextmanager
def _mysql_cursor(conn):
    """Yield a new cursor of conn and close it when the with block exits.

    Args:
        conn: Connection object providing cursor().

    Yields:
        The cursor returned by conn.cursor(). Its close() is called on
        exit, also when the body of the with block raises.
    """
    with contextlib.closing(conn.cursor()) as cursor:
        yield cursor
|
|
|
|
|
|
def _latest_unfilled_job_idx(conn):
    """Return the tko_job_idx at which the backfill should start.

    When tko_task_references already has rows, the result is one less than
    the smallest tko_job_idx stored there. Otherwise the largest job_idx of
    tko_jobs is used.

    Args:
        conn: Open database connection.

    Returns:
        The index as a string, or None when both tables are empty.
    """
    with _mysql_cursor(conn) as c:
        c.execute("""
            SELECT tko_job_idx
            FROM tko_task_references
            ORDER BY tko_job_idx
            LIMIT 1
            ;""")
        r = c.fetchall()
    if r:
        # int() instead of the Python-2-only long(); int() promotes to an
        # arbitrary precision integer on its own when needed.
        return str(int(r[0][0]) - 1)

    logging.debug('tko_task_references is empty.'
                  ' Grabbing the latest tko_job_idx to fill.')
    with _mysql_cursor(conn) as c:
        c.execute("""
            SELECT job_idx
            FROM tko_jobs
            ORDER BY job_idx DESC
            LIMIT 1
            ;""")
        r = c.fetchall()
    if r:
        # Return a string here too so both paths give the same type.
        return str(r[0][0])
    return None
|
|
|
|
|
|
# Lightweight record describing one row to add to tko_task_references.
_TKOTaskReference = collections.namedtuple(
    '_TKOTaskReference',
    'tko_job_idx task_reference parent_task_reference',
)
|
|
|
|
# Template selecting one batch of tko_jobs rows, highest job_idx first.
# Placeholders: latest_job_idx (inclusive upper bound for job_idx) and
# batch_size (maximum number of rows).
_SQL_SELECT_TASK_REFERENCES = """
SELECT job_idx, afe_job_id, afe_parent_job_id
FROM tko_jobs
WHERE job_idx <= %(latest_job_idx)s
ORDER BY job_idx DESC
LIMIT %(batch_size)s
;"""
# Template inserting rows into tko_task_references. The values placeholder
# receives the already formatted, comma separated list of value tuples.
_SQL_INSERT_TASK_REFERENCES = """
INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id)
VALUES %(values)s
;"""
# Template looking up the tko_task_references row(s) of a single tko_job_idx.
_SQL_SELECT_TASK_REFERENCE = """
SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s
;"""
|
|
|
|
|
|
def _compute_task_references(conn, latest_job_idx, batch_size):
    """Read one batch of tko_jobs rows and turn them into task references.

    Args:
        conn: Open database connection.
        latest_job_idx: Highest job_idx to include (int or numeric string).
        batch_size: Maximum number of rows to read (int or numeric string).

    Returns:
        A list of _TKOTaskReference built from the job_idx, afe_job_id and
        afe_parent_job_id columns, ordered by descending job_idx. The list
        is empty when no row matches.

    Raises:
        ValueError, TypeError: If latest_job_idx or batch_size cannot be
            converted to an integer.
    """
    # Both values are interpolated into the statement text, so make sure
    # they really are integers before building the SQL.
    sql = _SQL_SELECT_TASK_REFERENCES % {
        'latest_job_idx': int(latest_job_idx),
        'batch_size': int(batch_size),
    }
    with _mysql_cursor(conn) as c:
        c.execute(sql)
        rs = c.fetchall()

    return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs or ()]
|
|
|
|
|
|
def _insert_task_references(conn, task_references, dryrun):
    """Insert the given task references into tko_task_references.

    Args:
        conn: Open database connection.
        task_references: Sequence of _TKOTaskReference to insert. Nothing
            is done when it is empty.
        dryrun: When true, only log an abbreviated form of the statement
            instead of executing it.
    """
    if not task_references:
        # An empty VALUES list would not be a valid statement.
        return

    # NOTE(review): a None task_reference / parent_task_reference is
    # formatted as the text "None" here -- confirm whether NULL is intended.
    values = ', '.join(
        '("afe", %s, "%s", "%s")' %
        (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference)
        for tr in task_references
    )
    sql = _SQL_INSERT_TASK_REFERENCES % {'values': values}

    if dryrun:
        # Keep the log readable: long statements are cut down to their
        # first 150 and last 49 characters.
        if len(sql) < 200:
            sql_log = sql
        else:
            sql_log = '%s... [SNIP] ...%s' % (sql[:150], sql[-49:])
        logging.debug('Would have run: %s', sql_log)
        # A dry run must not modify the database.
        return

    with _autocommit(conn) as txn_conn:
        with _mysql_cursor(txn_conn) as c:
            c.execute(sql)
|
|
|
|
|
|
def _verify_task_references(conn, task_references):
    """Check that the last of the given task references is in the database.

    Only the final element of task_references is looked up; an empty
    sequence is accepted without touching the database.

    Args:
        conn: Open database connection.
        task_references: Sequence of _TKOTaskReference that was inserted.

    Raises:
        BackfillException: If no matching row is found for the last
            task reference.
    """
    if not task_references:
        return

    expected_idx = task_references[-1].tko_job_idx
    query = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': expected_idx}
    with _mysql_cursor(conn) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()

    if rows and rows[0][0] == expected_idx:
        return
    raise BackfillException(
        'Failed to insert task reference for tko_job_id %s' % expected_idx)
|
|
|
|
|
|
def _next_job_idx(task_references):
    """Return the job index at which the next batch should start.

    Args:
        task_references: Non-empty sequence of objects with a tko_job_idx
            attribute; only the last element is used.

    Returns:
        The tko_job_idx of the last element minus one, as a string.
    """
    # int() instead of the Python-2-only long(); int() promotes to an
    # arbitrary precision integer on its own when needed.
    return str(int(task_references[-1].tko_job_idx) - 1)
|
|
|
|
def main():
    """Backfill tko_task_references batch by batch, sleeping in between.

    Starting from the index reported by _latest_unfilled_job_idx, each
    iteration reads a batch of tko_jobs rows, inserts the matching task
    references, verifies the insert (unless in dryrun mode) and then sleeps
    for args.sleep_seconds. The loop ends when a batch comes back empty or
    when the optional iteration limit is used up.

    Raises:
        BackfillException: If no starting tko_job_idx can be determined.
    """
    logging.basicConfig(level=logging.DEBUG)
    args = _parse_args()

    with _mysql_connection(args) as conn:
        tko_job_idx = _latest_unfilled_job_idx(conn)
    if tko_job_idx is None:
        raise BackfillException('Failed to get last unfilled tko_job_idx')
    logging.info('First tko_job_idx to fill: %s', tko_job_idx)

    while True:
        logging.info('####################################')
        logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx)

        # Every step uses its own short-lived connection.
        with _mysql_connection(args) as read_conn:
            batch = _compute_task_references(
                read_conn, tko_job_idx, args.batch_size)
        if not batch:
            logging.info('No more unfilled task references. All done!')
            break

        first_idx = batch[0].tko_job_idx
        last_idx = batch[-1].tko_job_idx
        logging.info(
            'Inserting %d task references. tko_job_ids: %d...%d',
            len(batch),
            first_idx,
            last_idx,
        )
        with _mysql_connection(args) as write_conn:
            _insert_task_references(write_conn, batch, args.dryrun)
        if not args.dryrun:
            with _mysql_connection(args) as check_conn:
                _verify_task_references(check_conn, batch)

        tko_job_idx = _next_job_idx(batch)

        limited = args.num_iterations is not None
        if limited:
            args.num_iterations -= 1
            if args.num_iterations <= 0:
                break
            logging.info('%d more iterations left', args.num_iterations)
        logging.info('Iteration done. Sleeping for %d seconds',
                     args.sleep_seconds)
        time.sleep(args.sleep_seconds)
|
|
|
|
|
|
# Run the backfill only when this file is executed as a script.
if __name__ == '__main__':
    main()
|