|
|
|
@ -15,6 +15,7 @@ |
|
|
|
*/ |
|
|
|
package org.thingsboard.server.dao.util; |
|
|
|
|
|
|
|
import com.datastax.driver.core.*; |
|
|
|
import com.google.common.util.concurrent.FutureCallback; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
@ -22,6 +23,7 @@ import com.google.common.util.concurrent.SettableFuture; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.thingsboard.server.common.data.id.TenantId; |
|
|
|
import org.thingsboard.server.common.msg.tools.TbRateLimits; |
|
|
|
import org.thingsboard.server.dao.nosql.CassandraStatementTask; |
|
|
|
|
|
|
|
import javax.annotation.Nullable; |
|
|
|
import java.util.UUID; |
|
|
|
@ -183,12 +185,39 @@ public abstract class AbstractBufferedRateExecutor<T extends AsyncTask, F extend |
|
|
|
|
|
|
|
private void logTask(String action, AsyncTaskContext<T, V> taskCtx) { |
|
|
|
if (log.isTraceEnabled()) { |
|
|
|
log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); |
|
|
|
if (taskCtx.getTask() instanceof CassandraStatementTask) { |
|
|
|
CassandraStatementTask cassStmtTask = (CassandraStatementTask) taskCtx.getTask(); |
|
|
|
if (cassStmtTask.getStatement() instanceof BoundStatement) { |
|
|
|
BoundStatement stmt = (BoundStatement) cassStmtTask.getStatement(); |
|
|
|
String query = toStringWithValues(stmt, ProtocolVersion.V5); |
|
|
|
log.trace("[{}] {} task: {}, BoundStatement query: {}", taskCtx.getId(), action, taskCtx, query); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.trace("[{}] {} task: {}", taskCtx.getId(), action, taskCtx); |
|
|
|
} |
|
|
|
} else { |
|
|
|
log.debug("[{}] {} task", taskCtx.getId(), action); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private static String toStringWithValues(BoundStatement boundStatement, ProtocolVersion protocolVersion) { |
|
|
|
CodecRegistry codecRegistry = boundStatement.preparedStatement().getCodecRegistry(); |
|
|
|
PreparedStatement preparedStatement = boundStatement.preparedStatement(); |
|
|
|
String query = preparedStatement.getQueryString(); |
|
|
|
ColumnDefinitions defs = preparedStatement.getVariables(); |
|
|
|
int index = 0; |
|
|
|
for (ColumnDefinitions.Definition def : defs) { |
|
|
|
DataType type = def.getType(); |
|
|
|
TypeCodec<Object> codec = codecRegistry.codecFor(type); |
|
|
|
if (boundStatement.getBytesUnsafe(index) != null) { |
|
|
|
Object value = codec.deserialize(boundStatement.getBytesUnsafe(index), protocolVersion); |
|
|
|
query = query.replaceFirst("\\?", codec.format(value)); |
|
|
|
} |
|
|
|
index++; |
|
|
|
} |
|
|
|
return query; |
|
|
|
} |
|
|
|
|
|
|
|
protected int getQueueSize() { |
|
|
|
return queue.size(); |
|
|
|
} |
|
|
|
|