Skip to content

Commit

Permalink
Merge branch 'master' into resharding
Browse files Browse the repository at this point in the history
  • Loading branch information
alainjobart committed Oct 30, 2014
2 parents f0fbfae + 50eeab9 commit b4e353b
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 7 deletions.
4 changes: 4 additions & 0 deletions go/vt/tabletmanager/actionnode/actionnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ const (
// the topo server.
TABLET_ACTION_REFRESH_STATE = "RefreshState"

// RunHealthCheck tells the tablet to refresh its tablet record from
// the topo server.
TABLET_ACTION_RUN_HEALTH_CHECK = "RunHealthCheck"

// ReloadSchema tells the tablet to reload its schema.
TABLET_ACTION_RELOAD_SCHEMA = "ReloadSchema"

Expand Down
8 changes: 8 additions & 0 deletions go/vt/tabletmanager/agent_rpc_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type RpcAgent interface {

RefreshState()

RunHealthCheck(targetTabletType topo.TabletType)

ReloadSchema()

PreflightSchema(change string) (*myproto.SchemaChangeResult, error)
Expand Down Expand Up @@ -209,6 +211,12 @@ func (agent *ActionAgent) ExecuteHook(hk *hook.Hook) *hook.HookResult {
func (agent *ActionAgent) RefreshState() {
}

// RunHealthCheck will manually run the health check on the tablet
// Should be called under RpcWrap.
func (agent *ActionAgent) RunHealthCheck(targetTabletType topo.TabletType) {
agent.runHealthCheck(targetTabletType)
}

// ReloadSchema will reload the schema
// Should be called under RpcWrapLockAction.
func (agent *ActionAgent) ReloadSchema() {
Expand Down
15 changes: 15 additions & 0 deletions go/vt/tabletmanager/agentrpctest/test_agent_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,19 @@ func agentRpcTestRefreshState(t *testing.T, client tmclient.TabletManagerClient,
}
}

var testRunHealthCheckValue = topo.TYPE_RDONLY

func (fra *fakeRpcAgent) RunHealthCheck(targetTabletType topo.TabletType) {
compare(fra.t, "RunHealthCheck tabletType", targetTabletType, testRunHealthCheckValue)
}

func agentRpcTestRunHealthCheck(t *testing.T, client tmclient.TabletManagerClient, ti *topo.TabletInfo) {
err := client.RunHealthCheck(ti, testRunHealthCheckValue, time.Minute)
if err != nil {
t.Errorf("RunHealthCheck failed: %v", err)
}
}

var testReloadSchemaCalled = false

func (fra *fakeRpcAgent) ReloadSchema() {
Expand Down Expand Up @@ -877,6 +890,8 @@ func AgentRpcTestSuite(t *testing.T, client tmclient.TabletManagerClient, ti *to
agentRpcTestScrap(t, client, ti)
agentRpcTestSleep(t, client, ti)
agentRpcTestExecuteHook(t, client, ti)
agentRpcTestRefreshState(t, client, ti)
agentRpcTestRunHealthCheck(t, client, ti)
agentRpcTestReloadSchema(t, client, ti)
agentRpcTestPreflightSchema(t, client, ti)
agentRpcTestApplySchema(t, client, ti)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/tabletmanager/gorpctmclient/gorpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (client *GoRpcTabletManagerClient) RefreshState(tablet *topo.TabletInfo, wa
return client.rpcCallTablet(tablet, actionnode.TABLET_ACTION_REFRESH_STATE, "", &noOutput, waitTime)
}

func (client *GoRpcTabletManagerClient) RunHealthCheck(tablet *topo.TabletInfo, targetTabletType topo.TabletType, waitTime time.Duration) error {
var noOutput rpc.UnusedResponse
return client.rpcCallTablet(tablet, actionnode.TABLET_ACTION_RUN_HEALTH_CHECK, &targetTabletType, &noOutput, waitTime)
}

func (client *GoRpcTabletManagerClient) ReloadSchema(tablet *topo.TabletInfo, waitTime time.Duration) error {
var noOutput rpc.UnusedResponse
return client.rpcCallTablet(tablet, actionnode.TABLET_ACTION_RELOAD_SCHEMA, "", &noOutput, waitTime)
Expand Down
7 changes: 7 additions & 0 deletions go/vt/tabletmanager/gorpctmserver/gorpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ func (tm *TabletManager) RefreshState(context *rpcproto.Context, args *rpc.Unuse
})
}

func (tm *TabletManager) RunHealthCheck(context *rpcproto.Context, args *topo.TabletType, reply *rpc.UnusedResponse) error {
return tm.agent.RpcWrap(context, actionnode.TABLET_ACTION_RUN_HEALTH_CHECK, args, reply, func() error {
tm.agent.RunHealthCheck(*args)
return nil
})
}

func (tm *TabletManager) ReloadSchema(context *rpcproto.Context, args *rpc.UnusedRequest, reply *rpc.UnusedResponse) error {
return tm.agent.RpcWrapLockAction(context, actionnode.TABLET_ACTION_RELOAD_SCHEMA, args, reply, true, func() error {
tm.agent.ReloadSchema()
Expand Down
3 changes: 3 additions & 0 deletions go/vt/tabletmanager/tmclient/rpc_client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type TabletManagerClient interface {
// RefreshState asks the remote tablet to reload its tablet record
RefreshState(tablet *topo.TabletInfo, waitTime time.Duration) error

// RunHealthCheck asks the remote tablet to run a health check cycle
RunHealthCheck(tablet *topo.TabletInfo, targetTabletType topo.TabletType, waitTime time.Duration) error

// ReloadSchema asks the remote tablet to reload its schema
ReloadSchema(tablet *topo.TabletInfo, waitTime time.Duration) error

Expand Down
25 changes: 25 additions & 0 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ var commands = []commandGroup{
command{"RefreshState", commandRefreshState,
"<tablet alias|zk tablet path>",
"Asks a remote tablet to reload its tablet record."},
command{"RunHealthCheck", commandRunHealthCheck,
"<tablet alias> <target tablet type>",
"Asks a remote tablet to run a health check with the providd target type."},
command{"Query", commandQuery,
"<cell> <keyspace> <query>",
"Send a SQL query to a tablet."},
Expand Down Expand Up @@ -866,6 +869,28 @@ func commandRefreshState(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []s
return wr.TabletManagerClient().RefreshState(tabletInfo, wr.ActionTimeout())
}

func commandRunHealthCheck(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
}
if subFlags.NArg() != 2 {
return fmt.Errorf("action RunHealthCheck requires <tablet alias> <target tablet type>")
}
tabletAlias, err := tabletParamToTabletAlias(subFlags.Arg(0))
if err != nil {
return err
}
servedType, err := parseTabletType(subFlags.Arg(1), []topo.TabletType{topo.TYPE_REPLICA, topo.TYPE_RDONLY})
if err != nil {
return err
}
tabletInfo, err := wr.TopoServer().GetTablet(tabletAlias)
if err != nil {
return err
}
return wr.TabletManagerClient().RunHealthCheck(tabletInfo, servedType, wr.ActionTimeout())
}

func commandQuery(wr *wrangler.Wrangler, subFlags *flag.FlagSet, args []string) error {
if err := subFlags.Parse(args); err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ public DatabaseException(String message) {
}
}

/**
* Exception raised by MySQL due to violation of unique key constraint
*/
@SuppressWarnings("serial")
public static class IntegrityException extends DatabaseException {
public IntegrityException(String message) {
super(message);
}
}

/**
* Exception caused due to irrecoverable connection failures or other low level exceptions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Exceptions.IntegrityException;
import com.youtube.vitess.vtgate.cursor.Cursor;
import com.youtube.vitess.vtgate.cursor.CursorImpl;
import com.youtube.vitess.vtgate.cursor.StreamCursor;
Expand Down Expand Up @@ -36,18 +37,17 @@
*vtgate.close();
*</pre>
*
* TODO: Currently only ExecuteKeyspaceIds is supported, add the rest.
*/
public class VtGate {

private static String INTEGRITY_ERROR_MSG = "(errno 1062)";
private RpcClient client;
private Object session;

/**
* Opens connection to a VtGate server. Connection remains open until close() is called.
*
* @param addresses comma separated list of host:port pairs
* @params timeoutMs connection timeout in milliseconds, 0 for no timeout
* @param timeoutMs connection timeout in milliseconds, 0 for no timeout
* @throws ConnectionException
*/
public static VtGate connect(String addresses, int timeoutMs) throws ConnectionException {
Expand All @@ -70,7 +70,11 @@ public Cursor execute(Query query) throws DatabaseException, ConnectionException
query.setSession(session);
}
QueryResponse response = client.execute(query);
if (response.getError() != null) {
String error = response.getError();
if (error != null) {
if (error.contains(INTEGRITY_ERROR_MSG)) {
throw new IntegrityException(error);
}
throw new DatabaseException(response.getError());
}
if (response.getSession() != null) {
Expand All @@ -87,7 +91,11 @@ public List<Cursor> execute(BatchQuery query) throws DatabaseException, Connecti
query.setSession(session);
}
BatchQueryResponse response = client.batchExecute(query);
if (response.getError() != null) {
String error = response.getError();
if (error != null) {
if (error.contains(INTEGRITY_ERROR_MSG)) {
throw new IntegrityException(error);
}
throw new DatabaseException(response.getError());
}
if (response.getSession() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.youtube.vitess.vtgate.BindVariable;
import com.youtube.vitess.vtgate.Exceptions.ConnectionException;
import com.youtube.vitess.vtgate.Exceptions.DatabaseException;
import com.youtube.vitess.vtgate.Exceptions.IntegrityException;
import com.youtube.vitess.vtgate.KeyRange;
import com.youtube.vitess.vtgate.KeyspaceId;
import com.youtube.vitess.vtgate.Query;
Expand Down Expand Up @@ -332,6 +333,29 @@ public void testBatchExecuteKeyspaceIds() throws Exception {
vtgate.close();
}

@Test
public void testIntegrityException() throws Exception {
VtGate vtgate = VtGate.connect("localhost:" + testEnv.port, 0);
String insertSql = "insert into vtgate_test(id, keyspace_id) values (:id, :keyspace_id)";
KeyspaceId kid = testEnv.getAllKeyspaceIds().get(0);
Query insertQuery = new QueryBuilder(insertSql, testEnv.keyspace, "master")
.addBindVar(BindVariable.forInt("id", 1))
.addBindVar(BindVariable.forULong("keyspace_id", ((UnsignedLong) kid.getId())))
.addKeyspaceId(kid).build();
vtgate.begin();
vtgate.execute(insertQuery);
vtgate.commit();
vtgate.begin();
try {
vtgate.execute(insertQuery);
Assert.fail("failed to throw exception");
} catch (IntegrityException e) {
} finally {
vtgate.rollback();
vtgate.close();
}
}

/**
* Create env with two shards each having a master and replica
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class Util {
public static void setupTestEnv(TestEnv testEnv, boolean isSetUp) throws Exception {
ProcessBuilder pb = new ProcessBuilder(SetupCommand.get(testEnv, isSetUp));
pb.redirectErrorStream(true);
pb.environment().put("VTDATAROOT", "/dev/shm/vt");
Process p = pb.start();
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));

Expand Down
19 changes: 17 additions & 2 deletions test/resharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,12 @@ def _test_keyrange_constraints(self):

def _check_query_service(self, tablet, serving, tablet_control_disabled):
"""_check_query_service will check that the query service is enabled
or disabled on the tablet. It will also check if the tablet control
status is the reason for being enabled / disabled."""
or disabled on the tablet. It will also check if the tablet control
status is the reason for being enabled / disabled.
It will also run a remote RunHealthCheck to be sure it doesn't change
the serving state.
"""
tablet_vars = utils.get_vars(tablet.port)
if serving:
expected_state = 'SERVING'
Expand All @@ -448,6 +452,17 @@ def _check_query_service(self, tablet, serving, tablet_control_disabled):
else:
self.assertNotIn("Query Service disabled by TabletControl", status)

if tablet.tablet_type == 'rdonly':
utils.run_vtctl(['RunHealthCheck', tablet.tablet_alias, 'rdonly'],
auto_log=True)

tablet_vars = utils.get_vars(tablet.port)
if serving:
expected_state = 'SERVING'
else:
expected_state = 'NOT_SERVING'
self.assertEqual(tablet_vars['TabletStateName'], expected_state, 'tablet %s is not in the right serving state after health check: got %s expected %s' % (tablet.tablet_alias, tablet_vars['TabletStateName'], expected_state))

def test_resharding(self):
utils.run_vtctl(['CreateKeyspace',
'--sharding_column_name', 'bad_column',
Expand Down

0 comments on commit b4e353b

Please sign in to comment.