1. How do I check what SQL is currently being executed by the database, and in what state?
2. How do I terminate abnormal SQL? For instance, a SELECT statement that queries a table with massive data but carries no query conditions can drag down the performance of the entire database, which may prompt you to terminate it.
In response to the above issues, Apache ShardingSphere introduced the Show processlist and Kill <processID> features.
1. Introduction
Show processlist: this command displays the list of SQL statements currently being executed by ShardingSphere and the execution progress of each one. If ShardingSphere is deployed in cluster mode, Show processlist aggregates the SQL running on all Proxy instances in the cluster before displaying the result, so you always see all the SQL running at that moment.
mysql> show processlist \G
*************************** 1. row ***************************
     Id: 82a67f254959e0a0807a00f3cd695d87
   User: root
   Host: 10.200.79.156
     db: root
Command: Execute
   Time: 19
  State: Executing 0/1
   Info: update t_order set version = 456
1 row in set (0.24 sec)
Kill <processID>: this command terminates the SQL statement identified by the given process ID:
mysql> kill 82a67f254959e0a0807a00f3cd695d87;
Query OK, 0 rows affected (0.17 sec)
2. How it works
Now that you understand what Show processlist and Kill <processID> do, let's see how the two commands work. As the principle behind Kill <processID> is similar to that of Show processlist, we'll focus on interpreting Show processlist.
2.1 How is SQL saved and destroyed?
Each SQL statement executed in ShardingSphere generates an ExecutionGroupContext object. The object contains all the information about the statement, including an executionID field that guarantees its uniqueness.
When ShardingSphere receives a SQL statement, GovernanceExecuteProcessReporter#report is called to store the ExecutionGroupContext information into a ConcurrentHashMap cache. (Currently only the DML and DDL statements of MySQL are supported; other database types will be supported in later versions. Query statements are also classified as DML.)
public final class GovernanceExecuteProcessReporter implements ExecuteProcessReporter {
    
    @Override
    public void report(final QueryContext queryContext, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext,
                       final ExecuteProcessConstants constants, final EventBusContext eventBusContext) {
        ExecuteProcessContext executeProcessContext = new ExecuteProcessContext(queryContext.getSql(), executionGroupContext, constants);
        ShowProcessListManager.getInstance().putProcessContext(executeProcessContext.getExecutionID(), executeProcessContext);
        ShowProcessListManager.getInstance().putProcessStatement(executeProcessContext.getExecutionID(), executeProcessContext.getProcessStatements());
    }
}

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ShowProcessListManager {
    
    private static final ShowProcessListManager INSTANCE = new ShowProcessListManager();
    
    @Getter
    private final Map<String, ExecuteProcessContext> processContexts = new ConcurrentHashMap<>();
    
    @Getter
    private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();
    
    public static ShowProcessListManager getInstance() {
        return INSTANCE;
    }
    
    public void putProcessContext(final String executionId, final ExecuteProcessContext processContext) {
        processContexts.put(executionId, processContext);
    }
    
    public void putProcessStatement(final String executionId, final Collection<Statement> statements) {
        if (statements.isEmpty()) {
            return;
        }
        processStatements.put(executionId, statements);
    }
}
As shown above, ShowProcessListManager holds two maps: processContexts caches the ExecuteProcessContext keyed by executionID, while processStatements maps the executionID to its Statement objects, since a single SQL statement may be rewritten into multiple real statements.
Every time ShardingSphere receives a SQL statement, its information is cached into these two maps. Once the statement finishes executing, the cache entries are deleted.
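The put-on-arrival, remove-on-completion lifecycle described above can be sketched in isolation. ExecutionRegistry below is a hypothetical, simplified stand-in (it caches the raw SQL text instead of a full ExecuteProcessContext), not ShardingSphere's actual class:

```java
import java.sql.Statement;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch of the two-map cache: one map for per-SQL context,
// one for the Statement objects, both keyed by the unique executionID.
final class ExecutionRegistry {
    
    private final Map<String, String> processContexts = new ConcurrentHashMap<>();
    
    private final Map<String, Collection<Statement>> processStatements = new ConcurrentHashMap<>();
    
    // Called when a SQL statement arrives: cache its information.
    void report(final String executionId, final String sql, final Collection<Statement> statements) {
        processContexts.put(executionId, sql);
        if (!statements.isEmpty()) {
            processStatements.put(executionId, statements);
        }
    }
    
    // Called when the SQL finishes: both cache entries are evicted.
    void finish(final String executionId) {
        processContexts.remove(executionId);
        processStatements.remove(executionId);
    }
    
    String getSql(final String executionId) {
        return processContexts.get(executionId);
    }
}
```

A Show processlist handled by this instance would simply read whatever is in processContexts at that moment.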
@RequiredArgsConstructor
public final class ProxyJDBCExecutor {
    
    private final String type;
    
    private final ConnectionSession connectionSession;
    
    private final JDBCDatabaseCommunicationEngine databaseCommunicationEngine;
    
    private final JDBCExecutor jdbcExecutor;
    
    public List<ExecuteResult> execute(final QueryContext queryContext, final ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext,
                                       final boolean isReturnGeneratedKeys, final boolean isExceptionThrown) throws SQLException {
        try {
            MetaDataContexts metaDataContexts = ProxyContext.getInstance().getContextManager().getMetaDataContexts();
            EventBusContext eventBusContext = ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext();
            ShardingSphereDatabase database = metaDataContexts.getMetaData().getDatabase(connectionSession.getDatabaseName());
            DatabaseType protocolType = database.getProtocolType();
            DatabaseType databaseType = database.getResource().getDatabaseType();
            // Cache the SQL information before execution.
            ExecuteProcessEngine.initialize(queryContext, executionGroupContext, eventBusContext);
            SQLStatementContext<?> context = queryContext.getSqlStatementContext();
            List<ExecuteResult> result = jdbcExecutor.execute(executionGroupContext,
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            true),
                    ProxyJDBCExecutorCallbackFactory.newInstance(type, protocolType, databaseType, context.getSqlStatement(), databaseCommunicationEngine, isReturnGeneratedKeys, isExceptionThrown,
                            false));
            ExecuteProcessEngine.finish(executionGroupContext.getExecutionID(), eventBusContext);
            return result;
        } finally {
            // Evict the cached SQL information once execution ends.
            ExecuteProcessEngine.clean();
        }
    }
}
The SQL shown by Show processlist is read from processContexts. But this map is just a local cache. If ShardingSphere is deployed in cluster mode, how does Show processlist obtain the SQL running on other machines in the cluster? Let's see how ShardingSphere handles it.
2.2 How does Show processlist work?
When ShardingSphere receives the Show processlist command, it is routed to the executor ShowProcessListExecutor#execute for processing. The implementation of getQueryResult() is the focus.
public final class ShowProcessListExecutor implements DatabaseAdminQueryExecutor {
    
    private Collection<String> batchProcessContexts;
    
    @Getter
    private QueryResultMetaData queryResultMetaData;
    
    @Getter
    private MergedResult mergedResult;
    
    public ShowProcessListExecutor() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().register(this);
    }
    
    @Subscribe
    public void receiveProcessListData(final ShowProcessListResponseEvent event) {
        batchProcessContexts = event.getBatchProcessContexts();
    }
    
    @Override
    public void execute(final ConnectionSession connectionSession) {
        queryResultMetaData = createQueryResultMetaData();
        mergedResult = new TransparentMergedResult(getQueryResult());
    }
    
    private QueryResult getQueryResult() {
        ProxyContext.getInstance().getContextManager().getInstanceContext().getEventBusContext().post(new ShowProcessListRequestEvent());
        if (null == batchProcessContexts || batchProcessContexts.isEmpty()) {
            return new RawMemoryQueryResult(queryResultMetaData, Collections.emptyList());
        }
        Collection<YamlExecuteProcessContext> processContexts = new LinkedList<>();
        for (String each : batchProcessContexts) {
            processContexts.addAll(YamlEngine.unmarshal(each, BatchYamlExecuteProcessContext.class).getContexts());
        }
        List<MemoryQueryResultDataRow> rows = processContexts.stream().map(processContext -> {
            List<Object> rowValues = new ArrayList<>(8);
            rowValues.add(processContext.getExecutionID());
            rowValues.add(processContext.getUsername());
            rowValues.add(processContext.getHostname());
            rowValues.add(processContext.getDatabaseName());
            rowValues.add("Execute");
            rowValues.add(TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - processContext.getStartTimeMillis()));
            int processDoneCount = processContext.getUnitStatuses().stream().map(each -> ExecuteProcessConstants.EXECUTE_STATUS_DONE == each.getStatus() ? 1 : 0).reduce(0, Integer::sum);
            String statePrefix = "Executing ";
            rowValues.add(statePrefix + processDoneCount + "/" + processContext.getUnitStatuses().size());
            String sql = processContext.getSql();
            if (null != sql && sql.length() > 100) {
                sql = sql.substring(0, 100);
            }
            rowValues.add(null != sql ? sql : "");
            return new MemoryQueryResultDataRow(rowValues);
        }).collect(Collectors.toList());
        return new RawMemoryQueryResult(queryResultMetaData, rows);
    }
    
    private QueryResultMetaData createQueryResultMetaData() {
        List<RawQueryResultColumnMetaData> columns = new ArrayList<>();
        columns.add(new RawQueryResultColumnMetaData("", "Id", "Id", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "User", "User", Types.VARCHAR, "VARCHAR", 20, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Host", "Host", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "db", "db", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Command", "Command", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Time", "Time", Types.VARCHAR, "VARCHAR", 10, 0));
        columns.add(new RawQueryResultColumnMetaData("", "State", "State", Types.VARCHAR, "VARCHAR", 64, 0));
        columns.add(new RawQueryResultColumnMetaData("", "Info", "Info", Types.VARCHAR, "VARCHAR", 120, 0));
        return new RawQueryResultMetaData(columns);
    }
}
The getQueryResult() method posts a ShowProcessListRequestEvent, which ProcessRegistrySubscriber#loadShowProcessListData subscribes to via the @Subscribe annotation.
This method is the core of the Show processlist implementation. Its specific steps are annotated in the code.
public final class ProcessRegistrySubscriber {
    
    @Subscribe
    public void loadShowProcessListData(final ShowProcessListRequestEvent event) {
        String processListId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString().replace("-", "");
        boolean triggerIsComplete = false;
        // 1. Obtain the process list path of all existing Proxy nodes in cluster mode.
        Collection<String> triggerPaths = getTriggerPaths(processListId);
        try {
            // 2. Iterate through the paths and write an empty string to each node, to trigger the node watchers.
            triggerPaths.forEach(each -> repository.persist(each, ""));
            // 3. Lock and wait up to 5 seconds for each node to write its currently running SQL information to the persistence layer.
            triggerIsComplete = waitAllNodeDataReady(processListId, triggerPaths);
            // 4. Fetch and aggregate the data written by each Proxy node from the persistence layer,
            //    then post a ShowProcessListResponseEvent on the EventBus, which means the operation is complete.
            sendShowProcessList(processListId);
        } finally {
            // 5. Clean up resources.
            repository.delete(ProcessNode.getProcessListIdPath(processListId));
            if (!triggerIsComplete) {
                triggerPaths.forEach(repository::delete);
            }
        }
    }
}
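The five steps above boil down to a trigger/wait/aggregate pattern. The following self-contained simulation is only a sketch of that pattern: the in-memory repository map, the path names, and ProcessListSimulation itself are all hypothetical, whereas ShardingSphere's real implementation persists to a registry center (e.g. ZooKeeper) and wakes the waiter through watch events:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Simulation of the coordinator: write one trigger per instance, let each
// instance persist its process list asynchronously, wait (bounded), aggregate.
final class ProcessListSimulation {
    
    // Stand-in for the cluster's persistence layer (registry center).
    private final Map<String, String> repository = new ConcurrentHashMap<>();
    
    List<String> showProcessList(final List<String> instanceIds, final Map<String, String> runningSqlPerInstance) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(instanceIds.size());
        for (String instanceId : instanceIds) {
            // Steps 1-2: write an empty trigger node; the instance reacts asynchronously.
            repository.put("trigger/" + instanceId, "");
            new Thread(() -> {
                repository.put("processlist/" + instanceId, runningSqlPerInstance.getOrDefault(instanceId, ""));
                // Deleting its trigger node is how an instance signals completion.
                repository.remove("trigger/" + instanceId);
                latch.countDown();
            }).start();
        }
        // Step 3: bounded wait, mirroring ShardingSphere's 5-second cap.
        latch.await(5, TimeUnit.SECONDS);
        // Step 4: aggregate what every instance persisted.
        return instanceIds.stream().map(id -> repository.get("processlist/" + id)).collect(Collectors.toList());
    }
}
```

The bounded wait matters: a crashed instance must not hang the whole Show processlist request, which is also why step 5 cleans up leftover trigger nodes when the wait is incomplete.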
2.2.1 Step 2: how the cluster collects the data
In this step, an empty string is written to the node /nodes/compute_nodes/process_trigger/<instanceId>:<processlistId>, which triggers ShardingSphere's watch logic.
When ShardingSphere starts, the persistence layer registers watchers on a series of paths, monitoring additions, deletions, and modifications under paths such as /nodes/compute_nodes.
However, watching is an asynchronous process and the main thread does not block, which is why step 3 is needed: lock and wait for each ShardingSphere node to write its currently running SQL information into the persistence layer.
Let's take a look at how ShardingSphere handles the watch logic.
public final class ComputeNodeStateChangedWatcher implements GovernanceWatcher<GovernanceEvent> {
    
    @Override
    public Collection<String> getWatchingKeys(final String databaseName) {
        return Collections.singleton(ComputeNode.getComputeNodePath());
    }
    
    @Override
    public Collection<Type> getWatchingTypes() {
        return Arrays.asList(Type.ADDED, Type.UPDATED, Type.DELETED);
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
        String instanceId = ComputeNode.getInstanceIdByComputeNode(event.getKey());
        if (!Strings.isNullOrEmpty(instanceId)) {
            ...
        } else if (event.getKey().startsWith(ComputeNode.getOnlineInstanceNodePath())) {
            return createInstanceEvent(event);
        // show processlist
        } else if (event.getKey().startsWith(ComputeNode.getProcessTriggerNodePatch())) {
            return createShowProcessListTriggerEvent(event);
        // kill processlistId
        } else if (event.getKey().startsWith(ComputeNode.getProcessKillNodePatch())) {
            return createKillProcessListIdEvent(event);
        }
        return Optional.empty();
    }
    
    private Optional<GovernanceEvent> createShowProcessListTriggerEvent(final DataChangedEvent event) {
        Matcher matcher = getShowProcessTriggerMatcher(event);
        if (!matcher.find()) {
            return Optional.empty();
        }
        if (Type.ADDED == event.getType()) {
            return Optional.of(new ShowProcessListTriggerEvent(matcher.group(1), matcher.group(2)));
        }
        if (Type.DELETED == event.getType()) {
            return Optional.of(new ShowProcessListUnitCompleteEvent(matcher.group(2)));
        }
        return Optional.empty();
    }
}
As the code above shows, when a new node is added, a ShowProcessListTriggerEvent is posted. Since every ShardingSphere instance watches /nodes/compute_nodes, every instance processes the ShowProcessListTriggerEvent.
This is how single-machine processing is turned into cluster processing. Let's look at how ShardingSphere handles the event.
public final class ClusterContextManagerCoordinator {
    
    @Subscribe
    public synchronized void triggerShowProcessList(final ShowProcessListTriggerEvent event) {
        if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
            return;
        }
        Collection<ExecuteProcessContext> processContexts = ShowProcessListManager.getInstance().getAllProcessContext();
        if (!processContexts.isEmpty()) {
            registryCenter.getRepository().persist(ProcessNode.getProcessListInstancePath(event.getProcessListId(), event.getInstanceId()),
                    YamlEngine.marshal(new BatchYamlExecuteProcessContext(processContexts)));
        }
        registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceIdNodePath(event.getInstanceId(), event.getProcessListId()));
    }
}
When the node is deleted, the watcher fires again and a ShowProcessListUnitCompleteEvent is posted. This event finally wakes up the pending lock.
public final class ClusterContextManagerCoordinator {
    
    @Subscribe
    public synchronized void completeUnitShowProcessList(final ShowProcessListUnitCompleteEvent event) {
        ShowProcessListSimpleLock simpleLock = ShowProcessListManager.getInstance().getLocks().get(event.getProcessListId());
        if (null != simpleLock) {
            simpleLock.doNotify();
        }
    }
}
ShardingSphere uses the isReady(paths) method to determine whether all instances have finished processing; it returns true only when every instance has.
There is a maximum wait of 5 seconds for data processing: if processing has not completed within 5 seconds, false is returned.
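The bounded wait can be sketched with plain wait/notify, loosely modeled on the ShowProcessListSimpleLock seen above. SimpleTimeoutLock and its method signatures are assumptions for illustration, not ShardingSphere's actual code:

```java
// A lock that blocks the requesting thread until doNotify() is called
// or the timeout (ShardingSphere uses 5 seconds) elapses.
final class SimpleTimeoutLock {
    
    private boolean notified;
    
    // Returns true if notified before the timeout, false on timeout.
    synchronized boolean doAwait(final long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!notified) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            // The loop guards against spurious wakeups.
            wait(remaining);
        }
        return true;
    }
    
    synchronized void doNotify() {
        notified = true;
        notifyAll();
    }
}
```

In the real flow, completeUnitShowProcessList plays the role of the notifying thread: each time an instance deletes its trigger node, the waiter is woken to re-check whether all instances are done.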
After every instance has processed the data, the instance that received the Show processlist command aggregates the data and displays the result.
public final class ProcessRegistrySubscriber {
    
    private void sendShowProcessList(final String processListId) {
        List<String> childrenKeys = repository.getChildrenKeys(ProcessNode.getProcessListIdPath(processListId));
        Collection<String> batchProcessContexts = new LinkedList<>();
        for (String each : childrenKeys) {
            batchProcessContexts.add(repository.get(ProcessNode.getProcessListInstancePath(processListId, each)));
        }
        eventBusContext.post(new ShowProcessListResponseEvent(batchProcessContexts));
    }
}
This event is consumed by ShowProcessListExecutor#receiveProcessListData, and the getQueryResult() method then assembles and returns the query result.
That completes the execution flow of the Show processlist command.
2.3 How does Kill <processId> work?
Kill <processId> shares similar logic with Show processlist: it also combines the EventBus with the watch mechanism.
Since we do not know which instance is executing the SQL identified by processId, an empty node must likewise be created for each instance.
Through the watch mechanism, each ShardingSphere instance observes the new node and checks whether the processId exists as a key in its cached map. If it does, the instance fetches the corresponding value, a Collection<Statement>, iterates through the collection, and calls statement.cancel() on each element. Under the hood, java.sql.Statement#cancel() is what cancels the SQL execution.
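The kill path on a single instance can be sketched as follows. KillProcessSketch and fakeStatement are hypothetical names used only for illustration; the proxy-backed Statement merely counts cancel() calls so the sketch runs without a database:

```java
import java.lang.reflect.Proxy;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: look up the processId in the statement cache and cancel every
// Statement found; instances that do not own the processId do nothing.
final class KillProcessSketch {
    
    static final Map<String, Collection<Statement>> PROCESS_STATEMENTS = new ConcurrentHashMap<>();
    
    static boolean kill(final String processId) throws SQLException {
        Collection<Statement> statements = PROCESS_STATEMENTS.get(processId);
        if (null == statements) {
            return false;
        }
        for (Statement each : statements) {
            // Delegates to java.sql.Statement#cancel() on a real driver.
            each.cancel();
        }
        return true;
    }
    
    // Test double: counts cancel() calls; every other method is a no-op.
    static Statement fakeStatement(final AtomicInteger cancelCount) {
        return (Statement) Proxy.newProxyInstance(Statement.class.getClassLoader(), new Class<?>[]{Statement.class},
            (proxy, method, args) -> {
                if ("cancel".equals(method.getName())) {
                    cancelCount.incrementAndGet();
                }
                return null;
            });
    }
}
```

Note that cancellation ultimately depends on the JDBC driver: Statement#cancel() only works when both the driver and the backing database support it.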
3. Conclusion
Currently, Apache ShardingSphere implements the Show processlist and Kill <processId> functions only for the MySQL dialect.
Now that you know how they work, you're welcome to participate in the development of related features if you're interested. Our community is very open, and anyone interested in contributing to open source code is welcome.
Relevant Links:
Apache ShardingSphere Official Website
Apache ShardingSphere GitHub
Apache ShardingSphere Slack Channel
Author
Xu Yang, a middleware R&D engineer at Servyou Group, responsible for table and database sharding with massive data. An open source enthusiast and ShardingSphere contributor, currently interested in developing the kernel module of the ShardingSphere project.