13 changed files with 639 additions and 23 deletions
@ -0,0 +1,178 @@ |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_by_tenant_and_name; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_by_tenant_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_by_tenant_by_type_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_by_customer_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_by_customer_by_type_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.device_types_by_tenant; |
|||
|
|||
DROP TABLE IF EXISTS thingsboard.device; |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.device ( |
|||
id timeuuid, |
|||
tenant_id timeuuid, |
|||
customer_id timeuuid, |
|||
name text, |
|||
type text, |
|||
search_text text, |
|||
additional_info text, |
|||
PRIMARY KEY (id, tenant_id, customer_id, type) |
|||
); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_by_tenant_and_name AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND name IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, name, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_by_tenant_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, search_text, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_by_tenant_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, type, search_text, id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_by_customer_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, search_text, id, type ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_by_customer_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, type, search_text, id ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, type ASC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.device_types_by_tenant AS |
|||
SELECT * |
|||
from thingsboard.device |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( (type, tenant_id), id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( id ASC, customer_id DESC); |
|||
|
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_by_tenant_and_name; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_by_tenant_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_by_tenant_by_type_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_by_customer_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_by_customer_by_type_and_search_text; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.asset_types_by_tenant; |
|||
|
|||
DROP TABLE IF EXISTS thingsboard.asset; |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.asset ( |
|||
id timeuuid, |
|||
tenant_id timeuuid, |
|||
customer_id timeuuid, |
|||
name text, |
|||
type text, |
|||
search_text text, |
|||
additional_info text, |
|||
PRIMARY KEY (id, tenant_id, customer_id, type) |
|||
); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_by_tenant_and_name AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND name IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, name, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( name ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_by_tenant_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, search_text, id, customer_id, type) |
|||
WITH CLUSTERING ORDER BY ( search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_by_tenant_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( tenant_id, type, search_text, id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( type ASC, search_text ASC, id DESC, customer_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_by_customer_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, search_text, id, type ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_by_customer_by_type_and_search_text AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND search_text IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( customer_id, tenant_id, type, search_text, id ) |
|||
WITH CLUSTERING ORDER BY ( tenant_id DESC, type ASC, search_text ASC, id DESC ); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.asset_types_by_tenant AS |
|||
SELECT * |
|||
from thingsboard.asset |
|||
WHERE tenant_id IS NOT NULL AND customer_id IS NOT NULL AND type IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY ( (type, tenant_id), id, customer_id) |
|||
WITH CLUSTERING ORDER BY ( id ASC, customer_id DESC); |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.alarm ( |
|||
id timeuuid, |
|||
tenant_id timeuuid, |
|||
type text, |
|||
originator_id timeuuid, |
|||
originator_type text, |
|||
severity text, |
|||
status text, |
|||
start_ts bigint, |
|||
end_ts bigint, |
|||
ack_ts bigint, |
|||
clear_ts bigint, |
|||
details text, |
|||
propagate boolean, |
|||
PRIMARY KEY ((tenant_id, originator_id, originator_type), type, id) |
|||
) WITH CLUSTERING ORDER BY ( type ASC, id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.alarm_by_id AS |
|||
SELECT * |
|||
from thingsboard.alarm |
|||
WHERE tenant_id IS NOT NULL AND originator_id IS NOT NULL AND originator_type IS NOT NULL AND type IS NOT NULL |
|||
AND type IS NOT NULL AND id IS NOT NULL |
|||
PRIMARY KEY (id, tenant_id, originator_id, originator_type, type) |
|||
WITH CLUSTERING ORDER BY ( tenant_id ASC, originator_id ASC, originator_type ASC, type ASC); |
|||
|
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.relation_by_type_and_child_type; |
|||
DROP MATERIALIZED VIEW IF EXISTS thingsboard.reverse_relation; |
|||
|
|||
DROP TABLE IF EXISTS thingsboard.relation; |
|||
|
|||
CREATE TABLE IF NOT EXISTS thingsboard.relation ( |
|||
from_id timeuuid, |
|||
from_type text, |
|||
to_id timeuuid, |
|||
to_type text, |
|||
relation_type_group text, |
|||
relation_type text, |
|||
additional_info text, |
|||
PRIMARY KEY ((from_id, from_type), relation_type_group, relation_type, to_id, to_type) |
|||
) WITH CLUSTERING ORDER BY ( relation_type_group ASC, relation_type ASC, to_id ASC, to_type ASC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.relation_by_type_and_child_type AS |
|||
SELECT * |
|||
from thingsboard.relation |
|||
WHERE from_id IS NOT NULL AND from_type IS NOT NULL AND relation_type_group IS NOT NULL AND relation_type IS NOT NULL AND to_id IS NOT NULL AND to_type IS NOT NULL |
|||
PRIMARY KEY ((from_id, from_type), relation_type_group, relation_type, to_type, to_id) |
|||
WITH CLUSTERING ORDER BY ( relation_type_group ASC, relation_type ASC, to_type ASC, to_id DESC); |
|||
|
|||
CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.reverse_relation AS |
|||
SELECT * |
|||
from thingsboard.relation |
|||
WHERE from_id IS NOT NULL AND from_type IS NOT NULL AND relation_type_group IS NOT NULL AND relation_type IS NOT NULL AND to_id IS NOT NULL AND to_type IS NOT NULL |
|||
PRIMARY KEY ((to_id, to_type), relation_type_group, relation_type, from_id, from_type) |
|||
WITH CLUSTERING ORDER BY ( relation_type_group ASC, relation_type ASC, from_id ASC, from_type ASC); |
|||
@ -0,0 +1,132 @@ |
|||
/** |
|||
* Copyright © 2016-2017 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import com.datastax.driver.core.KeyspaceMetadata; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.cassandra.CassandraCluster; |
|||
import org.thingsboard.server.dao.cassandra.CassandraInstallCluster; |
|||
import org.thingsboard.server.dao.util.NoSqlDao; |
|||
import org.thingsboard.server.service.install.cql.CQLStatementsParser; |
|||
import org.thingsboard.server.service.install.cql.CassandraDbHelper; |
|||
|
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.Paths; |
|||
import java.util.List; |
|||
|
|||
@Service |
|||
@NoSqlDao |
|||
@Profile("install") |
|||
@Slf4j |
|||
public class CassandraDatabaseUpgradeService implements DatabaseUpgradeService { |
|||
|
|||
private static final String SCHEMA_UPDATE_CQL = "schema_update.cql"; |
|||
|
|||
@Value("${install.data_dir}") |
|||
private String dataDir; |
|||
|
|||
@Autowired |
|||
private CassandraCluster cluster; |
|||
|
|||
@Autowired |
|||
private CassandraInstallCluster installCluster; |
|||
|
|||
@Override |
|||
public void upgradeDatabase(String fromVersion) throws Exception { |
|||
|
|||
switch (fromVersion) { |
|||
case "1.2.3": |
|||
|
|||
log.info("Upgrading Cassandara DataBase from version {} to 1.3.0 ...", fromVersion); |
|||
|
|||
//Dump devices, assets and relations
|
|||
|
|||
KeyspaceMetadata ks = cluster.getCluster().getMetadata().getKeyspace(cluster.getKeyspaceName()); |
|||
|
|||
log.info("Dumping devices ..."); |
|||
Path devicesDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), "device", |
|||
new String[]{"id", "tenant_id", "customer_id", "name", "search_text", "additional_info"}, |
|||
"tb-devices"); |
|||
if (devicesDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(devicesDump, "default"); |
|||
} |
|||
log.info("Devices dumped."); |
|||
|
|||
log.info("Dumping assets ..."); |
|||
Path assetsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), "asset", |
|||
new String[]{"id", "tenant_id", "customer_id", "name", "search_text", "additional_info", "type"}, |
|||
"tb-assets"); |
|||
log.info("Assets dumped."); |
|||
|
|||
log.info("Dumping relations ..."); |
|||
Path relationsDump = CassandraDbHelper.dumpCfIfExists(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", "additional_info"}, |
|||
"tb-relations"); |
|||
if (relationsDump != null) { |
|||
CassandraDbHelper.appendToEndOfLine(relationsDump, "COMMON"); |
|||
} |
|||
log.info("Relations dumped."); |
|||
|
|||
log.info("Updating schema ..."); |
|||
Path schemaUpdateFile = Paths.get(this.dataDir, "upgrade", "1.3.0", SCHEMA_UPDATE_CQL); |
|||
loadCql(schemaUpdateFile); |
|||
log.info("Schema updated."); |
|||
|
|||
//Restore devices, assets and relations
|
|||
|
|||
log.info("Restoring devices ..."); |
|||
if (devicesDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "device", |
|||
new String[]{"id", "tenant_id", "customer_id", "name", "search_text", "additional_info", "type"}, devicesDump); |
|||
Files.deleteIfExists(devicesDump); |
|||
} |
|||
log.info("Devices restored."); |
|||
|
|||
log.info("Restoring assets ..."); |
|||
if (assetsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "asset", |
|||
new String[]{"id", "tenant_id", "customer_id", "name", "search_text", "additional_info", "type"}, assetsDump); |
|||
Files.deleteIfExists(assetsDump); |
|||
} |
|||
log.info("Assets restored."); |
|||
|
|||
log.info("Restoring relations ..."); |
|||
if (relationsDump != null) { |
|||
CassandraDbHelper.loadCf(ks, cluster.getSession(), "relation", |
|||
new String[]{"from_id", "from_type", "to_id", "to_type", "relation_type", "additional_info", "relation_type_group"}, relationsDump); |
|||
Files.deleteIfExists(relationsDump); |
|||
} |
|||
log.info("Relations restored."); |
|||
|
|||
break; |
|||
default: |
|||
throw new RuntimeException("Unable to upgrade Cassandra database, unsupported fromVersion: " + fromVersion); |
|||
} |
|||
|
|||
} |
|||
|
|||
private void loadCql(Path cql) throws Exception { |
|||
List<String> statements = new CQLStatementsParser(cql).getStatements(); |
|||
statements.forEach(statement -> installCluster.getSession().execute(statement)); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2017 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package org.thingsboard.server.service.install; |
|||
|
|||
public interface DatabaseUpgradeService { |
|||
|
|||
void upgradeDatabase(String fromVersion) throws Exception; |
|||
|
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
/** |
|||
* Copyright © 2016-2017 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package org.thingsboard.server.service.install; |
|||
|
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.context.annotation.Profile; |
|||
import org.springframework.stereotype.Service; |
|||
import org.thingsboard.server.dao.util.SqlDao; |
|||
|
|||
@Service |
|||
@Profile("install") |
|||
@Slf4j |
|||
@SqlDao |
|||
public class SqlDatabaseUpgradeService implements DatabaseUpgradeService { |
|||
|
|||
@Override |
|||
public void upgradeDatabase(String fromVersion) throws Exception { |
|||
switch (fromVersion) { |
|||
default: |
|||
throw new RuntimeException("Unable to upgrade SQL database, unsupported fromVersion: " + fromVersion); |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,169 @@ |
|||
/** |
|||
* Copyright © 2016-2017 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
|
|||
package org.thingsboard.server.service.install.cql; |
|||
|
|||
import com.datastax.driver.core.*; |
|||
import org.apache.commons.csv.CSVFormat; |
|||
import org.apache.commons.csv.CSVParser; |
|||
import org.apache.commons.csv.CSVPrinter; |
|||
import org.apache.commons.csv.CSVRecord; |
|||
|
|||
import java.io.IOException; |
|||
import java.nio.file.Files; |
|||
import java.nio.file.Path; |
|||
import java.nio.file.StandardCopyOption; |
|||
import java.util.*; |
|||
|
|||
public class CassandraDbHelper { |
|||
|
|||
private static final CSVFormat CSV_DUMP_FORMAT = CSVFormat.DEFAULT.withNullString("\\N"); |
|||
|
|||
public static Path dumpCfIfExists(KeyspaceMetadata ks, Session session, String cfName, |
|||
String[] columns, String dumpPrefix) throws Exception { |
|||
if (ks.getTable(cfName) != null) { |
|||
Path dumpFile = Files.createTempFile(dumpPrefix, null); |
|||
Files.deleteIfExists(dumpFile); |
|||
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(dumpFile), CSV_DUMP_FORMAT)) { |
|||
Statement stmt = new SimpleStatement("SELECT * FROM " + cfName); |
|||
stmt.setFetchSize(1000); |
|||
ResultSet rs = session.execute(stmt); |
|||
Iterator<Row> iter = rs.iterator(); |
|||
while (iter.hasNext()) { |
|||
Row row = iter.next(); |
|||
if (row != null) { |
|||
dumpRow(row, columns, csvPrinter); |
|||
} |
|||
} |
|||
} |
|||
return dumpFile; |
|||
} else { |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
public static void appendToEndOfLine(Path targetDumpFile, String toAppend) throws Exception { |
|||
Path tmp = Files.createTempFile(null, null); |
|||
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(targetDumpFile), CSV_DUMP_FORMAT)) { |
|||
try (CSVPrinter csvPrinter = new CSVPrinter(Files.newBufferedWriter(tmp), CSV_DUMP_FORMAT)) { |
|||
csvParser.forEach(record -> { |
|||
List<String> newRecord = new ArrayList<>(); |
|||
record.forEach(val -> newRecord.add(val)); |
|||
newRecord.add(toAppend); |
|||
try { |
|||
csvPrinter.printRecord(newRecord); |
|||
} catch (IOException e) { |
|||
throw new RuntimeException("Error appending to EOL", e); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
Files.move(tmp, targetDumpFile, StandardCopyOption.REPLACE_EXISTING); |
|||
} |
|||
|
|||
public static void loadCf(KeyspaceMetadata ks, Session session, String cfName, String[] columns, Path sourceFile) throws Exception { |
|||
TableMetadata tableMetadata = ks.getTable(cfName); |
|||
PreparedStatement prepared = session.prepare(createInsertStatement(cfName, columns)); |
|||
try (CSVParser csvParser = new CSVParser(Files.newBufferedReader(sourceFile), CSV_DUMP_FORMAT.withHeader(columns))) { |
|||
csvParser.forEach(record -> { |
|||
BoundStatement boundStatement = prepared.bind(); |
|||
for (String column : columns) { |
|||
setColumnValue(tableMetadata, column, record, boundStatement); |
|||
} |
|||
session.execute(boundStatement); |
|||
}); |
|||
} |
|||
} |
|||
|
|||
|
|||
private static void dumpRow(Row row, String[] columns, CSVPrinter csvPrinter) throws Exception { |
|||
List<String> record = new ArrayList<>(); |
|||
for (String column : columns) { |
|||
record.add(getColumnValue(column, row)); |
|||
} |
|||
csvPrinter.printRecord(record); |
|||
} |
|||
|
|||
private static String getColumnValue(String column, Row row) { |
|||
String str = ""; |
|||
int index = row.getColumnDefinitions().getIndexOf(column); |
|||
if (index > -1) { |
|||
DataType type = row.getColumnDefinitions().getType(index); |
|||
try { |
|||
if (row.isNull(index)) { |
|||
return null; |
|||
} else if (type == DataType.cdouble()) { |
|||
str = new Double(row.getDouble(index)).toString(); |
|||
} else if (type == DataType.cint()) { |
|||
str = new Integer(row.getInt(index)).toString(); |
|||
} else if (type == DataType.uuid()) { |
|||
str = row.getUUID(index).toString(); |
|||
} else if (type == DataType.timeuuid()) { |
|||
str = row.getUUID(index).toString(); |
|||
} else if (type == DataType.cfloat()) { |
|||
str = new Float(row.getFloat(index)).toString(); |
|||
} else if (type == DataType.timestamp()) { |
|||
str = ""+row.getTimestamp(index).getTime(); |
|||
} else { |
|||
str = row.getString(index); |
|||
} |
|||
} catch (Exception e) { |
|||
str = ""; |
|||
} |
|||
} |
|||
return str; |
|||
} |
|||
|
|||
private static String createInsertStatement(String cfName, String[] columns) { |
|||
StringBuilder insertStatementBuilder = new StringBuilder(); |
|||
insertStatementBuilder.append("INSERT INTO ").append(cfName).append(" ("); |
|||
for (String column : columns) { |
|||
insertStatementBuilder.append(column).append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(") VALUES ("); |
|||
for (String column : columns) { |
|||
insertStatementBuilder.append("?").append(","); |
|||
} |
|||
insertStatementBuilder.deleteCharAt(insertStatementBuilder.length() - 1); |
|||
insertStatementBuilder.append(")"); |
|||
return insertStatementBuilder.toString(); |
|||
} |
|||
|
|||
private static void setColumnValue(TableMetadata tableMetadata, String column, |
|||
CSVRecord record, BoundStatement boundStatement) { |
|||
String value = record.get(column); |
|||
DataType type = tableMetadata.getColumn(column).getType(); |
|||
if (value == null) { |
|||
boundStatement.setToNull(column); |
|||
} else if (type == DataType.cdouble()) { |
|||
boundStatement.setDouble(column, Double.valueOf(value)); |
|||
} else if (type == DataType.cint()) { |
|||
boundStatement.setInt(column, Integer.valueOf(value)); |
|||
} else if (type == DataType.uuid()) { |
|||
boundStatement.setUUID(column, UUID.fromString(value)); |
|||
} else if (type == DataType.timeuuid()) { |
|||
boundStatement.setUUID(column, UUID.fromString(value)); |
|||
} else if (type == DataType.cfloat()) { |
|||
boundStatement.setFloat(column, Float.valueOf(value)); |
|||
} else if (type == DataType.timestamp()) { |
|||
boundStatement.setTimestamp(column, new Date(Long.valueOf(value))); |
|||
} else { |
|||
boundStatement.setString(column, value); |
|||
} |
|||
} |
|||
|
|||
} |
|||
Loading…
Reference in new issue