243 changed files with 9757 additions and 4409 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -1,300 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import com.datastax.driver.core.KeyspaceMetadata; |
|||
import com.datastax.driver.core.exceptions.InvalidQueryException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.util.NoSqlDao; |
|||
import org.thingsboard.server.service.install.cql.CassandraDbHelper; |
|||
|
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.Paths; |
|||
|
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ASSET; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.CONFIGURATION; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.CUSTOMER_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.DASHBOARD; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.DEVICE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.END_TS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_TYPE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEW; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEWS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.KEYS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.NAME; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.SEARCH_TEXT; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.START_TS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TENANT_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TITLE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TYPE; |
|||
|
|||
@Service |
|||
@NoSqlDao |
|||
@Profile("install") |
|||
@Slf4j |
|||
public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUpgradeService implements DatabaseEntitiesUpgradeService { |
|||
|
|||
private static final String SCHEMA_UPDATE_CQL = "schema_update.cql"; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private InstallScripts installScripts; |
|||
|
|||
@Override |
|||
public void upgradeDatabase(String fromVersion) throws Exception { |
|||
|
|||
switch (fromVersion) { |
|||
case "1.2.3": |
|||
|
|||
log.info("Upgrading Cassandara DataBase from version {} to 1.3.0 ...", fromVersion); |
|||
|
|||
//Dump devices, assets and relations
|
|||
|
|||
cluster.getSession(); |
|||
|
|||
KeyspaceMetadata ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping devices ..."); |
|||
Path devicesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DEVICE, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, |
|||
new String[]{"", "", "", "", "", "", "default"}, |
|||
"tb-devices"); |
|||
log.info("Devices dumped."); |
|||
|
|||
log.info("Dumping assets ..."); |
|||
Path assetsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ASSET, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, |
|||
new String[]{"", "", "", "", "", "", "default"}, |
|||
"tb-assets"); |
|||
log.info("Assets dumped."); |
|||
|
|||
log.info("Dumping relations ..."); |
|||
Path relationsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", ADDITIONAL_INFO, "relation_type_group"}, |
|||
new String[]{"", "", "", "", "", "", "COMMON"}, |
|||
"tb-relations"); |
|||
log.info("Relations dumped."); |
|||
|
|||
log.info("Updating schema ..."); |
|||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.3.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
//Restore devices, assets and relations
|
|||
|
|||
log.info("Restoring devices ..."); |
|||
if (devicesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), DEVICE, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, devicesDump); |
|||
Files.deleteIfExists(devicesDump); |
|||
} |
|||
log.info("Devices restored."); |
|||
|
|||
log.info("Dumping device types ..."); |
|||
Path deviceTypesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DEVICE, |
|||
new String[]{TENANT_ID, "type"}, |
|||
new String[]{"", ""}, |
|||
"tb-device-types"); |
|||
if (deviceTypesDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(deviceTypesDump, "DEVICE"); |
|||
} |
|||
log.info("Device types dumped."); |
|||
log.info("Loading device types ..."); |
|||
if (deviceTypesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "entity_subtype", |
|||
new String[]{TENANT_ID, "type", "entity_type"}, deviceTypesDump); |
|||
Files.deleteIfExists(deviceTypesDump); |
|||
} |
|||
log.info("Device types loaded."); |
|||
|
|||
log.info("Restoring assets ..."); |
|||
if (assetsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), ASSET, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, assetsDump); |
|||
Files.deleteIfExists(assetsDump); |
|||
} |
|||
log.info("Assets restored."); |
|||
|
|||
log.info("Dumping asset types ..."); |
|||
Path assetTypesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ASSET, |
|||
new String[]{TENANT_ID, "type"}, |
|||
new String[]{"", ""}, |
|||
"tb-asset-types"); |
|||
if (assetTypesDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(assetTypesDump, "ASSET"); |
|||
} |
|||
log.info("Asset types dumped."); |
|||
log.info("Loading asset types ..."); |
|||
if (assetTypesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "entity_subtype", |
|||
new String[]{TENANT_ID, "type", "entity_type"}, assetTypesDump); |
|||
Files.deleteIfExists(assetTypesDump); |
|||
} |
|||
log.info("Asset types loaded."); |
|||
|
|||
log.info("Restoring relations ..."); |
|||
if (relationsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", ADDITIONAL_INFO, "relation_type_group"}, relationsDump); |
|||
Files.deleteIfExists(relationsDump); |
|||
} |
|||
log.info("Relations restored."); |
|||
|
|||
break; |
|||
case "1.3.0": |
|||
break; |
|||
case "1.3.1": |
|||
|
|||
cluster.getSession(); |
|||
|
|||
ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping dashboards ..."); |
|||
Path dashboardsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DASHBOARD, |
|||
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION}, |
|||
new String[]{"", "", "", "", "", "", ""}, |
|||
"tb-dashboards", true); |
|||
log.info("Dashboards dumped."); |
|||
|
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.4.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
log.info("Restoring dashboards ..."); |
|||
if (dashboardsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), DASHBOARD, |
|||
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump, true); |
|||
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, false); |
|||
Files.deleteIfExists(dashboardsDump); |
|||
} |
|||
log.info("Dashboards restored."); |
|||
break; |
|||
case "1.4.0": |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
break; |
|||
|
|||
case "2.0.0": |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.1.1", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
break; |
|||
|
|||
case "2.1.1": |
|||
|
|||
log.info("Upgrading Cassandra DataBase from version {} to 2.1.2 ...", fromVersion); |
|||
|
|||
cluster.getSession(); |
|||
|
|||
ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping entity views ..."); |
|||
Path entityViewsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ENTITY_VIEWS, |
|||
new String[]{ID, ENTITY_ID, ENTITY_TYPE, TENANT_ID, CUSTOMER_ID, NAME, TYPE, KEYS, START_TS, END_TS, SEARCH_TEXT, ADDITIONAL_INFO}, |
|||
new String[]{"", "", "", "", "", "", "default", "", "0", "0", "", ""}, |
|||
"tb-entity-views"); |
|||
log.info("Entity views dumped."); |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.1.2", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
log.info("Restoring entity views ..."); |
|||
if (entityViewsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), ENTITY_VIEW, |
|||
new String[]{ID, ENTITY_ID, ENTITY_TYPE, TENANT_ID, CUSTOMER_ID, NAME, TYPE, KEYS, START_TS, END_TS, SEARCH_TEXT, ADDITIONAL_INFO}, entityViewsDump); |
|||
Files.deleteIfExists(entityViewsDump); |
|||
} |
|||
log.info("Entity views restored."); |
|||
|
|||
break; |
|||
case "2.1.3": |
|||
break; |
|||
case "2.3.0": |
|||
break; |
|||
case "2.3.1": |
|||
log.info("Updating schema ..."); |
|||
String updateDeviceTableStmt = "alter table device add label text"; |
|||
try { |
|||
cluster.getSession().execute(updateDeviceTableStmt); |
|||
Thread.sleep(2500); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.1": |
|||
log.info("Updating schema ..."); |
|||
String updateAssetTableStmt = "alter table asset add label text"; |
|||
try { |
|||
log.info("Updating assets ..."); |
|||
cluster.getSession().execute(updateAssetTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Assets updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.2": |
|||
log.info("Updating schema ..."); |
|||
String updateAlarmTableStmt = "alter table alarm add propagate_relation_types text"; |
|||
try { |
|||
log.info("Updating alarms ..."); |
|||
cluster.getSession().execute(updateAlarmTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Alarms updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.3": |
|||
log.info("Updating schema ..."); |
|||
String updateAttributeKvTableStmt = "alter table attributes_kv_cf add json_v text"; |
|||
try { |
|||
log.info("Updating attributes ..."); |
|||
cluster.getSession().execute(updateAttributeKvTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Attributes updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
default: |
|||
throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.util.HsqlDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
|
|||
@Service |
|||
@HsqlDao |
|||
@SqlDao |
|||
@Profile("install") |
|||
public class HsqlEntityDatabaseSchemaService extends SqlAbstractDatabaseSchemaService |
|||
implements EntityDatabaseSchemaService { |
|||
protected HsqlEntityDatabaseSchemaService() { |
|||
super("schema-entities-hsql.sql", "schema-entities-idx.sql"); |
|||
} |
|||
} |
|||
|
|||
@ -0,0 +1,314 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
import org.thingsboard.server.dao.cassandra.CassandraCluster; |
|||
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
import org.thingsboard.server.service.install.EntityDatabaseSchemaService; |
|||
|
|||
import java.sql.Connection; |
|||
import java.sql.DriverManager; |
|||
import java.util.Arrays; |
|||
import java.util.List; |
|||
|
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.bigintColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.booleanColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.doubleColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.enumToIntColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.idColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.jsonColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.stringColumn; |
|||
|
|||
@Service |
|||
@Profile("install") |
|||
@SqlDao |
|||
@NoSqlAnyDao |
|||
@Slf4j |
|||
public class CassandraEntitiesToSqlMigrateService implements EntitiesMigrateService { |
|||
|
|||
@Autowired |
|||
private EntityDatabaseSchemaService entityDatabaseSchemaService; |
|||
|
|||
@Autowired |
|||
protected CassandraCluster cluster; |
|||
|
|||
@Value("${spring.datasource.url}") |
|||
protected String dbUrl; |
|||
|
|||
@Value("${spring.datasource.username}") |
|||
protected String dbUserName; |
|||
|
|||
@Value("${spring.datasource.password}") |
|||
protected String dbPassword; |
|||
|
|||
|
|||
@Override |
|||
public void migrate() throws Exception { |
|||
log.info("Performing migration of entities data from cassandra to SQL database ..."); |
|||
entityDatabaseSchemaService.createDatabaseSchema(); |
|||
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { |
|||
conn.setAutoCommit(false); |
|||
for (CassandraToSqlTable table: tables) { |
|||
table.migrateToSql(cluster.getSession(), conn); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Unexpected error during ThingsBoard entities data migration!", e); |
|||
throw e; |
|||
} |
|||
} |
|||
|
|||
private static List<CassandraToSqlTable> tables = Arrays.asList( |
|||
new CassandraToSqlTable("admin_settings", |
|||
idColumn("id"), |
|||
stringColumn("key"), |
|||
stringColumn("json_value")), |
|||
new CassandraToSqlTable("alarm", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("type"), |
|||
idColumn("originator_id"), |
|||
enumToIntColumn("originator_type", EntityType.class), |
|||
stringColumn("severity"), |
|||
stringColumn("status"), |
|||
bigintColumn("start_ts"), |
|||
bigintColumn("end_ts"), |
|||
bigintColumn("ack_ts"), |
|||
bigintColumn("clear_ts"), |
|||
stringColumn("details", "additional_info"), |
|||
booleanColumn("propagate"), |
|||
stringColumn("propagate_relation_types")), |
|||
new CassandraToSqlTable("asset", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("label"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("asset_name_unq_key")) { |
|||
this.handleUniqueNameViolation(data, "asset"); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("audit_log_by_tenant_id", "audit_log", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("entity_name"), |
|||
idColumn("user_id"), |
|||
stringColumn("user_name"), |
|||
stringColumn("action_type"), |
|||
stringColumn("action_data"), |
|||
stringColumn("action_status"), |
|||
stringColumn("action_failure_details")), |
|||
new CassandraToSqlTable("attributes_kv_cf", "attribute_kv", |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("attribute_type"), |
|||
stringColumn("attribute_key"), |
|||
booleanColumn("bool_v"), |
|||
stringColumn("str_v"), |
|||
bigintColumn("long_v"), |
|||
doubleColumn("dbl_v"), |
|||
jsonColumn("json_v"), |
|||
bigintColumn("last_update_ts")), |
|||
new CassandraToSqlTable("component_descriptor", |
|||
idColumn("id"), |
|||
stringColumn("type"), |
|||
stringColumn("scope"), |
|||
stringColumn("name"), |
|||
stringColumn("search_text"), |
|||
stringColumn("clazz"), |
|||
stringColumn("configuration_descriptor"), |
|||
stringColumn("actions")), |
|||
new CassandraToSqlTable("customer", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("country"), |
|||
stringColumn("state"), |
|||
stringColumn("city"), |
|||
stringColumn("address"), |
|||
stringColumn("address2"), |
|||
stringColumn("zip"), |
|||
stringColumn("phone"), |
|||
stringColumn("email"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("dashboard", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("assigned_customers"), |
|||
stringColumn("configuration")), |
|||
new CassandraToSqlTable("device", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("label"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("device_name_unq_key")) { |
|||
this.handleUniqueNameViolation(data, "device"); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("device_credentials", |
|||
idColumn("id"), |
|||
idColumn("device_id"), |
|||
stringColumn("credentials_type"), |
|||
stringColumn("credentials_id"), |
|||
stringColumn("credentials_value")), |
|||
new CassandraToSqlTable("event", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("event_type"), |
|||
stringColumn("event_uid"), |
|||
stringColumn("body")), |
|||
new CassandraToSqlTable("relation", |
|||
idColumn("from_id"), |
|||
stringColumn("from_type"), |
|||
idColumn("to_id"), |
|||
stringColumn("to_type"), |
|||
stringColumn("relation_type_group"), |
|||
stringColumn("relation_type"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("user", "tb_user", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("email"), |
|||
stringColumn("search_text"), |
|||
stringColumn("authority"), |
|||
stringColumn("first_name"), |
|||
stringColumn("last_name"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("tb_user_email_key")) { |
|||
this.handleUniqueEmailViolation(data); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("tenant", |
|||
idColumn("id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("region"), |
|||
stringColumn("country"), |
|||
stringColumn("state"), |
|||
stringColumn("city"), |
|||
stringColumn("address"), |
|||
stringColumn("address2"), |
|||
stringColumn("zip"), |
|||
stringColumn("phone"), |
|||
stringColumn("email"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("user_credentials", |
|||
idColumn("id"), |
|||
idColumn("user_id"), |
|||
booleanColumn("enabled"), |
|||
stringColumn("password"), |
|||
stringColumn("activate_token"), |
|||
stringColumn("reset_token")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("user_credentials_user_id_key")) { |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found user credentials record with duplicate user_id [id:[{}]]. Record will be ignored!", id); |
|||
this.ignoreRecord(batchData, data); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("widget_type", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("bundle_alias"), |
|||
stringColumn("alias"), |
|||
stringColumn("name"), |
|||
stringColumn("descriptor")), |
|||
new CassandraToSqlTable("widgets_bundle", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("alias"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text")), |
|||
new CassandraToSqlTable("rule_chain", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("name"), |
|||
stringColumn("search_text"), |
|||
idColumn("first_rule_node_id"), |
|||
booleanColumn("root"), |
|||
booleanColumn("debug_mode"), |
|||
stringColumn("configuration"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("rule_node", |
|||
idColumn("id"), |
|||
idColumn("rule_chain_id"), |
|||
stringColumn("type"), |
|||
stringColumn("name"), |
|||
booleanColumn("debug_mode"), |
|||
stringColumn("search_text"), |
|||
stringColumn("configuration"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("entity_view", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("keys"), |
|||
bigintColumn("start_ts"), |
|||
bigintColumn("end_ts"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) |
|||
); |
|||
} |
|||
@ -0,0 +1,168 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import com.datastax.driver.core.Row; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
|
|||
import java.sql.PreparedStatement; |
|||
import java.sql.SQLException; |
|||
import java.sql.Types; |
|||
import java.util.regex.Pattern; |
|||
|
|||
@Data |
|||
public class CassandraToSqlColumn { |
|||
|
|||
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); |
|||
private static final String EMPTY_STR = ""; |
|||
|
|||
private int index; |
|||
private int sqlIndex; |
|||
private String cassandraColumnName; |
|||
private String sqlColumnName; |
|||
private CassandraToSqlColumnType type; |
|||
private int sqlType; |
|||
private int size; |
|||
private Class<? extends Enum> enumClass; |
|||
|
|||
public static CassandraToSqlColumn idColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ID); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn stringColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.STRING); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn stringColumn(String cassandraColumnName, String sqlColumnName) { |
|||
return new CassandraToSqlColumn(cassandraColumnName, sqlColumnName); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn bigintColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BIGINT); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn doubleColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.DOUBLE); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn booleanColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BOOLEAN); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn jsonColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.JSON); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn enumToIntColumn(String name, Class<? extends Enum> enumClass) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ENUM_TO_INT, enumClass); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName) { |
|||
this(columnName, columnName, CassandraToSqlColumnType.STRING, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type) { |
|||
this(columnName, columnName, type, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type, Class<? extends Enum> enumClass) { |
|||
this(columnName, columnName, type, enumClass); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName) { |
|||
this(cassandraColumnName, sqlColumnName, CassandraToSqlColumnType.STRING, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName, CassandraToSqlColumnType type, |
|||
Class<? extends Enum> enumClass) { |
|||
this.cassandraColumnName = cassandraColumnName; |
|||
this.sqlColumnName = sqlColumnName; |
|||
this.type = type; |
|||
this.enumClass = enumClass; |
|||
} |
|||
|
|||
public String getColumnValue(Row row) { |
|||
if (row.isNull(index)) { |
|||
return null; |
|||
} else { |
|||
switch (this.type) { |
|||
case ID: |
|||
return UUIDConverter.fromTimeUUID(row.getUUID(index)); |
|||
case DOUBLE: |
|||
return Double.toString(row.getDouble(index)); |
|||
case INTEGER: |
|||
return Integer.toString(row.getInt(index)); |
|||
case FLOAT: |
|||
return Float.toString(row.getFloat(index)); |
|||
case BIGINT: |
|||
return Long.toString(row.getLong(index)); |
|||
case BOOLEAN: |
|||
return Boolean.toString(row.getBool(index)); |
|||
case STRING: |
|||
case JSON: |
|||
case ENUM_TO_INT: |
|||
default: |
|||
String value = row.getString(index); |
|||
return this.replaceNullChars(value); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void setColumnValue(PreparedStatement sqlInsertStatement, String value) throws SQLException { |
|||
if (value == null) { |
|||
sqlInsertStatement.setNull(this.sqlIndex, this.sqlType); |
|||
} else { |
|||
switch (this.type) { |
|||
case DOUBLE: |
|||
sqlInsertStatement.setDouble(this.sqlIndex, Double.parseDouble(value)); |
|||
break; |
|||
case INTEGER: |
|||
sqlInsertStatement.setInt(this.sqlIndex, Integer.parseInt(value)); |
|||
break; |
|||
case FLOAT: |
|||
sqlInsertStatement.setFloat(this.sqlIndex, Float.parseFloat(value)); |
|||
break; |
|||
case BIGINT: |
|||
sqlInsertStatement.setLong(this.sqlIndex, Long.parseLong(value)); |
|||
break; |
|||
case BOOLEAN: |
|||
sqlInsertStatement.setBoolean(this.sqlIndex, Boolean.parseBoolean(value)); |
|||
break; |
|||
case ENUM_TO_INT: |
|||
Enum enumVal = Enum.valueOf(this.enumClass, value); |
|||
int intValue = enumVal.ordinal(); |
|||
sqlInsertStatement.setInt(this.sqlIndex, intValue); |
|||
break; |
|||
case JSON: |
|||
case STRING: |
|||
case ID: |
|||
default: |
|||
sqlInsertStatement.setString(this.sqlIndex, value); |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private String replaceNullChars(String strValue) { |
|||
if (strValue != null) { |
|||
return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR); |
|||
} |
|||
return strValue; |
|||
} |
|||
|
|||
} |
|||
|
|||
@ -0,0 +1,64 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class CassandraToSqlColumnData { |
|||
|
|||
private String value; |
|||
private String originalValue; |
|||
private int constraintCounter = 0; |
|||
|
|||
public CassandraToSqlColumnData(String value) { |
|||
this.value = value; |
|||
this.originalValue = value; |
|||
} |
|||
|
|||
public int nextContraintCounter() { |
|||
return ++constraintCounter; |
|||
} |
|||
|
|||
public String getNextConstraintStringValue(CassandraToSqlColumn column) { |
|||
int counter = this.nextContraintCounter(); |
|||
String newValue = this.originalValue + counter; |
|||
int overflow = newValue.length() - column.getSize(); |
|||
if (overflow > 0) { |
|||
newValue = this.originalValue.substring(0, this.originalValue.length()-overflow) + counter; |
|||
} |
|||
return newValue; |
|||
} |
|||
|
|||
public String getNextConstraintEmailValue(CassandraToSqlColumn column) { |
|||
int counter = this.nextContraintCounter(); |
|||
String[] emailValues = this.originalValue.split("@"); |
|||
String newValue = emailValues[0] + "+" + counter + "@" + emailValues[1]; |
|||
int overflow = newValue.length() - column.getSize(); |
|||
if (overflow > 0) { |
|||
newValue = emailValues[0].substring(0, emailValues[0].length()-overflow) + "+" + counter + "@" + emailValues[1]; |
|||
} |
|||
return newValue; |
|||
} |
|||
|
|||
public String getLogValue() { |
|||
if (this.value != null && this.value.length() > 255) { |
|||
return this.value.substring(0, 255) + "...[truncated " + (this.value.length() - 255) + " symbols]"; |
|||
} |
|||
return this.value; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,28 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
public enum CassandraToSqlColumnType { |
|||
ID, |
|||
DOUBLE, |
|||
INTEGER, |
|||
FLOAT, |
|||
BIGINT, |
|||
BOOLEAN, |
|||
STRING, |
|||
JSON, |
|||
ENUM_TO_INT |
|||
} |
|||
@ -0,0 +1,308 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import com.datastax.driver.core.ResultSet; |
|||
import com.datastax.driver.core.Row; |
|||
import com.datastax.driver.core.Session; |
|||
import com.datastax.driver.core.SimpleStatement; |
|||
import com.datastax.driver.core.Statement; |
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.hibernate.exception.ConstraintViolationException; |
|||
import org.hibernate.internal.util.JdbcExceptionHelper; |
|||
import org.postgresql.util.PSQLException; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
import org.thingsboard.server.dao.exception.DataValidationException; |
|||
|
|||
import java.sql.BatchUpdateException; |
|||
import java.sql.Connection; |
|||
import java.sql.DatabaseMetaData; |
|||
import java.sql.PreparedStatement; |
|||
import java.sql.SQLException; |
|||
import java.util.ArrayList; |
|||
import java.util.Arrays; |
|||
import java.util.Iterator; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
import java.util.stream.Collectors; |
|||
|
|||
@Data |
|||
@Slf4j |
|||
public class CassandraToSqlTable { |
|||
|
|||
private static final int DEFAULT_BATCH_SIZE = 10000; |
|||
|
|||
private String cassandraCf; |
|||
private String sqlTableName; |
|||
|
|||
private List<CassandraToSqlColumn> columns; |
|||
|
|||
private int batchSize = DEFAULT_BATCH_SIZE; |
|||
|
|||
private PreparedStatement sqlInsertStatement; |
|||
|
|||
public CassandraToSqlTable(String tableName, CassandraToSqlColumn... columns) { |
|||
this(tableName, tableName, DEFAULT_BATCH_SIZE, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String tableName, String sqlTableName, CassandraToSqlColumn... columns) { |
|||
this(tableName, sqlTableName, DEFAULT_BATCH_SIZE, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String tableName, int batchSize, CassandraToSqlColumn... columns) { |
|||
this(tableName, tableName, batchSize, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String cassandraCf, String sqlTableName, int batchSize, CassandraToSqlColumn... columns) { |
|||
this.cassandraCf = cassandraCf; |
|||
this.sqlTableName = sqlTableName; |
|||
this.batchSize = batchSize; |
|||
this.columns = Arrays.asList(columns); |
|||
for (int i=0;i<columns.length;i++) { |
|||
this.columns.get(i).setIndex(i); |
|||
this.columns.get(i).setSqlIndex(i+1); |
|||
} |
|||
} |
|||
|
|||
public void migrateToSql(Session session, Connection conn) throws SQLException { |
|||
log.info("[{}] Migrating data from cassandra '{}' Column Family to '{}' SQL table...", this.sqlTableName, this.cassandraCf, this.sqlTableName); |
|||
DatabaseMetaData metadata = conn.getMetaData(); |
|||
java.sql.ResultSet resultSet = metadata.getColumns(null, null, this.sqlTableName, null); |
|||
while (resultSet.next()) { |
|||
String name = resultSet.getString("COLUMN_NAME"); |
|||
int sqlType = resultSet.getInt("DATA_TYPE"); |
|||
int size = resultSet.getInt("COLUMN_SIZE"); |
|||
CassandraToSqlColumn column = this.getColumn(name); |
|||
column.setSize(size); |
|||
column.setSqlType(sqlType); |
|||
} |
|||
this.sqlInsertStatement = createSqlInsertStatement(conn); |
|||
Statement cassandraSelectStatement = createCassandraSelectStatement(); |
|||
cassandraSelectStatement.setFetchSize(100); |
|||
ResultSet rs = session.execute(cassandraSelectStatement); |
|||
Iterator<Row> iter = rs.iterator(); |
|||
int rowCounter = 0; |
|||
List<CassandraToSqlColumnData[]> batchData; |
|||
boolean hasNext; |
|||
do { |
|||
batchData = this.extractBatchData(iter); |
|||
hasNext = batchData.size() == this.batchSize; |
|||
this.batchInsert(batchData, conn); |
|||
rowCounter += batchData.size(); |
|||
log.info("[{}] {} records migrated so far...", this.sqlTableName, rowCounter); |
|||
} while (hasNext); |
|||
this.sqlInsertStatement.close(); |
|||
log.info("[{}] {} total records migrated.", this.sqlTableName, rowCounter); |
|||
log.info("[{}] Finished migration data from cassandra '{}' Column Family to '{}' SQL table.", |
|||
this.sqlTableName, this.cassandraCf, this.sqlTableName); |
|||
} |
|||
|
|||
private List<CassandraToSqlColumnData[]> extractBatchData(Iterator<Row> iter) { |
|||
List<CassandraToSqlColumnData[]> batchData = new ArrayList<>(); |
|||
while (iter.hasNext() && batchData.size() < this.batchSize) { |
|||
Row row = iter.next(); |
|||
if (row != null) { |
|||
CassandraToSqlColumnData[] data = this.extractRowData(row); |
|||
batchData.add(data); |
|||
} |
|||
} |
|||
return batchData; |
|||
} |
|||
|
|||
private CassandraToSqlColumnData[] extractRowData(Row row) { |
|||
CassandraToSqlColumnData[] data = new CassandraToSqlColumnData[this.columns.size()]; |
|||
for (CassandraToSqlColumn column: this.columns) { |
|||
String value = column.getColumnValue(row); |
|||
data[column.getIndex()] = new CassandraToSqlColumnData(value); |
|||
} |
|||
return this.validateColumnData(data); |
|||
} |
|||
|
|||
private CassandraToSqlColumnData[] validateColumnData(CassandraToSqlColumnData[] data) { |
|||
for (int i=0;i<data.length;i++) { |
|||
CassandraToSqlColumn column = this.columns.get(i); |
|||
if (column.getType() == CassandraToSqlColumnType.STRING) { |
|||
CassandraToSqlColumnData columnData = data[i]; |
|||
String value = columnData.getValue(); |
|||
if (value != null && value.length() > column.getSize()) { |
|||
log.warn("[{}] Value size [{}] exceeds maximum size [{}] of column [{}] and will be truncated!", |
|||
this.sqlTableName, |
|||
value.length(), column.getSize(), column.getSqlColumnName()); |
|||
log.warn("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
value = value.substring(0, column.getSize()); |
|||
columnData.setOriginalValue(value); |
|||
columnData.setValue(value); |
|||
} |
|||
} |
|||
} |
|||
return data; |
|||
} |
|||
|
|||
private void batchInsert(List<CassandraToSqlColumnData[]> batchData, Connection conn) throws SQLException { |
|||
boolean retry = false; |
|||
for (CassandraToSqlColumnData[] data : batchData) { |
|||
for (CassandraToSqlColumn column: this.columns) { |
|||
column.setColumnValue(this.sqlInsertStatement, data[column.getIndex()].getValue()); |
|||
} |
|||
try { |
|||
this.sqlInsertStatement.executeUpdate(); |
|||
} catch (SQLException e) { |
|||
if (this.handleInsertException(batchData, data, conn, e)) { |
|||
retry = true; |
|||
break; |
|||
} else { |
|||
throw e; |
|||
} |
|||
} |
|||
} |
|||
if (retry) { |
|||
this.batchInsert(batchData, conn); |
|||
} else { |
|||
conn.commit(); |
|||
} |
|||
} |
|||
|
|||
private boolean handleInsertException(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, |
|||
Connection conn, SQLException ex) throws SQLException { |
|||
conn.commit(); |
|||
String constraint = extractConstraintName(ex).orElse(null); |
|||
if (constraint != null) { |
|||
if (this.onConstraintViolation(batchData, data, constraint)) { |
|||
return true; |
|||
} else { |
|||
log.error("[{}] Unhandled constraint violation [{}] during insert!", this.sqlTableName, constraint); |
|||
log.error("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
} |
|||
} else { |
|||
log.error("[{}] Unhandled exception during insert!", this.sqlTableName); |
|||
log.error("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
private String dataToString(CassandraToSqlColumnData[] data) { |
|||
StringBuffer stringData = new StringBuffer("{\n"); |
|||
for (int i=0;i<data.length;i++) { |
|||
String columnName = this.columns.get(i).getSqlColumnName(); |
|||
String value = data[i].getLogValue(); |
|||
stringData.append("\"").append(columnName).append("\": ").append("[").append(value).append("]\n"); |
|||
} |
|||
stringData.append("}"); |
|||
return stringData.toString(); |
|||
} |
|||
|
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
return false; |
|||
} |
|||
|
|||
protected void handleUniqueNameViolation(CassandraToSqlColumnData[] data, String entityType) { |
|||
CassandraToSqlColumn nameColumn = this.getColumn("name"); |
|||
CassandraToSqlColumn searchTextColumn = this.getColumn("search_text"); |
|||
CassandraToSqlColumnData nameColumnData = data[nameColumn.getIndex()]; |
|||
CassandraToSqlColumnData searchTextColumnData = data[searchTextColumn.getIndex()]; |
|||
String prevName = nameColumnData.getValue(); |
|||
String newName = nameColumnData.getNextConstraintStringValue(nameColumn); |
|||
nameColumnData.setValue(newName); |
|||
searchTextColumnData.setValue(searchTextColumnData.getNextConstraintStringValue(searchTextColumn)); |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found {} with duplicate name [id:[{}]]. Attempting to rename {} from '{}' to '{}'...", entityType, id, entityType, prevName, newName); |
|||
} |
|||
|
|||
protected void handleUniqueEmailViolation(CassandraToSqlColumnData[] data) { |
|||
CassandraToSqlColumn emailColumn = this.getColumn("email"); |
|||
CassandraToSqlColumn searchTextColumn = this.getColumn("search_text"); |
|||
CassandraToSqlColumnData emailColumnData = data[emailColumn.getIndex()]; |
|||
CassandraToSqlColumnData searchTextColumnData = data[searchTextColumn.getIndex()]; |
|||
String prevEmail = emailColumnData.getValue(); |
|||
String newEmail = emailColumnData.getNextConstraintEmailValue(emailColumn); |
|||
emailColumnData.setValue(newEmail); |
|||
searchTextColumnData.setValue(searchTextColumnData.getNextConstraintEmailValue(searchTextColumn)); |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found user with duplicate email [id:[{}]]. Attempting to rename email from '{}' to '{}'...", id, prevEmail, newEmail); |
|||
} |
|||
|
|||
protected void ignoreRecord(List<CassandraToSqlColumnData[]> batchData, CassandraToSqlColumnData[] data) { |
|||
log.warn("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
int index = batchData.indexOf(data); |
|||
if (index > 0) { |
|||
batchData.remove(index); |
|||
} |
|||
} |
|||
|
|||
protected CassandraToSqlColumn getColumn(String sqlColumnName) { |
|||
return this.columns.stream().filter(col -> col.getSqlColumnName().equals(sqlColumnName)).findFirst().get(); |
|||
} |
|||
|
|||
protected CassandraToSqlColumnData getColumnData(CassandraToSqlColumnData[] data, String sqlColumnName) { |
|||
CassandraToSqlColumn column = this.getColumn(sqlColumnName); |
|||
return data[column.getIndex()]; |
|||
} |
|||
|
|||
private Optional<String> extractConstraintName(SQLException ex) { |
|||
final String sqlState = JdbcExceptionHelper.extractSqlState( ex ); |
|||
if (sqlState != null) { |
|||
String sqlStateClassCode = JdbcExceptionHelper.determineSqlStateClassCode( sqlState ); |
|||
if ( sqlStateClassCode != null ) { |
|||
if (Arrays.asList( |
|||
"23", // "integrity constraint violation"
|
|||
"27", // "triggered data change violation"
|
|||
"44" // "with check option violation"
|
|||
).contains(sqlStateClassCode)) { |
|||
if (ex instanceof PSQLException) { |
|||
return Optional.of(((PSQLException)ex).getServerErrorMessage().getConstraint()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return Optional.empty(); |
|||
} |
|||
|
|||
private Statement createCassandraSelectStatement() { |
|||
StringBuilder selectStatementBuilder = new StringBuilder(); |
|||
selectStatementBuilder.append("SELECT "); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
selectStatementBuilder.append(column.getCassandraColumnName()).append(","); |
|||
} |
|||
selectStatementBuilder.deleteCharAt(selectStatementBuilder.length() - 1); |
|||
selectStatementBuilder.append(" FROM ").append(cassandraCf); |
|||
return new SimpleStatement(selectStatementBuilder.toString()); |
|||
} |
|||
|
|||
private PreparedStatement createSqlInsertStatement(Connection conn) throws SQLException { |
|||
StringBuilder insertStatementBuilder = new StringBuilder(); |
|||
insertStatementBuilder.append("INSERT INTO ").append(this.sqlTableName).append(" ("); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
insertStatementBuilder.append(column.getSqlColumnName()).append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(") VALUES ("); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
if (column.getType() == CassandraToSqlColumnType.JSON) { |
|||
insertStatementBuilder.append("cast(? AS json)"); |
|||
} else { |
|||
insertStatementBuilder.append("?"); |
|||
} |
|||
insertStatementBuilder.append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(")"); |
|||
return conn.prepareStatement(insertStatementBuilder.toString()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,22 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
public interface EntitiesMigrateService { |
|||
|
|||
void migrate() throws Exception; |
|||
|
|||
} |
|||
@ -1,6 +1,6 @@ |
|||
#!/bin/sh |
|||
|
|||
getent group ${pkg.name} >/dev/null || groupadd -r ${pkg.name} |
|||
getent passwd ${pkg.name} >/dev/null || \ |
|||
useradd -d ${pkg.installFolder} -g ${pkg.name} -M -r ${pkg.name} -s /sbin/nologin \ |
|||
getent group ${pkg.user} >/dev/null || groupadd -r ${pkg.user} |
|||
getent passwd ${pkg.user} >/dev/null || \ |
|||
useradd -d ${pkg.installFolder} -g ${pkg.user} -M -r ${pkg.user} -s /sbin/nologin \ |
|||
-c "Thingsboard application" |
|||
|
|||
@ -0,0 +1,51 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.relation; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.data.jpa.repository.Modifying; |
|||
import org.thingsboard.server.dao.model.sql.RelationEntity; |
|||
|
|||
import javax.persistence.EntityManager; |
|||
import javax.persistence.PersistenceContext; |
|||
import javax.persistence.Query; |
|||
|
|||
@Slf4j |
|||
public abstract class AbstractRelationInsertRepository implements RelationInsertRepository { |
|||
|
|||
@PersistenceContext |
|||
protected EntityManager entityManager; |
|||
|
|||
protected Query getQuery(RelationEntity entity, String query) { |
|||
Query nativeQuery = entityManager.createNativeQuery(query, RelationEntity.class); |
|||
if (entity.getAdditionalInfo() == null) { |
|||
nativeQuery.setParameter("additionalInfo", null); |
|||
} else { |
|||
nativeQuery.setParameter("additionalInfo", entity.getAdditionalInfo().toString()); |
|||
} |
|||
return nativeQuery |
|||
.setParameter("fromId", entity.getFromId()) |
|||
.setParameter("fromType", entity.getFromType()) |
|||
.setParameter("toId", entity.getToId()) |
|||
.setParameter("toType", entity.getToType()) |
|||
.setParameter("relationTypeGroup", entity.getRelationTypeGroup()) |
|||
.setParameter("relationType", entity.getRelationType()); |
|||
} |
|||
|
|||
@Modifying |
|||
protected abstract RelationEntity processSaveOrUpdate(RelationEntity entity); |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.relation; |
|||
|
|||
import org.springframework.stereotype.Repository; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
import org.thingsboard.server.dao.model.sql.RelationCompositeKey; |
|||
import org.thingsboard.server.dao.model.sql.RelationEntity; |
|||
import org.thingsboard.server.dao.util.HsqlDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
|
|||
@HsqlDao |
|||
@SqlDao |
|||
@Repository |
|||
@Transactional |
|||
public class HsqlRelationInsertRepository extends AbstractRelationInsertRepository implements RelationInsertRepository { |
|||
|
|||
private static final String INSERT_ON_CONFLICT_DO_UPDATE = "MERGE INTO relation USING (VALUES :fromId, :fromType, :toId, :toType, :relationTypeGroup, :relationType, :additionalInfo) R " + |
|||
"(from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info) " + |
|||
"ON (relation.from_id = R.from_id AND relation.from_type = R.from_type AND relation.relation_type_group = R.relation_type_group AND relation.relation_type = R.relation_type AND relation.to_id = R.to_id AND relation.to_type = R.to_type) " + |
|||
"WHEN MATCHED THEN UPDATE SET relation.additional_info = R.additional_info " + |
|||
"WHEN NOT MATCHED THEN INSERT (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info) VALUES (R.from_id, R.from_type, R.to_id, R.to_type, R.relation_type_group, R.relation_type, R.additional_info)"; |
|||
|
|||
@Override |
|||
public RelationEntity saveOrUpdate(RelationEntity entity) { |
|||
return processSaveOrUpdate(entity); |
|||
} |
|||
|
|||
@Override |
|||
protected RelationEntity processSaveOrUpdate(RelationEntity entity) { |
|||
getQuery(entity, INSERT_ON_CONFLICT_DO_UPDATE).executeUpdate(); |
|||
return entityManager.find(RelationEntity.class, new RelationCompositeKey(entity.toData())); |
|||
} |
|||
} |
|||
@ -0,0 +1,43 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.relation; |
|||
|
|||
import org.springframework.stereotype.Repository; |
|||
import org.springframework.transaction.annotation.Transactional; |
|||
import org.thingsboard.server.dao.model.sql.RelationEntity; |
|||
import org.thingsboard.server.dao.util.PsqlDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
|
|||
@PsqlDao |
|||
@SqlDao |
|||
@Repository |
|||
@Transactional |
|||
public class PsqlRelationInsertRepository extends AbstractRelationInsertRepository implements RelationInsertRepository { |
|||
|
|||
private static final String INSERT_ON_CONFLICT_DO_UPDATE = "INSERT INTO relation (from_id, from_type, to_id, to_type, relation_type_group, relation_type, additional_info)" + |
|||
" VALUES (:fromId, :fromType, :toId, :toType, :relationTypeGroup, :relationType, :additionalInfo) " + |
|||
"ON CONFLICT (from_id, from_type, relation_type_group, relation_type, to_id, to_type) DO UPDATE SET additional_info = :additionalInfo returning *"; |
|||
|
|||
@Override |
|||
public RelationEntity saveOrUpdate(RelationEntity entity) { |
|||
return processSaveOrUpdate(entity); |
|||
} |
|||
|
|||
@Override |
|||
protected RelationEntity processSaveOrUpdate(RelationEntity entity) { |
|||
return (RelationEntity) getQuery(entity, INSERT_ON_CONFLICT_DO_UPDATE).getSingleResult(); |
|||
} |
|||
} |
|||
@ -0,0 +1,24 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.sql.relation; |
|||
|
|||
import org.thingsboard.server.dao.model.sql.RelationEntity; |
|||
|
|||
public interface RelationInsertRepository { |
|||
|
|||
RelationEntity saveOrUpdate(RelationEntity entity); |
|||
|
|||
} |
|||
@ -1,17 +0,0 @@ |
|||
-- |
|||
-- Copyright © 2016-2020 The Thingsboard Authors |
|||
-- |
|||
-- Licensed under the Apache License, Version 2.0 (the "License"); |
|||
-- you may not use this file except in compliance with the License. |
|||
-- You may obtain a copy of the License at |
|||
-- |
|||
-- http://www.apache.org/licenses/LICENSE-2.0 |
|||
-- |
|||
-- Unless required by applicable law or agreed to in writing, software |
|||
-- distributed under the License is distributed on an "AS IS" BASIS, |
|||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
-- See the License for the specific language governing permissions and |
|||
-- limitations under the License. |
|||
-- |
|||
|
|||
CREATE INDEX IF NOT EXISTS idx_tenant_ts_kv ON tenant_ts_kv(tenant_id, entity_id, key, ts); |
|||
@ -0,0 +1,24 @@ |
|||
#!/bin/bash |
|||
# |
|||
# Copyright © 2016-2020 The Thingsboard Authors |
|||
# |
|||
# Licensed under the Apache License, Version 2.0 (the "License"); |
|||
# you may not use this file except in compliance with the License. |
|||
# You may obtain a copy of the License at |
|||
# |
|||
# http://www.apache.org/licenses/LICENSE-2.0 |
|||
# |
|||
# Unless required by applicable law or agreed to in writing, software |
|||
# distributed under the License is distributed on an "AS IS" BASIS, |
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
# See the License for the specific language governing permissions and |
|||
# limitations under the License. |
|||
# |
|||
|
|||
mkdir -p tb-node/log/ && sudo chown -R 799:799 tb-node/log/ |
|||
|
|||
mkdir -p tb-transports/coap/log && sudo chown -R 799:799 tb-transports/coap/log |
|||
|
|||
mkdir -p tb-transports/http/log && sudo chown -R 799:799 tb-transports/http/log |
|||
|
|||
mkdir -p tb-transports/mqtt/log && sudo chown -R 799:799 tb-transports/mqtt/log |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue