121 lines
2.3 KiB
Go
121 lines
2.3 KiB
Go
package sql
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"runtime"
|
|
"sync"
|
|
|
|
"github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/dialers/mysql"
|
|
"github.com/pkg/errors"
|
|
|
|
"repodiff/constants"
|
|
)
|
|
|
|
var mux sync.Mutex
|
|
var db *sql.DB
|
|
|
|
type handleRowFn func(*sql.Rows)
|
|
|
|
func newDBConnectionPool() (*sql.DB, error) {
|
|
cfg := mysql.Cfg(
|
|
constants.GetConfigVar("GCP_DB_INSTANCE_CONNECTION_NAME"),
|
|
constants.GetConfigVar("GCP_DB_USER"),
|
|
constants.GetConfigVar("GCP_DB_PASSWORD"),
|
|
)
|
|
cfg.DBName = constants.GetConfigVar("GCP_DB_NAME")
|
|
return mysql.DialCfg(cfg)
|
|
}
|
|
|
|
func maxParallelism() int {
|
|
maxProcs := runtime.GOMAXPROCS(0)
|
|
numCPU := runtime.NumCPU()
|
|
if maxProcs < numCPU {
|
|
return maxProcs
|
|
}
|
|
return numCPU
|
|
}
|
|
|
|
func GetDBConnectionPool() (*sql.DB, error) {
|
|
if db != nil {
|
|
return db, nil
|
|
}
|
|
mux.Lock()
|
|
defer mux.Unlock()
|
|
|
|
// check, lock, check; redundant check for thread safety
|
|
if db != nil {
|
|
return db, nil
|
|
}
|
|
var err error
|
|
db, err = newDBConnectionPool()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
connections := maxParallelism()
|
|
|
|
// unless explicitly specified, the default connection pool size is unlimited
|
|
db.SetMaxOpenConns(connections)
|
|
|
|
// unless explicitly specified, the default is 0 where idle connections are immediately closed
|
|
db.SetMaxIdleConns(connections)
|
|
return db, nil
|
|
}
|
|
|
|
func SingleTransactionInsert(db *sql.DB, insertQuery string, rowsOfCols [][]interface{}) error {
|
|
tx, err := db.Begin()
|
|
if err != nil {
|
|
return errors.Wrap(err, "Error starting transaction")
|
|
}
|
|
stmt, err := tx.Prepare(insertQuery)
|
|
if err != nil {
|
|
return errors.Wrap(err, "Error preparing statement")
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, cols := range rowsOfCols {
|
|
_, err = stmt.Exec(
|
|
cols...,
|
|
)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return errors.Wrap(err, "Error inserting values")
|
|
}
|
|
}
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return errors.Wrap(
|
|
err,
|
|
"Error committing transaction",
|
|
)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func Select(db *sql.DB, rowHandler handleRowFn, query string, args ...interface{}) error {
|
|
rows, err := db.Query(
|
|
query,
|
|
args...,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
rowHandler(rows)
|
|
}
|
|
if err = rows.Err(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TruncateTable(db *sql.DB, tableName string) error {
|
|
_, err := db.Exec(
|
|
fmt.Sprintf("TRUNCATE TABLE %s", tableName),
|
|
)
|
|
return err
|
|
}
|