committed by
GitHub
1285 changed files with 14181 additions and 157708 deletions
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@ -0,0 +1,314 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import com.datastax.driver.core.KeyspaceMetadata; |
|||
import com.datastax.driver.core.exceptions.InvalidQueryException; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.dashboard.DashboardService; |
|||
import org.thingsboard.server.dao.util.NoSqlDao; |
|||
import org.thingsboard.server.service.install.cql.CassandraDbHelper; |
|||
|
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.Paths; |
|||
|
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ADDITIONAL_INFO; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ASSET; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ASSIGNED_CUSTOMERS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.CONFIGURATION; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.CUSTOMER_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.DASHBOARD; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.DEVICE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.END_TS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_TYPE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEW; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ENTITY_VIEWS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.KEYS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.NAME; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.SEARCH_TEXT; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.START_TS; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TENANT_ID; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TITLE; |
|||
import static org.thingsboard.server.service.install.DatabaseHelper.TYPE; |
|||
|
|||
@Service |
|||
@NoSqlDao |
|||
@Profile("install") |
|||
@Slf4j |
|||
public class CassandraDatabaseUpgradeService extends AbstractCassandraDatabaseUpgradeService implements DatabaseEntitiesUpgradeService { |
|||
|
|||
private static final String SCHEMA_UPDATE_CQL = "schema_update.cql"; |
|||
|
|||
@Autowired |
|||
private DashboardService dashboardService; |
|||
|
|||
@Autowired |
|||
private InstallScripts installScripts; |
|||
|
|||
@Override |
|||
public void upgradeDatabase(String fromVersion) throws Exception { |
|||
|
|||
switch (fromVersion) { |
|||
case "1.2.3": |
|||
|
|||
log.info("Upgrading Cassandara DataBase from version {} to 1.3.0 ...", fromVersion); |
|||
|
|||
//Dump devices, assets and relations
|
|||
|
|||
cluster.getSession(); |
|||
|
|||
KeyspaceMetadata ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping devices ..."); |
|||
Path devicesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DEVICE, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, |
|||
new String[]{"", "", "", "", "", "", "default"}, |
|||
"tb-devices"); |
|||
log.info("Devices dumped."); |
|||
|
|||
log.info("Dumping assets ..."); |
|||
Path assetsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ASSET, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, |
|||
new String[]{"", "", "", "", "", "", "default"}, |
|||
"tb-assets"); |
|||
log.info("Assets dumped."); |
|||
|
|||
log.info("Dumping relations ..."); |
|||
Path relationsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", ADDITIONAL_INFO, "relation_type_group"}, |
|||
new String[]{"", "", "", "", "", "", "COMMON"}, |
|||
"tb-relations"); |
|||
log.info("Relations dumped."); |
|||
|
|||
log.info("Updating schema ..."); |
|||
Path schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.3.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
//Restore devices, assets and relations
|
|||
|
|||
log.info("Restoring devices ..."); |
|||
if (devicesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), DEVICE, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, devicesDump); |
|||
Files.deleteIfExists(devicesDump); |
|||
} |
|||
log.info("Devices restored."); |
|||
|
|||
log.info("Dumping device types ..."); |
|||
Path deviceTypesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DEVICE, |
|||
new String[]{TENANT_ID, "type"}, |
|||
new String[]{"", ""}, |
|||
"tb-device-types"); |
|||
if (deviceTypesDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(deviceTypesDump, "DEVICE"); |
|||
} |
|||
log.info("Device types dumped."); |
|||
log.info("Loading device types ..."); |
|||
if (deviceTypesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "entity_subtype", |
|||
new String[]{TENANT_ID, "type", "entity_type"}, deviceTypesDump); |
|||
Files.deleteIfExists(deviceTypesDump); |
|||
} |
|||
log.info("Device types loaded."); |
|||
|
|||
log.info("Restoring assets ..."); |
|||
if (assetsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), ASSET, |
|||
new String[]{"id", TENANT_ID, CUSTOMER_ID, "name", SEARCH_TEXT, ADDITIONAL_INFO, "type"}, assetsDump); |
|||
Files.deleteIfExists(assetsDump); |
|||
} |
|||
log.info("Assets restored."); |
|||
|
|||
log.info("Dumping asset types ..."); |
|||
Path assetTypesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ASSET, |
|||
new String[]{TENANT_ID, "type"}, |
|||
new String[]{"", ""}, |
|||
"tb-asset-types"); |
|||
if (assetTypesDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(assetTypesDump, "ASSET"); |
|||
} |
|||
log.info("Asset types dumped."); |
|||
log.info("Loading asset types ..."); |
|||
if (assetTypesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "entity_subtype", |
|||
new String[]{TENANT_ID, "type", "entity_type"}, assetTypesDump); |
|||
Files.deleteIfExists(assetTypesDump); |
|||
} |
|||
log.info("Asset types loaded."); |
|||
|
|||
log.info("Restoring relations ..."); |
|||
if (relationsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", ADDITIONAL_INFO, "relation_type_group"}, relationsDump); |
|||
Files.deleteIfExists(relationsDump); |
|||
} |
|||
log.info("Relations restored."); |
|||
|
|||
break; |
|||
case "1.3.0": |
|||
break; |
|||
case "1.3.1": |
|||
|
|||
cluster.getSession(); |
|||
|
|||
ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping dashboards ..."); |
|||
Path dashboardsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), DASHBOARD, |
|||
new String[]{ID, TENANT_ID, CUSTOMER_ID, TITLE, SEARCH_TEXT, ASSIGNED_CUSTOMERS, CONFIGURATION}, |
|||
new String[]{"", "", "", "", "", "", ""}, |
|||
"tb-dashboards", true); |
|||
log.info("Dashboards dumped."); |
|||
|
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "1.4.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
log.info("Restoring dashboards ..."); |
|||
if (dashboardsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), DASHBOARD, |
|||
new String[]{ID, TENANT_ID, TITLE, SEARCH_TEXT, CONFIGURATION}, dashboardsDump, true); |
|||
DatabaseHelper.upgradeTo40_assignDashboards(dashboardsDump, dashboardService, false); |
|||
Files.deleteIfExists(dashboardsDump); |
|||
} |
|||
log.info("Dashboards restored."); |
|||
break; |
|||
case "1.4.0": |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.0.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
break; |
|||
|
|||
case "2.0.0": |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.1.1", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
break; |
|||
|
|||
case "2.1.1": |
|||
|
|||
log.info("Upgrading Cassandra DataBase from version {} to 2.1.2 ...", fromVersion); |
|||
|
|||
cluster.getSession(); |
|||
|
|||
ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping entity views ..."); |
|||
Path entityViewsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), ENTITY_VIEWS, |
|||
new String[]{ID, ENTITY_ID, ENTITY_TYPE, TENANT_ID, CUSTOMER_ID, NAME, TYPE, KEYS, START_TS, END_TS, SEARCH_TEXT, ADDITIONAL_INFO}, |
|||
new String[]{"", "", "", "", "", "", "default", "", "0", "0", "", ""}, |
|||
"tb-entity-views"); |
|||
log.info("Entity views dumped."); |
|||
|
|||
log.info("Updating schema ..."); |
|||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "2.1.2", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
log.info("Restoring entity views ..."); |
|||
if (entityViewsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), ENTITY_VIEW, |
|||
new String[]{ID, ENTITY_ID, ENTITY_TYPE, TENANT_ID, CUSTOMER_ID, NAME, TYPE, KEYS, START_TS, END_TS, SEARCH_TEXT, ADDITIONAL_INFO}, entityViewsDump); |
|||
Files.deleteIfExists(entityViewsDump); |
|||
} |
|||
log.info("Entity views restored."); |
|||
|
|||
break; |
|||
case "2.1.3": |
|||
break; |
|||
case "2.3.0": |
|||
break; |
|||
case "2.3.1": |
|||
log.info("Updating schema ..."); |
|||
String updateDeviceTableStmt = "alter table device add label text"; |
|||
try { |
|||
cluster.getSession().execute(updateDeviceTableStmt); |
|||
Thread.sleep(2500); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.1": |
|||
log.info("Updating schema ..."); |
|||
String updateAssetTableStmt = "alter table asset add label text"; |
|||
try { |
|||
log.info("Updating assets ..."); |
|||
cluster.getSession().execute(updateAssetTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Assets updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.2": |
|||
log.info("Updating schema ..."); |
|||
String updateAlarmTableStmt = "alter table alarm add propagate_relation_types text"; |
|||
try { |
|||
log.info("Updating alarms ..."); |
|||
cluster.getSession().execute(updateAlarmTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Alarms updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
case "2.4.3": |
|||
log.info("Updating schema ..."); |
|||
String updateAttributeKvTableStmt = "alter table attributes_kv_cf add json_v text"; |
|||
try { |
|||
log.info("Updating attributes ..."); |
|||
cluster.getSession().execute(updateAttributeKvTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Attributes updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
|
|||
String updateTenantCoreTableStmt = "alter table tenant add isolated_tb_core boolean"; |
|||
String updateTenantRuleEngineTableStmt = "alter table tenant add isolated_tb_rule_engine boolean"; |
|||
|
|||
try { |
|||
log.info("Updating tenant..."); |
|||
cluster.getSession().execute(updateTenantCoreTableStmt); |
|||
Thread.sleep(2500); |
|||
|
|||
cluster.getSession().execute(updateTenantRuleEngineTableStmt); |
|||
Thread.sleep(2500); |
|||
log.info("Tenant updated."); |
|||
} catch (InvalidQueryException e) { |
|||
} |
|||
log.info("Schema updated."); |
|||
break; |
|||
default: |
|||
throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -1,329 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
import org.thingsboard.server.dao.cassandra.CassandraCluster; |
|||
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
import org.thingsboard.server.service.install.EntityDatabaseSchemaService; |
|||
|
|||
import java.sql.Connection; |
|||
import java.sql.DriverManager; |
|||
import java.util.Arrays; |
|||
import java.util.List; |
|||
|
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.bigintColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.booleanColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.doubleColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.enumToIntColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.idColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.jsonColumn; |
|||
import static org.thingsboard.server.service.install.migrate.CassandraToSqlColumn.stringColumn; |
|||
|
|||
@Service |
|||
@Profile("install") |
|||
@SqlDao |
|||
@NoSqlAnyDao |
|||
@Slf4j |
|||
public class CassandraEntitiesToSqlMigrateService implements EntitiesMigrateService { |
|||
|
|||
@Autowired |
|||
private EntityDatabaseSchemaService entityDatabaseSchemaService; |
|||
|
|||
@Autowired |
|||
protected CassandraCluster cluster; |
|||
|
|||
@Value("${spring.datasource.url}") |
|||
protected String dbUrl; |
|||
|
|||
@Value("${spring.datasource.username}") |
|||
protected String dbUserName; |
|||
|
|||
@Value("${spring.datasource.password}") |
|||
protected String dbPassword; |
|||
|
|||
@Override |
|||
public void migrate() throws Exception { |
|||
log.info("Performing migration of entities data from cassandra to SQL database ..."); |
|||
entityDatabaseSchemaService.createDatabaseSchema(false); |
|||
try (Connection conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword)) { |
|||
conn.setAutoCommit(false); |
|||
for (CassandraToSqlTable table: tables) { |
|||
table.migrateToSql(cluster.getSession(), conn); |
|||
} |
|||
} catch (Exception e) { |
|||
log.error("Unexpected error during ThingsBoard entities data migration!", e); |
|||
throw e; |
|||
} |
|||
entityDatabaseSchemaService.createDatabaseIndexes(); |
|||
} |
|||
|
|||
private static List<CassandraToSqlTable> tables = Arrays.asList( |
|||
new CassandraToSqlTable("admin_settings", |
|||
idColumn("id"), |
|||
stringColumn("key"), |
|||
stringColumn("json_value")), |
|||
new CassandraToSqlTable("alarm", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("type"), |
|||
idColumn("originator_id"), |
|||
enumToIntColumn("originator_type", EntityType.class), |
|||
stringColumn("severity"), |
|||
stringColumn("status"), |
|||
bigintColumn("start_ts"), |
|||
bigintColumn("end_ts"), |
|||
bigintColumn("ack_ts"), |
|||
bigintColumn("clear_ts"), |
|||
stringColumn("details", "additional_info"), |
|||
booleanColumn("propagate"), |
|||
stringColumn("propagate_relation_types")), |
|||
new CassandraToSqlTable("asset", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("label"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("asset_name_unq_key")) { |
|||
this.handleUniqueNameViolation(data, "asset"); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("audit_log_by_tenant_id", "audit_log", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("entity_name"), |
|||
idColumn("user_id"), |
|||
stringColumn("user_name"), |
|||
stringColumn("action_type"), |
|||
stringColumn("action_data"), |
|||
stringColumn("action_status"), |
|||
stringColumn("action_failure_details")), |
|||
new CassandraToSqlTable("attributes_kv_cf", "attribute_kv", |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("attribute_type"), |
|||
stringColumn("attribute_key"), |
|||
booleanColumn("bool_v"), |
|||
stringColumn("str_v"), |
|||
bigintColumn("long_v"), |
|||
doubleColumn("dbl_v"), |
|||
jsonColumn("json_v"), |
|||
bigintColumn("last_update_ts")), |
|||
new CassandraToSqlTable("component_descriptor", |
|||
idColumn("id"), |
|||
stringColumn("type"), |
|||
stringColumn("scope"), |
|||
stringColumn("name"), |
|||
stringColumn("search_text"), |
|||
stringColumn("clazz"), |
|||
stringColumn("configuration_descriptor"), |
|||
stringColumn("actions")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("component_descriptor_clazz_key")) { |
|||
String clazz = this.getColumnData(data, "clazz").getValue(); |
|||
log.warn("Found component_descriptor record with duplicate clazz [{}]. Record will be ignored!", clazz); |
|||
this.ignoreRecord(batchData, data); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("customer", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("country"), |
|||
stringColumn("state"), |
|||
stringColumn("city"), |
|||
stringColumn("address"), |
|||
stringColumn("address2"), |
|||
stringColumn("zip"), |
|||
stringColumn("phone"), |
|||
stringColumn("email"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("dashboard", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("assigned_customers"), |
|||
stringColumn("configuration")), |
|||
new CassandraToSqlTable("device", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("label"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("device_name_unq_key")) { |
|||
this.handleUniqueNameViolation(data, "device"); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("device_credentials", |
|||
idColumn("id"), |
|||
idColumn("device_id"), |
|||
stringColumn("credentials_type"), |
|||
stringColumn("credentials_id"), |
|||
stringColumn("credentials_value")), |
|||
new CassandraToSqlTable("event", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("event_type"), |
|||
stringColumn("event_uid"), |
|||
stringColumn("body"), |
|||
new CassandraToSqlEventTsColumn()), |
|||
new CassandraToSqlTable("relation", |
|||
idColumn("from_id"), |
|||
stringColumn("from_type"), |
|||
idColumn("to_id"), |
|||
stringColumn("to_type"), |
|||
stringColumn("relation_type_group"), |
|||
stringColumn("relation_type"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("user", "tb_user", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
stringColumn("email"), |
|||
stringColumn("search_text"), |
|||
stringColumn("authority"), |
|||
stringColumn("first_name"), |
|||
stringColumn("last_name"), |
|||
stringColumn("additional_info")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("tb_user_email_key")) { |
|||
this.handleUniqueEmailViolation(data); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("tenant", |
|||
idColumn("id"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text"), |
|||
stringColumn("region"), |
|||
stringColumn("country"), |
|||
stringColumn("state"), |
|||
stringColumn("city"), |
|||
stringColumn("address"), |
|||
stringColumn("address2"), |
|||
stringColumn("zip"), |
|||
stringColumn("phone"), |
|||
stringColumn("email"), |
|||
stringColumn("additional_info"), |
|||
booleanColumn("isolated_tb_core"), |
|||
booleanColumn("isolated_tb_rule_engine")), |
|||
new CassandraToSqlTable("user_credentials", |
|||
idColumn("id"), |
|||
idColumn("user_id"), |
|||
booleanColumn("enabled"), |
|||
stringColumn("password"), |
|||
stringColumn("activate_token"), |
|||
stringColumn("reset_token")) { |
|||
@Override |
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
if (constraint.equalsIgnoreCase("user_credentials_user_id_key")) { |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found user credentials record with duplicate user_id [id:[{}]]. Record will be ignored!", id); |
|||
this.ignoreRecord(batchData, data); |
|||
return true; |
|||
} |
|||
return super.onConstraintViolation(batchData, data, constraint); |
|||
} |
|||
}, |
|||
new CassandraToSqlTable("widget_type", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("bundle_alias"), |
|||
stringColumn("alias"), |
|||
stringColumn("name"), |
|||
stringColumn("descriptor")), |
|||
new CassandraToSqlTable("widgets_bundle", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("alias"), |
|||
stringColumn("title"), |
|||
stringColumn("search_text")), |
|||
new CassandraToSqlTable("rule_chain", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
stringColumn("name"), |
|||
stringColumn("search_text"), |
|||
idColumn("first_rule_node_id"), |
|||
booleanColumn("root"), |
|||
booleanColumn("debug_mode"), |
|||
stringColumn("configuration"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("rule_node", |
|||
idColumn("id"), |
|||
idColumn("rule_chain_id"), |
|||
stringColumn("type"), |
|||
stringColumn("name"), |
|||
booleanColumn("debug_mode"), |
|||
stringColumn("search_text"), |
|||
stringColumn("configuration"), |
|||
stringColumn("additional_info")), |
|||
new CassandraToSqlTable("entity_view", |
|||
idColumn("id"), |
|||
idColumn("tenant_id"), |
|||
idColumn("customer_id"), |
|||
idColumn("entity_id"), |
|||
stringColumn("entity_type"), |
|||
stringColumn("name"), |
|||
stringColumn("type"), |
|||
stringColumn("keys"), |
|||
bigintColumn("start_ts"), |
|||
bigintColumn("end_ts"), |
|||
stringColumn("search_text"), |
|||
stringColumn("additional_info")) |
|||
); |
|||
} |
|||
@ -1,172 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import com.datastax.oss.driver.api.core.cql.Row; |
|||
import lombok.Data; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
|
|||
import java.sql.PreparedStatement; |
|||
import java.sql.SQLException; |
|||
import java.sql.Types; |
|||
import java.util.regex.Pattern; |
|||
|
|||
@Data |
|||
public class CassandraToSqlColumn { |
|||
|
|||
private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); |
|||
private static final String EMPTY_STR = ""; |
|||
|
|||
private int index; |
|||
private int sqlIndex; |
|||
private String cassandraColumnName; |
|||
private String sqlColumnName; |
|||
private CassandraToSqlColumnType type; |
|||
private int sqlType; |
|||
private int size; |
|||
private Class<? extends Enum> enumClass; |
|||
|
|||
public static CassandraToSqlColumn idColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ID); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn stringColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.STRING); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn stringColumn(String cassandraColumnName, String sqlColumnName) { |
|||
return new CassandraToSqlColumn(cassandraColumnName, sqlColumnName); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn bigintColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BIGINT); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn doubleColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.DOUBLE); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn booleanColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.BOOLEAN); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn jsonColumn(String name) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.JSON); |
|||
} |
|||
|
|||
public static CassandraToSqlColumn enumToIntColumn(String name, Class<? extends Enum> enumClass) { |
|||
return new CassandraToSqlColumn(name, CassandraToSqlColumnType.ENUM_TO_INT, enumClass); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName) { |
|||
this(columnName, columnName, CassandraToSqlColumnType.STRING, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type) { |
|||
this(columnName, columnName, type, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String columnName, CassandraToSqlColumnType type, Class<? extends Enum> enumClass) { |
|||
this(columnName, columnName, type, enumClass); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName) { |
|||
this(cassandraColumnName, sqlColumnName, CassandraToSqlColumnType.STRING, null); |
|||
} |
|||
|
|||
public CassandraToSqlColumn(String cassandraColumnName, String sqlColumnName, CassandraToSqlColumnType type, |
|||
Class<? extends Enum> enumClass) { |
|||
this.cassandraColumnName = cassandraColumnName; |
|||
this.sqlColumnName = sqlColumnName; |
|||
this.type = type; |
|||
this.enumClass = enumClass; |
|||
} |
|||
|
|||
public String getColumnValue(Row row) { |
|||
if (row.isNull(index)) { |
|||
if (this.type == CassandraToSqlColumnType.BOOLEAN) { |
|||
return Boolean.toString(false); |
|||
} else { |
|||
return null; |
|||
} |
|||
} else { |
|||
switch (this.type) { |
|||
case ID: |
|||
return UUIDConverter.fromTimeUUID(row.getUuid(index)); |
|||
case DOUBLE: |
|||
return Double.toString(row.getDouble(index)); |
|||
case INTEGER: |
|||
return Integer.toString(row.getInt(index)); |
|||
case FLOAT: |
|||
return Float.toString(row.getFloat(index)); |
|||
case BIGINT: |
|||
return Long.toString(row.getLong(index)); |
|||
case BOOLEAN: |
|||
return Boolean.toString(row.getBoolean(index)); |
|||
case STRING: |
|||
case JSON: |
|||
case ENUM_TO_INT: |
|||
default: |
|||
String value = row.getString(index); |
|||
return this.replaceNullChars(value); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void setColumnValue(PreparedStatement sqlInsertStatement, String value) throws SQLException { |
|||
if (value == null) { |
|||
sqlInsertStatement.setNull(this.sqlIndex, this.sqlType); |
|||
} else { |
|||
switch (this.type) { |
|||
case DOUBLE: |
|||
sqlInsertStatement.setDouble(this.sqlIndex, Double.parseDouble(value)); |
|||
break; |
|||
case INTEGER: |
|||
sqlInsertStatement.setInt(this.sqlIndex, Integer.parseInt(value)); |
|||
break; |
|||
case FLOAT: |
|||
sqlInsertStatement.setFloat(this.sqlIndex, Float.parseFloat(value)); |
|||
break; |
|||
case BIGINT: |
|||
sqlInsertStatement.setLong(this.sqlIndex, Long.parseLong(value)); |
|||
break; |
|||
case BOOLEAN: |
|||
sqlInsertStatement.setBoolean(this.sqlIndex, Boolean.parseBoolean(value)); |
|||
break; |
|||
case ENUM_TO_INT: |
|||
Enum enumVal = Enum.valueOf(this.enumClass, value); |
|||
int intValue = enumVal.ordinal(); |
|||
sqlInsertStatement.setInt(this.sqlIndex, intValue); |
|||
break; |
|||
case JSON: |
|||
case STRING: |
|||
case ID: |
|||
default: |
|||
sqlInsertStatement.setString(this.sqlIndex, value); |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
private String replaceNullChars(String strValue) { |
|||
if (strValue != null) { |
|||
return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR); |
|||
} |
|||
return strValue; |
|||
} |
|||
|
|||
} |
|||
|
|||
@ -1,64 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import lombok.Data; |
|||
|
|||
@Data |
|||
public class CassandraToSqlColumnData { |
|||
|
|||
private String value; |
|||
private String originalValue; |
|||
private int constraintCounter = 0; |
|||
|
|||
public CassandraToSqlColumnData(String value) { |
|||
this.value = value; |
|||
this.originalValue = value; |
|||
} |
|||
|
|||
public int nextContraintCounter() { |
|||
return ++constraintCounter; |
|||
} |
|||
|
|||
public String getNextConstraintStringValue(CassandraToSqlColumn column) { |
|||
int counter = this.nextContraintCounter(); |
|||
String newValue = this.originalValue + counter; |
|||
int overflow = newValue.length() - column.getSize(); |
|||
if (overflow > 0) { |
|||
newValue = this.originalValue.substring(0, this.originalValue.length()-overflow) + counter; |
|||
} |
|||
return newValue; |
|||
} |
|||
|
|||
public String getNextConstraintEmailValue(CassandraToSqlColumn column) { |
|||
int counter = this.nextContraintCounter(); |
|||
String[] emailValues = this.originalValue.split("@"); |
|||
String newValue = emailValues[0] + "+" + counter + "@" + emailValues[1]; |
|||
int overflow = newValue.length() - column.getSize(); |
|||
if (overflow > 0) { |
|||
newValue = emailValues[0].substring(0, emailValues[0].length()-overflow) + "+" + counter + "@" + emailValues[1]; |
|||
} |
|||
return newValue; |
|||
} |
|||
|
|||
public String getLogValue() { |
|||
if (this.value != null && this.value.length() > 255) { |
|||
return this.value.substring(0, 255) + "...[truncated " + (this.value.length() - 255) + " symbols]"; |
|||
} |
|||
return this.value; |
|||
} |
|||
|
|||
} |
|||
@ -1,28 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
public enum CassandraToSqlColumnType { |
|||
ID, |
|||
DOUBLE, |
|||
INTEGER, |
|||
FLOAT, |
|||
BIGINT, |
|||
BOOLEAN, |
|||
STRING, |
|||
JSON, |
|||
ENUM_TO_INT |
|||
} |
|||
@ -1,40 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import com.datastax.oss.driver.api.core.cql.Row; |
|||
|
|||
import java.util.UUID; |
|||
|
|||
import static org.thingsboard.server.dao.model.ModelConstants.EPOCH_DIFF; |
|||
|
|||
public class CassandraToSqlEventTsColumn extends CassandraToSqlColumn { |
|||
|
|||
CassandraToSqlEventTsColumn() { |
|||
super("id", "ts", CassandraToSqlColumnType.BIGINT, null); |
|||
} |
|||
|
|||
@Override |
|||
public String getColumnValue(Row row) { |
|||
UUID id = row.getUuid(getIndex()); |
|||
long ts = getTs(id); |
|||
return ts + ""; |
|||
} |
|||
|
|||
private long getTs(UUID uuid) { |
|||
return (uuid.timestamp() - EPOCH_DIFF) / 10000; |
|||
} |
|||
} |
|||
@ -1,304 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.service.install.migrate; |
|||
|
|||
import com.datastax.oss.driver.api.core.cql.ResultSet; |
|||
import com.datastax.oss.driver.api.core.cql.Row; |
|||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
|||
import com.datastax.oss.driver.api.core.cql.Statement; |
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.hibernate.internal.util.JdbcExceptionHelper; |
|||
import org.postgresql.util.PSQLException; |
|||
import org.thingsboard.server.common.data.UUIDConverter; |
|||
import org.thingsboard.server.dao.cassandra.guava.GuavaSession; |
|||
|
|||
import java.sql.Connection; |
|||
import java.sql.DatabaseMetaData; |
|||
import java.sql.PreparedStatement; |
|||
import java.sql.SQLException; |
|||
import java.util.ArrayList; |
|||
import java.util.Arrays; |
|||
import java.util.Iterator; |
|||
import java.util.List; |
|||
import java.util.Optional; |
|||
|
|||
@Data |
|||
@Slf4j |
|||
public class CassandraToSqlTable { |
|||
|
|||
private static final int DEFAULT_BATCH_SIZE = 10000; |
|||
|
|||
private String cassandraCf; |
|||
private String sqlTableName; |
|||
|
|||
private List<CassandraToSqlColumn> columns; |
|||
|
|||
private int batchSize = DEFAULT_BATCH_SIZE; |
|||
|
|||
private PreparedStatement sqlInsertStatement; |
|||
|
|||
public CassandraToSqlTable(String tableName, CassandraToSqlColumn... columns) { |
|||
this(tableName, tableName, DEFAULT_BATCH_SIZE, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String tableName, String sqlTableName, CassandraToSqlColumn... columns) { |
|||
this(tableName, sqlTableName, DEFAULT_BATCH_SIZE, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String tableName, int batchSize, CassandraToSqlColumn... columns) { |
|||
this(tableName, tableName, batchSize, columns); |
|||
} |
|||
|
|||
public CassandraToSqlTable(String cassandraCf, String sqlTableName, int batchSize, CassandraToSqlColumn... columns) { |
|||
this.cassandraCf = cassandraCf; |
|||
this.sqlTableName = sqlTableName; |
|||
this.batchSize = batchSize; |
|||
this.columns = Arrays.asList(columns); |
|||
for (int i=0;i<columns.length;i++) { |
|||
this.columns.get(i).setIndex(i); |
|||
this.columns.get(i).setSqlIndex(i+1); |
|||
} |
|||
} |
|||
|
|||
public void migrateToSql(GuavaSession session, Connection conn) throws SQLException { |
|||
log.info("[{}] Migrating data from cassandra '{}' Column Family to '{}' SQL table...", this.sqlTableName, this.cassandraCf, this.sqlTableName); |
|||
DatabaseMetaData metadata = conn.getMetaData(); |
|||
java.sql.ResultSet resultSet = metadata.getColumns(null, null, this.sqlTableName, null); |
|||
while (resultSet.next()) { |
|||
String name = resultSet.getString("COLUMN_NAME"); |
|||
int sqlType = resultSet.getInt("DATA_TYPE"); |
|||
int size = resultSet.getInt("COLUMN_SIZE"); |
|||
CassandraToSqlColumn column = this.getColumn(name); |
|||
column.setSize(size); |
|||
column.setSqlType(sqlType); |
|||
} |
|||
this.sqlInsertStatement = createSqlInsertStatement(conn); |
|||
Statement cassandraSelectStatement = createCassandraSelectStatement(); |
|||
cassandraSelectStatement.setPageSize(100); |
|||
ResultSet rs = session.execute(cassandraSelectStatement); |
|||
Iterator<Row> iter = rs.iterator(); |
|||
int rowCounter = 0; |
|||
List<CassandraToSqlColumnData[]> batchData; |
|||
boolean hasNext; |
|||
do { |
|||
batchData = this.extractBatchData(iter); |
|||
hasNext = batchData.size() == this.batchSize; |
|||
this.batchInsert(batchData, conn); |
|||
rowCounter += batchData.size(); |
|||
log.info("[{}] {} records migrated so far...", this.sqlTableName, rowCounter); |
|||
} while (hasNext); |
|||
this.sqlInsertStatement.close(); |
|||
log.info("[{}] {} total records migrated.", this.sqlTableName, rowCounter); |
|||
log.info("[{}] Finished migration data from cassandra '{}' Column Family to '{}' SQL table.", |
|||
this.sqlTableName, this.cassandraCf, this.sqlTableName); |
|||
} |
|||
|
|||
private List<CassandraToSqlColumnData[]> extractBatchData(Iterator<Row> iter) { |
|||
List<CassandraToSqlColumnData[]> batchData = new ArrayList<>(); |
|||
while (iter.hasNext() && batchData.size() < this.batchSize) { |
|||
Row row = iter.next(); |
|||
if (row != null) { |
|||
CassandraToSqlColumnData[] data = this.extractRowData(row); |
|||
batchData.add(data); |
|||
} |
|||
} |
|||
return batchData; |
|||
} |
|||
|
|||
private CassandraToSqlColumnData[] extractRowData(Row row) { |
|||
CassandraToSqlColumnData[] data = new CassandraToSqlColumnData[this.columns.size()]; |
|||
for (CassandraToSqlColumn column: this.columns) { |
|||
String value = column.getColumnValue(row); |
|||
data[column.getIndex()] = new CassandraToSqlColumnData(value); |
|||
} |
|||
return this.validateColumnData(data); |
|||
} |
|||
|
|||
private CassandraToSqlColumnData[] validateColumnData(CassandraToSqlColumnData[] data) { |
|||
for (int i=0;i<data.length;i++) { |
|||
CassandraToSqlColumn column = this.columns.get(i); |
|||
if (column.getType() == CassandraToSqlColumnType.STRING) { |
|||
CassandraToSqlColumnData columnData = data[i]; |
|||
String value = columnData.getValue(); |
|||
if (value != null && value.length() > column.getSize()) { |
|||
log.warn("[{}] Value size [{}] exceeds maximum size [{}] of column [{}] and will be truncated!", |
|||
this.sqlTableName, |
|||
value.length(), column.getSize(), column.getSqlColumnName()); |
|||
log.warn("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
value = value.substring(0, column.getSize()); |
|||
columnData.setOriginalValue(value); |
|||
columnData.setValue(value); |
|||
} |
|||
} |
|||
} |
|||
return data; |
|||
} |
|||
|
|||
private void batchInsert(List<CassandraToSqlColumnData[]> batchData, Connection conn) throws SQLException { |
|||
boolean retry = false; |
|||
for (CassandraToSqlColumnData[] data : batchData) { |
|||
for (CassandraToSqlColumn column: this.columns) { |
|||
column.setColumnValue(this.sqlInsertStatement, data[column.getIndex()].getValue()); |
|||
} |
|||
try { |
|||
this.sqlInsertStatement.executeUpdate(); |
|||
} catch (SQLException e) { |
|||
if (this.handleInsertException(batchData, data, conn, e)) { |
|||
retry = true; |
|||
break; |
|||
} else { |
|||
throw e; |
|||
} |
|||
} |
|||
} |
|||
if (retry) { |
|||
this.batchInsert(batchData, conn); |
|||
} else { |
|||
conn.commit(); |
|||
} |
|||
} |
|||
|
|||
private boolean handleInsertException(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, |
|||
Connection conn, SQLException ex) throws SQLException { |
|||
conn.commit(); |
|||
String constraint = extractConstraintName(ex).orElse(null); |
|||
if (constraint != null) { |
|||
if (this.onConstraintViolation(batchData, data, constraint)) { |
|||
return true; |
|||
} else { |
|||
log.error("[{}] Unhandled constraint violation [{}] during insert!", this.sqlTableName, constraint); |
|||
log.error("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
} |
|||
} else { |
|||
log.error("[{}] Unhandled exception during insert!", this.sqlTableName); |
|||
log.error("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
} |
|||
return false; |
|||
} |
|||
|
|||
private String dataToString(CassandraToSqlColumnData[] data) { |
|||
StringBuffer stringData = new StringBuffer("{\n"); |
|||
for (int i=0;i<data.length;i++) { |
|||
String columnName = this.columns.get(i).getSqlColumnName(); |
|||
String value = data[i].getLogValue(); |
|||
stringData.append("\"").append(columnName).append("\": ").append("[").append(value).append("]\n"); |
|||
} |
|||
stringData.append("}"); |
|||
return stringData.toString(); |
|||
} |
|||
|
|||
protected boolean onConstraintViolation(List<CassandraToSqlColumnData[]> batchData, |
|||
CassandraToSqlColumnData[] data, String constraint) { |
|||
return false; |
|||
} |
|||
|
|||
protected void handleUniqueNameViolation(CassandraToSqlColumnData[] data, String entityType) { |
|||
CassandraToSqlColumn nameColumn = this.getColumn("name"); |
|||
CassandraToSqlColumn searchTextColumn = this.getColumn("search_text"); |
|||
CassandraToSqlColumnData nameColumnData = data[nameColumn.getIndex()]; |
|||
CassandraToSqlColumnData searchTextColumnData = data[searchTextColumn.getIndex()]; |
|||
String prevName = nameColumnData.getValue(); |
|||
String newName = nameColumnData.getNextConstraintStringValue(nameColumn); |
|||
nameColumnData.setValue(newName); |
|||
searchTextColumnData.setValue(searchTextColumnData.getNextConstraintStringValue(searchTextColumn)); |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found {} with duplicate name [id:[{}]]. Attempting to rename {} from '{}' to '{}'...", entityType, id, entityType, prevName, newName); |
|||
} |
|||
|
|||
protected void handleUniqueEmailViolation(CassandraToSqlColumnData[] data) { |
|||
CassandraToSqlColumn emailColumn = this.getColumn("email"); |
|||
CassandraToSqlColumn searchTextColumn = this.getColumn("search_text"); |
|||
CassandraToSqlColumnData emailColumnData = data[emailColumn.getIndex()]; |
|||
CassandraToSqlColumnData searchTextColumnData = data[searchTextColumn.getIndex()]; |
|||
String prevEmail = emailColumnData.getValue(); |
|||
String newEmail = emailColumnData.getNextConstraintEmailValue(emailColumn); |
|||
emailColumnData.setValue(newEmail); |
|||
searchTextColumnData.setValue(searchTextColumnData.getNextConstraintEmailValue(searchTextColumn)); |
|||
String id = UUIDConverter.fromString(this.getColumnData(data, "id").getValue()).toString(); |
|||
log.warn("Found user with duplicate email [id:[{}]]. Attempting to rename email from '{}' to '{}'...", id, prevEmail, newEmail); |
|||
} |
|||
|
|||
protected void ignoreRecord(List<CassandraToSqlColumnData[]> batchData, CassandraToSqlColumnData[] data) { |
|||
log.warn("[{}] Affected data:\n{}", this.sqlTableName, this.dataToString(data)); |
|||
int index = batchData.indexOf(data); |
|||
if (index > 0) { |
|||
batchData.remove(index); |
|||
} |
|||
} |
|||
|
|||
protected CassandraToSqlColumn getColumn(String sqlColumnName) { |
|||
return this.columns.stream().filter(col -> col.getSqlColumnName().equals(sqlColumnName)).findFirst().get(); |
|||
} |
|||
|
|||
protected CassandraToSqlColumnData getColumnData(CassandraToSqlColumnData[] data, String sqlColumnName) { |
|||
CassandraToSqlColumn column = this.getColumn(sqlColumnName); |
|||
return data[column.getIndex()]; |
|||
} |
|||
|
|||
private Optional<String> extractConstraintName(SQLException ex) { |
|||
final String sqlState = JdbcExceptionHelper.extractSqlState( ex ); |
|||
if (sqlState != null) { |
|||
String sqlStateClassCode = JdbcExceptionHelper.determineSqlStateClassCode( sqlState ); |
|||
if ( sqlStateClassCode != null ) { |
|||
if (Arrays.asList( |
|||
"23", // "integrity constraint violation"
|
|||
"27", // "triggered data change violation"
|
|||
"44" // "with check option violation"
|
|||
).contains(sqlStateClassCode)) { |
|||
if (ex instanceof PSQLException) { |
|||
return Optional.of(((PSQLException)ex).getServerErrorMessage().getConstraint()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
return Optional.empty(); |
|||
} |
|||
|
|||
private Statement createCassandraSelectStatement() { |
|||
StringBuilder selectStatementBuilder = new StringBuilder(); |
|||
selectStatementBuilder.append("SELECT "); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
selectStatementBuilder.append(column.getCassandraColumnName()).append(","); |
|||
} |
|||
selectStatementBuilder.deleteCharAt(selectStatementBuilder.length() - 1); |
|||
selectStatementBuilder.append(" FROM ").append(cassandraCf); |
|||
return SimpleStatement.newInstance(selectStatementBuilder.toString()); |
|||
} |
|||
|
|||
private PreparedStatement createSqlInsertStatement(Connection conn) throws SQLException { |
|||
StringBuilder insertStatementBuilder = new StringBuilder(); |
|||
insertStatementBuilder.append("INSERT INTO ").append(this.sqlTableName).append(" ("); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
insertStatementBuilder.append(column.getSqlColumnName()).append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(") VALUES ("); |
|||
for (CassandraToSqlColumn column : columns) { |
|||
if (column.getType() == CassandraToSqlColumnType.JSON) { |
|||
insertStatementBuilder.append("cast(? AS json)"); |
|||
} else { |
|||
insertStatementBuilder.append("?"); |
|||
} |
|||
insertStatementBuilder.append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(")"); |
|||
return conn.prepareStatement(insertStatementBuilder.toString()); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller; |
|||
|
|||
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; |
|||
import org.junit.BeforeClass; |
|||
import org.junit.ClassRule; |
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.runner.RunWith; |
|||
import org.thingsboard.server.dao.CustomCassandraCQLUnit; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
|
|||
import java.util.Arrays; |
|||
|
|||
@RunWith(ClasspathSuite.class) |
|||
@ClasspathSuite.ClassnameFilters({ |
|||
"org.thingsboard.server.controller.nosql.*Test"}) |
|||
public class ControllerNoSqlTestSuite { |
|||
|
|||
@ClassRule |
|||
public static CustomCassandraCQLUnit cassandraUnit = |
|||
new CustomCassandraCQLUnit( |
|||
Arrays.asList( |
|||
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/system-test.cql", false, false)), |
|||
"cassandra-test.yaml", 30000l); |
|||
|
|||
@BeforeClass |
|||
public static void cleanupInMemStorage(){ |
|||
InMemoryStorage.getInstance().cleanup(); |
|||
} |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseAdminControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class AdminControllerNoSqlTest extends BaseAdminControllerTest { |
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseAssetControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
import org.thingsboard.server.dao.util.NoSqlDao; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class AssetControllerNoSqlTest extends BaseAssetControllerTest { |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseAuditLogControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
@DaoNoSqlTest |
|||
public class AuditLogControllerNoSqlTest extends BaseAuditLogControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseAuthControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class AuthControllerNoSqlTest extends BaseAuthControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseComponentDescriptorControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class ComponentDescriptorControllerNoSqlTest extends BaseComponentDescriptorControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseCustomerControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class CustomerControllerNoSqlTest extends BaseCustomerControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseDashboardControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class DashboardControllerNoSqlTest extends BaseDashboardControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseDeviceControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class DeviceControllerNoSqlTest extends BaseDeviceControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseEntityViewControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Victor Basanets on 8/27/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class EntityViewControllerNoSqlTest extends BaseEntityViewControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseTenantControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class TenantControllerNoSqlTest extends BaseTenantControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseUserControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class UserControllerNoSqlTest extends BaseUserControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseWidgetTypeControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class WidgetTypeControllerNoSqlTest extends BaseWidgetTypeControllerTest { |
|||
} |
|||
@ -0,0 +1,26 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.controller.nosql; |
|||
|
|||
import org.thingsboard.server.controller.BaseWidgetsBundleControllerTest; |
|||
import org.thingsboard.server.dao.service.DaoNoSqlTest; |
|||
|
|||
/** |
|||
* Created by Valerii Sosliuk on 6/28/2017. |
|||
*/ |
|||
@DaoNoSqlTest |
|||
public class WidgetsBundleControllerNoSqlTest extends BaseWidgetsBundleControllerTest { |
|||
} |
|||
@ -0,0 +1,50 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.rules; |
|||
|
|||
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; |
|||
import org.junit.BeforeClass; |
|||
import org.junit.ClassRule; |
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.runner.RunWith; |
|||
import org.thingsboard.server.dao.CustomCassandraCQLUnit; |
|||
import org.thingsboard.server.dao.CustomSqlUnit; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
|
|||
import java.util.Arrays; |
|||
|
|||
@RunWith(ClasspathSuite.class) |
|||
@ClasspathSuite.ClassnameFilters({ |
|||
"org.thingsboard.server.rules.flow.nosql.*Test", |
|||
"org.thingsboard.server.rules.lifecycle.nosql.*Test" |
|||
}) |
|||
public class RuleEngineNoSqlTestSuite { |
|||
|
|||
@ClassRule |
|||
public static CustomCassandraCQLUnit cassandraUnit = |
|||
new CustomCassandraCQLUnit( |
|||
Arrays.asList( |
|||
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), |
|||
"cassandra-test.yaml", 30000l); |
|||
|
|||
@BeforeClass |
|||
public static void cleanupInMemStorage(){ |
|||
InMemoryStorage.getInstance().cleanup(); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,48 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.system; |
|||
|
|||
import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; |
|||
import org.junit.BeforeClass; |
|||
import org.junit.ClassRule; |
|||
import org.junit.extensions.cpsuite.ClasspathSuite; |
|||
import org.junit.runner.RunWith; |
|||
import org.thingsboard.server.dao.CustomCassandraCQLUnit; |
|||
import org.thingsboard.server.queue.memory.InMemoryStorage; |
|||
|
|||
import java.util.Arrays; |
|||
|
|||
/** |
|||
* @author Andrew Shvayka |
|||
*/ |
|||
@RunWith(ClasspathSuite.class) |
|||
@ClasspathSuite.ClassnameFilters({"org.thingsboard.server.system.*NoSqlTest"}) |
|||
public class SystemNoSqlTestSuite { |
|||
|
|||
@ClassRule |
|||
public static CustomCassandraCQLUnit cassandraUnit = |
|||
new CustomCassandraCQLUnit( |
|||
Arrays.asList( |
|||
new ClassPathCQLDataSet("cassandra/schema-ts.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/schema-entities.cql", false, false), |
|||
new ClassPathCQLDataSet("cassandra/system-data.cql", false, false)), |
|||
"cassandra-test.yaml", 30000l); |
|||
|
|||
@BeforeClass |
|||
public static void cleanupInMemStorage(){ |
|||
InMemoryStorage.getInstance().cleanup(); |
|||
} |
|||
} |
|||
@ -1,232 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra; |
|||
|
|||
import com.datastax.oss.driver.api.core.ConsistencyLevel; |
|||
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; |
|||
import com.datastax.oss.driver.api.core.config.DefaultDriverOption; |
|||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader; |
|||
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder; |
|||
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric; |
|||
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; |
|||
import lombok.Data; |
|||
import org.apache.commons.lang3.StringUtils; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
import java.time.Duration; |
|||
import java.util.ArrayList; |
|||
import java.util.Arrays; |
|||
import java.util.Collections; |
|||
import java.util.List; |
|||
|
|||
@Component |
|||
@Configuration |
|||
@Data |
|||
@NoSqlAnyDao |
|||
public class CassandraDriverOptions { |
|||
|
|||
private static final String COMMA = ","; |
|||
|
|||
@Value("${cassandra.cluster_name}") |
|||
private String clusterName; |
|||
@Value("${cassandra.url}") |
|||
private String url; |
|||
|
|||
@Value("${cassandra.socket.connect_timeout}") |
|||
private int connectTimeoutMillis; |
|||
@Value("${cassandra.socket.read_timeout}") |
|||
private int readTimeoutMillis; |
|||
@Value("${cassandra.socket.keep_alive}") |
|||
private Boolean keepAlive; |
|||
@Value("${cassandra.socket.reuse_address}") |
|||
private Boolean reuseAddress; |
|||
@Value("${cassandra.socket.so_linger}") |
|||
private Integer soLinger; |
|||
@Value("${cassandra.socket.tcp_no_delay}") |
|||
private Boolean tcpNoDelay; |
|||
@Value("${cassandra.socket.receive_buffer_size}") |
|||
private Integer receiveBufferSize; |
|||
@Value("${cassandra.socket.send_buffer_size}") |
|||
private Integer sendBufferSize; |
|||
|
|||
@Value("${cassandra.max_requests_per_connection_local:32768}") |
|||
private int max_requests_local; |
|||
@Value("${cassandra.max_requests_per_connection_remote:32768}") |
|||
private int max_requests_remote; |
|||
|
|||
@Value("${cassandra.query.default_fetch_size}") |
|||
private Integer defaultFetchSize; |
|||
@Value("${cassandra.query.read_consistency_level}") |
|||
private String readConsistencyLevel; |
|||
@Value("${cassandra.query.write_consistency_level}") |
|||
private String writeConsistencyLevel; |
|||
|
|||
@Value("${cassandra.compression}") |
|||
private String compression; |
|||
@Value("${cassandra.ssl}") |
|||
private Boolean ssl; |
|||
@Value("${cassandra.metrics}") |
|||
private Boolean metrics; |
|||
|
|||
@Value("${cassandra.credentials}") |
|||
private Boolean credentials; |
|||
@Value("${cassandra.username}") |
|||
private String username; |
|||
@Value("${cassandra.password}") |
|||
private String password; |
|||
|
|||
@Value("${cassandra.init_timeout_ms}") |
|||
private long initTimeout; |
|||
@Value("${cassandra.init_retry_interval_ms}") |
|||
private long initRetryInterval; |
|||
|
|||
private DriverConfigLoader loader; |
|||
|
|||
private ConsistencyLevel defaultReadConsistencyLevel; |
|||
private ConsistencyLevel defaultWriteConsistencyLevel; |
|||
|
|||
@PostConstruct |
|||
public void initLoader() { |
|||
ProgrammaticDriverConfigLoaderBuilder driverConfigBuilder = |
|||
DriverConfigLoader.programmaticBuilder(); |
|||
|
|||
driverConfigBuilder |
|||
.withStringList(DefaultDriverOption.CONTACT_POINTS, getContactPoints(url)) |
|||
.withString(DefaultDriverOption.SESSION_NAME, clusterName); |
|||
|
|||
this.initSocketOptions(driverConfigBuilder); |
|||
this.initPoolingOptions(driverConfigBuilder); |
|||
this.initQueryOptions(driverConfigBuilder); |
|||
|
|||
driverConfigBuilder.withString(DefaultDriverOption.PROTOCOL_COMPRESSION, |
|||
StringUtils.isEmpty(this.compression) ? "none" : this.compression.toLowerCase()); |
|||
|
|||
if (this.ssl) { |
|||
driverConfigBuilder.withString(DefaultDriverOption.SSL_ENGINE_FACTORY_CLASS, |
|||
"DefaultSslEngineFactory"); |
|||
} |
|||
|
|||
if (this.metrics) { |
|||
driverConfigBuilder.withStringList(DefaultDriverOption.METRICS_SESSION_ENABLED, |
|||
Arrays.asList(DefaultSessionMetric.CONNECTED_NODES.getPath(), |
|||
DefaultSessionMetric.CQL_REQUESTS.getPath())); |
|||
driverConfigBuilder.withStringList(DefaultDriverOption.METRICS_NODE_ENABLED, |
|||
Arrays.asList(DefaultNodeMetric.OPEN_CONNECTIONS.getPath(), |
|||
DefaultNodeMetric.IN_FLIGHT.getPath())); |
|||
} |
|||
|
|||
if (this.credentials) { |
|||
driverConfigBuilder.withString(DefaultDriverOption.AUTH_PROVIDER_CLASS, |
|||
"PlainTextAuthProvider"); |
|||
driverConfigBuilder.withString(DefaultDriverOption.AUTH_PROVIDER_USER_NAME, |
|||
this.username); |
|||
driverConfigBuilder.withString(DefaultDriverOption.AUTH_PROVIDER_PASSWORD, |
|||
this.password); |
|||
} |
|||
|
|||
driverConfigBuilder.withBoolean(DefaultDriverOption.RECONNECT_ON_INIT, |
|||
true); |
|||
driverConfigBuilder.withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, |
|||
"ExponentialReconnectionPolicy"); |
|||
driverConfigBuilder.withDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY, |
|||
Duration.ofMillis(this.initRetryInterval)); |
|||
driverConfigBuilder.withDuration(DefaultDriverOption.RECONNECTION_MAX_DELAY, |
|||
Duration.ofMillis(this.initTimeout)); |
|||
|
|||
this.loader = driverConfigBuilder.build(); |
|||
} |
|||
|
|||
protected ConsistencyLevel getDefaultReadConsistencyLevel() { |
|||
if (defaultReadConsistencyLevel == null) { |
|||
if (readConsistencyLevel != null) { |
|||
defaultReadConsistencyLevel = DefaultConsistencyLevel.valueOf(readConsistencyLevel.toUpperCase()); |
|||
} else { |
|||
defaultReadConsistencyLevel = DefaultConsistencyLevel.ONE; |
|||
} |
|||
} |
|||
return defaultReadConsistencyLevel; |
|||
} |
|||
|
|||
protected ConsistencyLevel getDefaultWriteConsistencyLevel() { |
|||
if (defaultWriteConsistencyLevel == null) { |
|||
if (writeConsistencyLevel != null) { |
|||
defaultWriteConsistencyLevel = DefaultConsistencyLevel.valueOf(writeConsistencyLevel.toUpperCase()); |
|||
} else { |
|||
defaultWriteConsistencyLevel = DefaultConsistencyLevel.ONE; |
|||
} |
|||
} |
|||
return defaultWriteConsistencyLevel; |
|||
} |
|||
|
|||
private void initSocketOptions(ProgrammaticDriverConfigLoaderBuilder driverConfigBuilder) { |
|||
driverConfigBuilder.withDuration(DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT, |
|||
Duration.ofMillis(this.connectTimeoutMillis)); |
|||
driverConfigBuilder.withDuration(DefaultDriverOption.REQUEST_TIMEOUT, |
|||
Duration.ofMillis(this.readTimeoutMillis)); |
|||
if (this.keepAlive != null) { |
|||
driverConfigBuilder.withBoolean(DefaultDriverOption.SOCKET_KEEP_ALIVE, |
|||
this.keepAlive); |
|||
} |
|||
if (this.reuseAddress != null) { |
|||
driverConfigBuilder.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, |
|||
this.reuseAddress); |
|||
} |
|||
if (this.soLinger != null) { |
|||
driverConfigBuilder.withInt(DefaultDriverOption.SOCKET_LINGER_INTERVAL, |
|||
this.soLinger); |
|||
} |
|||
if (this.tcpNoDelay != null) { |
|||
driverConfigBuilder.withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, |
|||
this.tcpNoDelay); |
|||
} |
|||
if (this.receiveBufferSize != null) { |
|||
driverConfigBuilder.withInt(DefaultDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, |
|||
this.receiveBufferSize); |
|||
} |
|||
if (this.sendBufferSize != null) { |
|||
driverConfigBuilder.withInt(DefaultDriverOption.SOCKET_SEND_BUFFER_SIZE, |
|||
this.sendBufferSize); |
|||
} |
|||
} |
|||
|
|||
private void initPoolingOptions(ProgrammaticDriverConfigLoaderBuilder driverConfigBuilder) { |
|||
driverConfigBuilder.withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, |
|||
this.max_requests_local); |
|||
} |
|||
|
|||
private void initQueryOptions(ProgrammaticDriverConfigLoaderBuilder driverConfigBuilder) { |
|||
driverConfigBuilder.withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, |
|||
this.defaultFetchSize); |
|||
} |
|||
|
|||
private List<String> getContactPoints(String url) { |
|||
List<String> result; |
|||
if (StringUtils.isBlank(url)) { |
|||
result = Collections.emptyList(); |
|||
} else { |
|||
result = new ArrayList<>(); |
|||
for (String hostPort : url.split(COMMA)) { |
|||
result.add(hostPort); |
|||
} |
|||
} |
|||
return result; |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,73 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra; |
|||
|
|||
import com.datastax.driver.core.ConsistencyLevel; |
|||
import com.datastax.driver.core.QueryOptions; |
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
|
|||
@Component |
|||
@Configuration |
|||
@Data |
|||
@NoSqlAnyDao |
|||
public class CassandraQueryOptions { |
|||
|
|||
@Value("${cassandra.query.default_fetch_size}") |
|||
private Integer defaultFetchSize; |
|||
@Value("${cassandra.query.read_consistency_level}") |
|||
private String readConsistencyLevel; |
|||
@Value("${cassandra.query.write_consistency_level}") |
|||
private String writeConsistencyLevel; |
|||
|
|||
private QueryOptions opts; |
|||
|
|||
private ConsistencyLevel defaultReadConsistencyLevel; |
|||
private ConsistencyLevel defaultWriteConsistencyLevel; |
|||
|
|||
@PostConstruct |
|||
public void initOpts(){ |
|||
opts = new QueryOptions(); |
|||
opts.setFetchSize(defaultFetchSize); |
|||
} |
|||
|
|||
protected ConsistencyLevel getDefaultReadConsistencyLevel() { |
|||
if (defaultReadConsistencyLevel == null) { |
|||
if (readConsistencyLevel != null) { |
|||
defaultReadConsistencyLevel = ConsistencyLevel.valueOf(readConsistencyLevel.toUpperCase()); |
|||
} else { |
|||
defaultReadConsistencyLevel = ConsistencyLevel.ONE; |
|||
} |
|||
} |
|||
return defaultReadConsistencyLevel; |
|||
} |
|||
|
|||
protected ConsistencyLevel getDefaultWriteConsistencyLevel() { |
|||
if (defaultWriteConsistencyLevel == null) { |
|||
if (writeConsistencyLevel != null) { |
|||
defaultWriteConsistencyLevel = ConsistencyLevel.valueOf(writeConsistencyLevel.toUpperCase()); |
|||
} else { |
|||
defaultWriteConsistencyLevel = ConsistencyLevel.ONE; |
|||
} |
|||
} |
|||
return defaultWriteConsistencyLevel; |
|||
} |
|||
} |
|||
@ -0,0 +1,76 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra; |
|||
|
|||
import com.datastax.driver.core.SocketOptions; |
|||
import lombok.Data; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.stereotype.Component; |
|||
import org.thingsboard.server.dao.util.NoSqlAnyDao; |
|||
|
|||
import javax.annotation.PostConstruct; |
|||
|
|||
@Component |
|||
@Configuration |
|||
@Data |
|||
@NoSqlAnyDao |
|||
public class CassandraSocketOptions { |
|||
|
|||
@Value("${cassandra.socket.connect_timeout}") |
|||
private int connectTimeoutMillis; |
|||
@Value("${cassandra.socket.read_timeout}") |
|||
private int readTimeoutMillis; |
|||
@Value("${cassandra.socket.keep_alive}") |
|||
private Boolean keepAlive; |
|||
@Value("${cassandra.socket.reuse_address}") |
|||
private Boolean reuseAddress; |
|||
@Value("${cassandra.socket.so_linger}") |
|||
private Integer soLinger; |
|||
@Value("${cassandra.socket.tcp_no_delay}") |
|||
private Boolean tcpNoDelay; |
|||
@Value("${cassandra.socket.receive_buffer_size}") |
|||
private Integer receiveBufferSize; |
|||
@Value("${cassandra.socket.send_buffer_size}") |
|||
private Integer sendBufferSize; |
|||
|
|||
private SocketOptions opts; |
|||
|
|||
@PostConstruct |
|||
public void initOpts() { |
|||
opts = new SocketOptions(); |
|||
opts.setConnectTimeoutMillis(connectTimeoutMillis); |
|||
opts.setReadTimeoutMillis(readTimeoutMillis); |
|||
if (keepAlive != null) { |
|||
opts.setKeepAlive(keepAlive); |
|||
} |
|||
if (reuseAddress != null) { |
|||
opts.setReuseAddress(reuseAddress); |
|||
} |
|||
if (soLinger != null) { |
|||
opts.setSoLinger(soLinger); |
|||
} |
|||
if (tcpNoDelay != null) { |
|||
opts.setTcpNoDelay(tcpNoDelay); |
|||
} |
|||
if (receiveBufferSize != null) { |
|||
opts.setReceiveBufferSize(receiveBufferSize); |
|||
} |
|||
if (sendBufferSize != null) { |
|||
opts.setSendBufferSize(sendBufferSize); |
|||
} |
|||
} |
|||
} |
|||
@ -1,84 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra.guava; |
|||
|
|||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader; |
|||
import com.datastax.oss.driver.api.core.cql.PrepareRequest; |
|||
import com.datastax.oss.driver.api.core.cql.Statement; |
|||
import com.datastax.oss.driver.api.core.metadata.Node; |
|||
import com.datastax.oss.driver.api.core.metadata.NodeStateListener; |
|||
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; |
|||
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; |
|||
import com.datastax.oss.driver.api.core.tracker.RequestTracker; |
|||
import com.datastax.oss.driver.api.core.type.codec.TypeCodec; |
|||
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; |
|||
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor; |
|||
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor; |
|||
import com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor; |
|||
import com.datastax.oss.driver.internal.core.cql.CqlRequestSyncProcessor; |
|||
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.function.Predicate; |
|||
|
|||
/** |
|||
* A Custom {@link DefaultDriverContext} that overrides {@link #getRequestProcessorRegistry()} to |
|||
* return a {@link RequestProcessorRegistry} that includes processors for returning guava futures. |
|||
*/ |
|||
public class GuavaDriverContext extends DefaultDriverContext { |
|||
|
|||
public GuavaDriverContext( |
|||
DriverConfigLoader configLoader, |
|||
List<TypeCodec<?>> typeCodecs, |
|||
NodeStateListener nodeStateListener, |
|||
SchemaChangeListener schemaChangeListener, |
|||
RequestTracker requestTracker, |
|||
Map<String, String> localDatacenters, |
|||
Map<String, Predicate<Node>> nodeFilters, |
|||
ClassLoader classLoader) { |
|||
super( |
|||
configLoader, |
|||
ProgrammaticArguments.builder() |
|||
.addTypeCodecs(typeCodecs.toArray(new TypeCodec<?>[0])) |
|||
.withNodeStateListener(nodeStateListener) |
|||
.withSchemaChangeListener(schemaChangeListener) |
|||
.withRequestTracker(requestTracker) |
|||
.withLocalDatacenters(localDatacenters) |
|||
.withNodeFilters(nodeFilters) |
|||
.withClassLoader(classLoader) |
|||
.build()); |
|||
} |
|||
|
|||
@Override |
|||
public RequestProcessorRegistry buildRequestProcessorRegistry() { |
|||
// Register the typical request processors, except instead of the normal async processors,
|
|||
// use GuavaRequestAsyncProcessor to return ListenableFutures in async methods.
|
|||
|
|||
CqlRequestAsyncProcessor cqlRequestAsyncProcessor = new CqlRequestAsyncProcessor(); |
|||
CqlPrepareAsyncProcessor cqlPrepareAsyncProcessor = new CqlPrepareAsyncProcessor(); |
|||
CqlRequestSyncProcessor cqlRequestSyncProcessor = |
|||
new CqlRequestSyncProcessor(cqlRequestAsyncProcessor); |
|||
|
|||
return new RequestProcessorRegistry( |
|||
getSessionName(), |
|||
cqlRequestSyncProcessor, |
|||
new CqlPrepareSyncProcessor(cqlPrepareAsyncProcessor), |
|||
new GuavaRequestAsyncProcessor<>( |
|||
cqlRequestAsyncProcessor, Statement.class, GuavaSession.ASYNC), |
|||
new GuavaRequestAsyncProcessor<>( |
|||
cqlPrepareAsyncProcessor, PrepareRequest.class, GuavaSession.ASYNC_PREPARED)); |
|||
} |
|||
} |
|||
@ -1,79 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra.guava; |
|||
|
|||
import com.datastax.oss.driver.api.core.session.Request; |
|||
import com.datastax.oss.driver.api.core.type.reflect.GenericType; |
|||
import com.datastax.oss.driver.internal.core.context.InternalDriverContext; |
|||
import com.datastax.oss.driver.internal.core.session.DefaultSession; |
|||
import com.datastax.oss.driver.internal.core.session.RequestProcessor; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import com.google.common.util.concurrent.SettableFuture; |
|||
import java.util.concurrent.CompletionStage; |
|||
|
|||
/** |
|||
* Wraps a {@link RequestProcessor} that returns {@link CompletionStage}s and converts them to a |
|||
* {@link ListenableFuture}s. |
|||
* |
|||
* @param <T> The type of request |
|||
* @param <U> The type of responses enclosed in the future response. |
|||
*/ |
|||
public class GuavaRequestAsyncProcessor<T extends Request, U> |
|||
implements RequestProcessor<T, ListenableFuture<U>> { |
|||
|
|||
private final RequestProcessor<T, CompletionStage<U>> subProcessor; |
|||
|
|||
private final GenericType resultType; |
|||
|
|||
private final Class<?> requestClass; |
|||
|
|||
GuavaRequestAsyncProcessor( |
|||
RequestProcessor<T, CompletionStage<U>> subProcessor, |
|||
Class<?> requestClass, |
|||
GenericType resultType) { |
|||
this.subProcessor = subProcessor; |
|||
this.requestClass = requestClass; |
|||
this.resultType = resultType; |
|||
} |
|||
|
|||
@Override |
|||
public boolean canProcess(Request request, GenericType resultType) { |
|||
return requestClass.isInstance(request) && resultType.equals(this.resultType); |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<U> process( |
|||
T request, DefaultSession session, InternalDriverContext context, String sessionLogPrefix) { |
|||
SettableFuture<U> future = SettableFuture.create(); |
|||
subProcessor |
|||
.process(request, session, context, sessionLogPrefix) |
|||
.whenComplete( |
|||
(r, ex) -> { |
|||
if (ex != null) { |
|||
future.setException(ex); |
|||
} else { |
|||
future.set(r); |
|||
} |
|||
}); |
|||
return future; |
|||
} |
|||
|
|||
@Override |
|||
public ListenableFuture<U> newFailure(RuntimeException error) { |
|||
return Futures.immediateFailedFuture(error); |
|||
} |
|||
} |
|||
@ -1,51 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra.guava; |
|||
|
|||
import com.datastax.oss.driver.api.core.cql.AsyncResultSet; |
|||
import com.datastax.oss.driver.api.core.cql.PreparedStatement; |
|||
import com.datastax.oss.driver.api.core.cql.SimpleStatement; |
|||
import com.datastax.oss.driver.api.core.cql.Statement; |
|||
import com.datastax.oss.driver.api.core.cql.SyncCqlSession; |
|||
import com.datastax.oss.driver.api.core.session.Session; |
|||
import com.datastax.oss.driver.api.core.type.reflect.GenericType; |
|||
import com.datastax.oss.driver.internal.core.cql.DefaultPrepareRequest; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
|
|||
public interface GuavaSession extends Session, SyncCqlSession { |
|||
|
|||
GenericType<ListenableFuture<AsyncResultSet>> ASYNC = |
|||
new GenericType<ListenableFuture<AsyncResultSet>>() {}; |
|||
|
|||
GenericType<ListenableFuture<PreparedStatement>> ASYNC_PREPARED = |
|||
new GenericType<ListenableFuture<PreparedStatement>>() {}; |
|||
|
|||
default ListenableFuture<AsyncResultSet> executeAsync(Statement<?> statement) { |
|||
return this.execute(statement, ASYNC); |
|||
} |
|||
|
|||
default ListenableFuture<AsyncResultSet> executeAsync(String statement) { |
|||
return this.executeAsync(SimpleStatement.newInstance(statement)); |
|||
} |
|||
|
|||
default ListenableFuture<PreparedStatement> prepareAsync(SimpleStatement statement) { |
|||
return this.execute(new DefaultPrepareRequest(statement), ASYNC_PREPARED); |
|||
} |
|||
|
|||
default ListenableFuture<PreparedStatement> prepareAsync(String statement) { |
|||
return this.prepareAsync(SimpleStatement.newInstance(statement)); |
|||
} |
|||
} |
|||
@ -1,59 +0,0 @@ |
|||
/** |
|||
* Copyright © 2016-2020 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.dao.cassandra.guava; |
|||
|
|||
import com.datastax.oss.driver.api.core.CqlSession; |
|||
import com.datastax.oss.driver.api.core.config.DriverConfigLoader; |
|||
import com.datastax.oss.driver.api.core.context.DriverContext; |
|||
import com.datastax.oss.driver.api.core.metadata.Node; |
|||
import com.datastax.oss.driver.api.core.metadata.NodeStateListener; |
|||
import com.datastax.oss.driver.api.core.metadata.schema.SchemaChangeListener; |
|||
import com.datastax.oss.driver.api.core.session.SessionBuilder; |
|||
import com.datastax.oss.driver.api.core.tracker.RequestTracker; |
|||
import com.datastax.oss.driver.api.core.type.codec.TypeCodec; |
|||
import edu.umd.cs.findbugs.annotations.NonNull; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.function.Predicate; |
|||
|
|||
public class GuavaSessionBuilder extends SessionBuilder<GuavaSessionBuilder, GuavaSession> { |
|||
|
|||
@Override |
|||
protected DriverContext buildContext( |
|||
DriverConfigLoader configLoader, |
|||
List<TypeCodec<?>> typeCodecs, |
|||
NodeStateListener nodeStateListener, |
|||
SchemaChangeListener schemaChangeListener, |
|||
RequestTracker requestTracker, |
|||
Map<String, String> localDatacenters, |
|||
Map<String, Predicate<Node>> nodeFilters, |
|||
ClassLoader classLoader) { |
|||
return new GuavaDriverContext( |
|||
configLoader, |
|||
typeCodecs, |
|||
nodeStateListener, |
|||
schemaChangeListener, |
|||
requestTracker, |
|||
localDatacenters, |
|||
nodeFilters, |
|||
classLoader); |
|||
} |
|||
|
|||
@Override |
|||
protected GuavaSession wrap(@NonNull CqlSession defaultSession) { |
|||
return new DefaultGuavaSession(defaultSession); |
|||
} |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue