forked from signal18/replication-manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproxysql.go
142 lines (123 loc) · 4.71 KB
/
proxysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package proxysql
import (
	"fmt"
	"strings"
	"time"

	"github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
)
// ProxySQL bundles the connection settings for a ProxySQL instance together
// with the database handle used to send it statements.
type ProxySQL struct {
	// Connection is the database handle; Connect assigns it.
	Connection *sqlx.DB

	// User and Password are the credentials Connect logs in with.
	User, Password string

	// Port and Host are combined as "Host:Port" to form the TCP address
	// Connect uses.
	Port, Host string

	// WriterHG is the hostgroup_id SetWriter assigns to a server; ReaderHG is
	// the one SetReader and SetOfflineSoft assign.
	WriterHG, ReaderHG string
}
// Connect opens a connection to ProxySQL over TCP at Host:Port using the
// configured User and Password, and stores the resulting handle in
// psql.Connection.
//
// The connection is configured with a 5 second Timeout and a 15 second
// ReadTimeout. On failure the returned error wraps the underlying cause, so
// callers can inspect it with errors.Is or errors.As.
func (psql *ProxySQL) Connect() error {
	cfg := mysql.Config{
		User:        psql.User,
		Passwd:      psql.Password,
		Net:         "tcp",
		Addr:        fmt.Sprintf("%s:%s", psql.Host, psql.Port),
		Timeout:     5 * time.Second,
		ReadTimeout: 15 * time.Second,
	}

	var err error
	psql.Connection, err = sqlx.Connect("mysql", cfg.FormatDSN())
	if err != nil {
		// %w renders the same text as %s but keeps the error chain intact.
		return fmt.Errorf("Could not connect to ProxySQL (%w)", err)
	}
	return nil
}
// AddServer inserts a row into mysql_servers supplying only the hostname and
// port columns. It does not itself issue LOAD MYSQL SERVERS TO RUNTIME.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) AddServer(host string, port string) error {
	const insertTmpl = "INSERT INTO mysql_servers (hostname, port) VALUES('%s','%s')"

	if _, err := psql.Connection.Exec(fmt.Sprintf(insertTmpl, host, port)); err != nil {
		return err
	}
	return nil
}
// AddOfflineServer inserts host:port into mysql_servers with hostgroup_id
// 666, the same hostgroup SetOffline moves servers into.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) AddOfflineServer(host string, port string) error {
	const insertTmpl = "INSERT INTO mysql_servers (hostgroup_id, hostname, port) VALUES('666', '%s','%s')"

	if _, err := psql.Connection.Exec(fmt.Sprintf(insertTmpl, host, port)); err != nil {
		return err
	}
	return nil
}
// SetOffline moves the mysql_servers row matching host and port into
// hostgroup 666. The status column is left untouched.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) SetOffline(host string, port string) error {
	const updateTmpl = "UPDATE mysql_servers SET hostgroup_id='666' WHERE hostname='%s' AND port='%s'"

	if _, err := psql.Connection.Exec(fmt.Sprintf(updateTmpl, host, port)); err != nil {
		return err
	}
	return nil
}
// SetOfflineSoft sets the status of the mysql_servers row matching host and
// port to OFFLINE_SOFT and assigns it to the reader hostgroup (psql.ReaderHG).
//
// NOTE(review): the hostgroup, host and port are interpolated into the SQL
// text as-is, so callers must pass trusted values.
func (psql *ProxySQL) SetOfflineSoft(host string, port string) error {
	const updateTmpl = "UPDATE mysql_servers SET status='OFFLINE_SOFT', hostgroup_id='%s' WHERE hostname='%s' AND port='%s'"

	stmt := fmt.Sprintf(updateTmpl, psql.ReaderHG, host, port)
	if _, err := psql.Connection.Exec(stmt); err != nil {
		return err
	}
	return nil
}
// SetOnline sets the status of the mysql_servers row matching host and port
// to ONLINE. The row's hostgroup is not changed.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) SetOnline(host string, port string) error {
	const updateTmpl = "UPDATE mysql_servers SET status='ONLINE' WHERE hostname='%s' AND port='%s'"

	if _, err := psql.Connection.Exec(fmt.Sprintf(updateTmpl, host, port)); err != nil {
		return err
	}
	return nil
}
// SetWriter marks the mysql_servers row matching host and port as ONLINE and
// assigns it to the writer hostgroup (psql.WriterHG).
//
// NOTE(review): the hostgroup, host and port are interpolated into the SQL
// text as-is, so callers must pass trusted values.
func (psql *ProxySQL) SetWriter(host string, port string) error {
	const updateTmpl = "UPDATE mysql_servers SET status='ONLINE', hostgroup_id='%s' WHERE hostname='%s' AND port='%s'"

	stmt := fmt.Sprintf(updateTmpl, psql.WriterHG, host, port)
	if _, err := psql.Connection.Exec(stmt); err != nil {
		return err
	}
	return nil
}
// SetReader marks the mysql_servers row matching host and port as ONLINE and
// assigns it to the reader hostgroup (psql.ReaderHG).
//
// NOTE(review): the hostgroup, host and port are interpolated into the SQL
// text as-is, so callers must pass trusted values.
func (psql *ProxySQL) SetReader(host string, port string) error {
	const updateTmpl = "UPDATE mysql_servers SET status='ONLINE', hostgroup_id='%s' WHERE hostname='%s' AND port='%s'"

	stmt := fmt.Sprintf(updateTmpl, psql.ReaderHG, host, port)
	if _, err := psql.Connection.Exec(stmt); err != nil {
		return err
	}
	return nil
}
// LoadServersToRuntime sends the LOAD MYSQL SERVERS TO RUNTIME command to
// ProxySQL and returns any error from executing it.
func (psql *ProxySQL) LoadServersToRuntime() error {
	const cmd = "LOAD MYSQL SERVERS TO RUNTIME"

	if _, err := psql.Connection.Exec(cmd); err != nil {
		return err
	}
	return nil
}
// GetStatsForHostRead looks up the connection-pool statistics of host:port
// within a reader hostgroup.
//
// It queries stats.stats_mysql_connection_pool joined to
// mysql_replication_hostgroups on reader_hostgroup, scans a single row and
// returns, in order: hostgroup, status, ConnUsed, Bytes_data_sent,
// Bytes_data_recv, Latency_us, and any error from the query or the scan.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) GetStatsForHostRead(host string, port string) (string, string, int, int, int, int, error) {
	var hg, state string
	var used, sent, received, latencyUs int

	query := fmt.Sprintf("SELECT hostgroup, status, ConnUsed, Bytes_data_sent , Bytes_data_recv , Latency_us FROM stats.stats_mysql_connection_pool INNER JOIN mysql_replication_hostgroups ON mysql_replication_hostgroups.reader_hostgroup=hostgroup WHERE srv_host='%s' AND srv_port='%s'", host, port)
	err := psql.Connection.QueryRow(query).Scan(&hg, &state, &used, &sent, &received, &latencyUs)

	return hg, state, used, sent, received, latencyUs, err
}
// GetStatsForHostWrite looks up the connection-pool statistics of host:port
// within a writer hostgroup.
//
// It queries stats.stats_mysql_connection_pool joined to
// mysql_replication_hostgroups on writer_hostgroup, scans a single row and
// returns, in order: hostgroup, status, ConnUsed, Bytes_data_sent,
// Bytes_data_recv, Latency_us, and any error from the query or the scan.
//
// NOTE(review): host and port are interpolated into the SQL text as-is, so
// callers must pass trusted values.
func (psql *ProxySQL) GetStatsForHostWrite(host string, port string) (string, string, int, int, int, int, error) {
	var hg, state string
	var used, sent, received, latencyUs int

	query := fmt.Sprintf("SELECT hostgroup, status, ConnUsed, Bytes_data_sent , Bytes_data_recv , Latency_us FROM stats.stats_mysql_connection_pool INNER JOIN mysql_replication_hostgroups ON mysql_replication_hostgroups.writer_hostgroup=hostgroup WHERE srv_host='%s' AND srv_port='%s'", host, port)
	err := psql.Connection.QueryRow(query).Scan(&hg, &state, &used, &sent, &received, &latencyUs)

	return hg, state, used, sent, received, latencyUs, err
}
// GetVersion returns the value of the @@admin-version variable as reported
// by ProxySQL, or the empty string if the value cannot be read.
func (psql *ProxySQL) GetVersion() string {
	var version string
	if err := psql.Connection.QueryRow("SELECT @@admin-version").Scan(&version); err != nil {
		// The signature has no error result, so a failed lookup is reported
		// as an empty version instead of the error being silently dropped.
		return ""
	}
	return version
}
// GetHostsRuntime returns the entries of runtime_mysql_servers, each rendered
// as "hostname:port" and combined into a single string by GROUP_CONCAT.
//
// NOTE(review): presumably GROUP_CONCAT yields NULL when the table is empty,
// which would surface here as a scan error — confirm against ProxySQL.
func (psql *ProxySQL) GetHostsRuntime() (string, error) {
	const query = "SELECT GROUP_CONCAT(host) AS hostlist FROM (SELECT hostname || ':' || port AS host FROM runtime_mysql_servers)"

	var hosts string
	if err := psql.Connection.Get(&hosts, query); err != nil {
		return hosts, err
	}
	return hosts, nil
}
// Truncate removes every row from the mysql_servers table. It does not
// itself issue LOAD MYSQL SERVERS TO RUNTIME.
func (psql *ProxySQL) Truncate() error {
	const stmt = "DELETE FROM mysql_servers"

	if _, err := psql.Connection.Exec(stmt); err != nil {
		return err
	}
	return nil
}
// AddUser creates or replaces the mysql_users entry for User with the given
// Password, then issues LOAD MYSQL USERS TO RUNTIME.
//
// Single quotes in either value are doubled before being embedded in the
// statement, so a credential such as "pa'ss" is stored verbatim instead of
// ending the string literal early and corrupting the statement. Values
// without a single quote produce exactly the same SQL as before.
//
// It returns the first error encountered; the LOAD command is skipped when
// the REPLACE fails.
func (psql *ProxySQL) AddUser(User string, Password string) error {
	// Doubling the quote character is the SQL escape for a single-quoted
	// string literal.
	quote := func(s string) string { return strings.ReplaceAll(s, "'", "''") }

	stmt := "REPLACE INTO mysql_users(username,password) VALUES('" + quote(User) + "','" + quote(Password) + "')"
	if _, err := psql.Connection.Exec(stmt); err != nil {
		return err
	}

	_, err := psql.Connection.Exec("LOAD MYSQL USERS TO RUNTIME")
	return err
}