9 changed files with 327 additions and 193 deletions
@ -0,0 +1,130 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* <p> |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* <p> |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* <p> |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import com.google.common.cache.CacheBuilder; |
|||
import com.google.common.cache.CacheLoader; |
|||
import com.google.common.cache.LoadingCache; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.AllArgsConstructor; |
|||
import lombok.Data; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNode; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.Customer; |
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
import org.thingsboard.server.dao.customer.CustomerService; |
|||
|
|||
import java.util.Optional; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
import static org.thingsboard.rule.engine.api.util.DonAsynchron.withCallback; |
|||
|
|||
@Slf4j |
|||
public abstract class TbAbstractCustomerActionNode<C extends TbAbstractCustomerActionNodeConfiguration> implements TbNode { |
|||
|
|||
protected C config; |
|||
|
|||
private LoadingCache<CustomerKey, Optional<CustomerId>> customerIdCache; |
|||
|
|||
@Override |
|||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|||
this.config = loadCustomerNodeActionConfig(configuration); |
|||
CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); |
|||
if (this.config.getCustomerCacheExpiration() > 0) { |
|||
cacheBuilder.expireAfterWrite(this.config.getCustomerCacheExpiration(), TimeUnit.SECONDS); |
|||
} |
|||
customerIdCache = cacheBuilder |
|||
.build(new CustomerCacheLoader(ctx, createCustomerIfNotExists())); |
|||
} |
|||
|
|||
protected abstract boolean createCustomerIfNotExists(); |
|||
|
|||
protected abstract C loadCustomerNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException; |
|||
|
|||
@Override |
|||
public void onMsg(TbContext ctx, TbMsg msg) { |
|||
withCallback(processCustomerAction(ctx, msg), |
|||
m -> ctx.tellNext(msg, "Success"), |
|||
t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); |
|||
} |
|||
|
|||
private ListenableFuture<Void> processCustomerAction(TbContext ctx, TbMsg msg) { |
|||
ListenableFuture<CustomerId> customerIdFeature = getCustomer(ctx, msg); |
|||
return Futures.transform(customerIdFeature, customerId -> { |
|||
doProcessCustomerAction(ctx, msg, customerId); |
|||
return null; |
|||
}, ctx.getDbCallbackExecutor() |
|||
); |
|||
} |
|||
|
|||
protected abstract void doProcessCustomerAction(TbContext ctx, TbMsg msg, CustomerId customerId); |
|||
|
|||
protected ListenableFuture<CustomerId> getCustomer(TbContext ctx, TbMsg msg) { |
|||
String customerTitle = TbNodeUtils.processPattern(this.config.getCustomerNamePattern(), msg.getMetaData()); |
|||
CustomerKey key = new CustomerKey(customerTitle); |
|||
return ctx.getDbCallbackExecutor().executeAsync(() -> { |
|||
Optional<CustomerId> customerId = customerIdCache.get(key); |
|||
if (!customerId.isPresent()) { |
|||
throw new RuntimeException("No customer found with name '" + key.getCustomerTitle() + "'."); |
|||
} |
|||
return customerId.get(); |
|||
}); |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
} |
|||
|
|||
@Data |
|||
@AllArgsConstructor |
|||
private static class CustomerKey { |
|||
private String customerTitle; |
|||
} |
|||
|
|||
private static class CustomerCacheLoader extends CacheLoader<CustomerKey, Optional<CustomerId>> { |
|||
|
|||
private final TbContext ctx; |
|||
private final boolean createIfNotExists; |
|||
|
|||
private CustomerCacheLoader(TbContext ctx, boolean createIfNotExists) { |
|||
this.ctx = ctx; |
|||
this.createIfNotExists = createIfNotExists; |
|||
} |
|||
|
|||
@Override |
|||
public Optional<CustomerId> load(CustomerKey key) { |
|||
CustomerService service = ctx.getCustomerService(); |
|||
Optional<Customer> customerOptional = |
|||
service.findCustomerByTenantIdAndTitle(ctx.getTenantId(), key.getCustomerTitle()); |
|||
if (customerOptional.isPresent()) { |
|||
return Optional.of(customerOptional.get().getId()); |
|||
} else if (createIfNotExists) { |
|||
Customer newCustomer = new Customer(); |
|||
newCustomer.setTitle(key.getCustomerTitle()); |
|||
newCustomer.setTenantId(ctx.getTenantId()); |
|||
return Optional.of(service.saveCustomer(newCustomer).getId()); |
|||
} |
|||
return Optional.empty(); |
|||
} |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,29 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.Data; |
|||
|
|||
/** |
|||
* Created by igor on 6/1/18. |
|||
*/ |
|||
@Data |
|||
public abstract class TbAbstractCustomerActionNodeConfiguration { |
|||
|
|||
private String customerNamePattern; |
|||
private long customerCacheExpiration; |
|||
|
|||
} |
|||
@ -1,72 +0,0 @@ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.rule.engine.api.*; |
|||
import org.thingsboard.rule.engine.api.util.DonAsynchron; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.id.EntityIdFactory; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.common.data.relation.EntityRelation; |
|||
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
|||
import org.thingsboard.server.common.data.relation.RelationTypeGroup; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
import static org.thingsboard.rule.engine.api.TbRelationTypes.FAILURE; |
|||
import static org.thingsboard.rule.engine.api.TbRelationTypes.SUCCESS; |
|||
import static org.thingsboard.rule.engine.api.util.DonAsynchron.withCallback; |
|||
|
|||
@Slf4j |
|||
@RuleNode( |
|||
type = ComponentType.ACTION, |
|||
name = "create relation", |
|||
configClazz = TbCreateRelationNodeConfiguration.class, |
|||
nodeDescription = "Create the relation from the selected entity to originator of the message by type and direction", |
|||
nodeDetails = "If relation is already exists - send Message via <b>Success</b> chain with error message, otherwise is also used <b>Success</b> chain without error message.", |
|||
uiResources = {"static/rulenode/rulenode-core-config.js"}, |
|||
configDirective = "tbFilterNodeCheckRelationConfig") |
|||
public class TbCreateRelationNode implements TbNode { |
|||
|
|||
private TbCreateRelationNodeConfiguration config; |
|||
private EntityId fromId; |
|||
private EntityId toId; |
|||
|
|||
@Override |
|||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
|||
this.config = TbNodeUtils.convert(configuration, TbCreateRelationNodeConfiguration.class); |
|||
} |
|||
|
|||
@Override |
|||
public void onMsg(TbContext ctx, TbMsg msg) { |
|||
DonAsynchron.withCallback(checkRelation(ctx, msg), result -> { |
|||
if (result) { |
|||
ctx.tellNext(msg, SUCCESS, new Throwable("Relation between message originator and Entity: " + config.getEntityId() + " is already exist")); |
|||
} else { |
|||
processCreateRelation(ctx, msg); |
|||
} |
|||
}, error -> ctx.tellFailure(msg, error), ctx.getDbCallbackExecutor()); |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
} |
|||
|
|||
private ListenableFuture<Boolean> checkRelation(TbContext ctx, TbMsg msg) { |
|||
if (EntitySearchDirection.TO.name().equals(config.getDirection())) { |
|||
fromId = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); |
|||
toId = msg.getOriginator(); |
|||
} else { |
|||
fromId = EntityIdFactory.getByTypeAndId(config.getEntityType(), config.getEntityId()); |
|||
toId = msg.getOriginator(); |
|||
} |
|||
return ctx.getRelationService().checkRelation(ctx.getTenantId(), fromId, toId, config.getRelationType(), RelationTypeGroup.COMMON); |
|||
} |
|||
|
|||
private void processCreateRelation(TbContext ctx, TbMsg msg) { |
|||
EntityRelation entityRelation = new EntityRelation(fromId, toId, config.getRelationType(), RelationTypeGroup.COMMON); |
|||
withCallback(ctx.getRelationService().saveRelationAsync(ctx.getTenantId(), entityRelation), |
|||
filterResult -> ctx.tellNext(msg, filterResult ? SUCCESS : FAILURE), t -> ctx.tellFailure(msg, t), ctx.getDbCallbackExecutor()); |
|||
} |
|||
|
|||
} |
|||
@ -1,23 +0,0 @@ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.rule.engine.api.NodeConfiguration; |
|||
import org.thingsboard.server.common.data.relation.EntitySearchDirection; |
|||
|
|||
@Data |
|||
public class TbCreateRelationNodeConfiguration implements NodeConfiguration<TbCreateRelationNodeConfiguration> { |
|||
|
|||
private String direction; |
|||
private String entityId; |
|||
private String entityType; |
|||
private String relationType; |
|||
|
|||
|
|||
@Override |
|||
public TbCreateRelationNodeConfiguration defaultConfiguration() { |
|||
TbCreateRelationNodeConfiguration configuration = new TbCreateRelationNodeConfiguration(); |
|||
configuration.setDirection(EntitySearchDirection.FROM.name()); |
|||
configuration.setRelationType("Contains"); |
|||
return configuration; |
|||
} |
|||
} |
|||
@ -0,0 +1,88 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import org.thingsboard.rule.engine.api.RuleNode; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
|||
import org.thingsboard.server.common.data.EntityType; |
|||
import org.thingsboard.server.common.data.id.*; |
|||
import org.thingsboard.server.common.data.plugin.ComponentType; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
@RuleNode( |
|||
type = ComponentType.ACTION, |
|||
name = "unassign from customer", |
|||
configClazz = TbUnassignFromCustomerNodeConfiguration.class, |
|||
nodeDescription = "Unassign Message Originator Entity from Customer", |
|||
nodeDetails = "Finds target Entity Customer by Customer name pattern and then unassign Originator Entity from this customer.", |
|||
uiResources = {"static/rulenode/rulenode-core-config.js"}, |
|||
configDirective = "tbActionNodeUnAssignToCustomerConfig", |
|||
icon = "remove_circle" |
|||
) |
|||
public class TbUnassignFromCustomerNode extends TbAbstractCustomerActionNode<TbUnassignFromCustomerNodeConfiguration> { |
|||
|
|||
@Override |
|||
protected boolean createCustomerIfNotExists() { |
|||
return false; |
|||
} |
|||
|
|||
@Override |
|||
protected TbUnassignFromCustomerNodeConfiguration loadCustomerNodeActionConfig(TbNodeConfiguration configuration) throws TbNodeException { |
|||
return TbNodeUtils.convert(configuration, TbUnassignFromCustomerNodeConfiguration.class); |
|||
} |
|||
|
|||
@Override |
|||
protected void doProcessCustomerAction(TbContext ctx, TbMsg msg, CustomerId customerId) { |
|||
EntityType originatorType = msg.getOriginator().getEntityType(); |
|||
switch (originatorType) { |
|||
case DEVICE: |
|||
processUnnasignDevice(ctx, msg); |
|||
break; |
|||
case ASSET: |
|||
processUnnasignAsset(ctx, msg); |
|||
break; |
|||
case ENTITY_VIEW: |
|||
processUnassignEntityView(ctx, msg); |
|||
break; |
|||
case DASHBOARD: |
|||
processUnnasignDashboard(ctx, msg, customerId); |
|||
break; |
|||
default: |
|||
ctx.tellFailure(msg, new RuntimeException("Unsupported originator type '" + originatorType + |
|||
"'! Only 'DEVICE', 'ASSET', 'ENTITY_VIEW' or 'DASHBOARD' types are allowed.")); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
private void processUnnasignAsset(TbContext ctx, TbMsg msg) { |
|||
ctx.getAssetService().unassignAssetFromCustomer(ctx.getTenantId(), new AssetId(msg.getOriginator().getId())); |
|||
} |
|||
|
|||
private void processUnnasignDevice(TbContext ctx, TbMsg msg) { |
|||
ctx.getDeviceService().unassignDeviceFromCustomer(ctx.getTenantId(), new DeviceId(msg.getOriginator().getId())); |
|||
} |
|||
|
|||
private void processUnnasignDashboard(TbContext ctx, TbMsg msg, CustomerId customerId) { |
|||
ctx.getDashboardService().unassignDashboardFromCustomer(ctx.getTenantId(), new DashboardId(msg.getOriginator().getId()), customerId); |
|||
} |
|||
|
|||
private void processUnassignEntityView(TbContext ctx, TbMsg msg) { |
|||
ctx.getEntityViewService().unassignEntityViewFromCustomer(ctx.getTenantId(), new EntityViewId(msg.getOriginator().getId())); |
|||
} |
|||
} |
|||
@ -0,0 +1,31 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.action; |
|||
|
|||
import lombok.Data; |
|||
import org.thingsboard.rule.engine.api.NodeConfiguration; |
|||
|
|||
@Data |
|||
public class TbUnassignFromCustomerNodeConfiguration extends TbAbstractCustomerActionNodeConfiguration implements NodeConfiguration<TbUnassignFromCustomerNodeConfiguration> { |
|||
|
|||
@Override |
|||
public TbUnassignFromCustomerNodeConfiguration defaultConfiguration() { |
|||
TbUnassignFromCustomerNodeConfiguration configuration = new TbUnassignFromCustomerNodeConfiguration(); |
|||
configuration.setCustomerNamePattern(""); |
|||
configuration.setCustomerCacheExpiration(300); |
|||
return configuration; |
|||
} |
|||
} |
|||
File diff suppressed because one or more lines are too long
Loading…
Reference in new issue