19 changed files with 399 additions and 31 deletions
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data; |
|||
|
|||
import org.thingsboard.server.common.data.id.CustomerId; |
|||
|
|||
public interface HasCustomerId { |
|||
|
|||
CustomerId getCustomerId(); |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.server.common.data; |
|||
|
|||
import org.thingsboard.server.common.data.id.TenantId; |
|||
|
|||
public interface HasTenantId { |
|||
|
|||
TenantId getTenantId(); |
|||
} |
|||
@ -0,0 +1,40 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine; |
|||
|
|||
import com.google.common.util.concurrent.FutureCallback; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
|
|||
import javax.annotation.Nullable; |
|||
import java.util.function.Consumer; |
|||
|
|||
public class DonAsynchron { |
|||
|
|||
public static <T> void withCallback(ListenableFuture<T> future, Consumer<T> onSuccess, Consumer<Throwable> onFailure) { |
|||
Futures.addCallback(future, new FutureCallback<T>() { |
|||
@Override |
|||
public void onSuccess(@Nullable T result) { |
|||
onSuccess.accept(result); |
|||
} |
|||
|
|||
@Override |
|||
public void onFailure(Throwable t) { |
|||
onFailure.accept(t); |
|||
} |
|||
}); |
|||
} |
|||
} |
|||
@ -0,0 +1,66 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.metadata; |
|||
|
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.rule.engine.TbNodeUtils; |
|||
import org.thingsboard.rule.engine.api.*; |
|||
import org.thingsboard.server.common.data.id.EntityId; |
|||
import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
import java.util.List; |
|||
|
|||
import static org.thingsboard.rule.engine.DonAsynchron.withCallback; |
|||
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; |
|||
|
|||
public abstract class TbEntityGetAttrNode<T extends EntityId> implements TbNode { |
|||
|
|||
private TbGetEntityAttrNodeConfiguration config; |
|||
|
|||
@Override |
|||
public void init(TbNodeConfiguration configuration, TbNodeState state) throws TbNodeException { |
|||
this.config = TbNodeUtils.convert(configuration, TbGetEntityAttrNodeConfiguration.class); |
|||
} |
|||
|
|||
@Override |
|||
public void onMsg(TbContext ctx, TbMsg msg) { |
|||
withCallback( |
|||
findEntityAsync(ctx, msg.getOriginator()), |
|||
entityId -> withCallback( |
|||
ctx.getAttributesService().find(entityId, SERVER_SCOPE, config.getAttrMapping().keySet()), |
|||
attributes -> putAttributesAndTell(ctx, msg, attributes), |
|||
t -> ctx.tellError(msg, t) |
|||
), |
|||
t -> ctx.tellError(msg, t)); |
|||
} |
|||
|
|||
private void putAttributesAndTell(TbContext ctx, TbMsg msg, List<AttributeKvEntry> attributes) { |
|||
attributes.forEach(r -> { |
|||
String attrName = config.getAttrMapping().get(r.getKey()); |
|||
msg.getMetaData().putValue(attrName, r.getValueAsString()); |
|||
}); |
|||
ctx.tellNext(msg); |
|||
} |
|||
|
|||
@Override |
|||
public void destroy() { |
|||
|
|||
} |
|||
|
|||
protected abstract ListenableFuture<T> findEntityAsync(TbContext ctx, EntityId originator); |
|||
|
|||
} |
|||
@ -0,0 +1,49 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.metadata; |
|||
|
|||
import com.google.common.util.concurrent.AsyncFunction; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.HasCustomerId; |
|||
import org.thingsboard.server.common.data.id.*; |
|||
|
|||
public class TbGetCustomerAttributeNode extends TbEntityGetAttrNode<CustomerId> { |
|||
|
|||
@Override |
|||
protected ListenableFuture<CustomerId> findEntityAsync(TbContext ctx, EntityId originator) { |
|||
|
|||
switch (originator.getEntityType()) { |
|||
case CUSTOMER: |
|||
return Futures.immediateFuture((CustomerId) originator); |
|||
case USER: |
|||
return getCustomerAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); |
|||
case ASSET: |
|||
return getCustomerAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); |
|||
case DEVICE: |
|||
return getCustomerAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); |
|||
default: |
|||
return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); |
|||
} |
|||
} |
|||
|
|||
private <T extends HasCustomerId> ListenableFuture<CustomerId> getCustomerAsync(ListenableFuture<T> future) { |
|||
return Futures.transform(future, (AsyncFunction<HasCustomerId, CustomerId>) in -> Futures.immediateFuture(in.getCustomerId())); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,27 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.metadata; |
|||
|
|||
import lombok.Data; |
|||
|
|||
import java.util.Map; |
|||
import java.util.Optional; |
|||
|
|||
@Data |
|||
public class TbGetEntityAttrNodeConfiguration { |
|||
|
|||
private Map<String, String> attrMapping; |
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.metadata; |
|||
|
|||
import com.google.common.util.concurrent.AsyncFunction; |
|||
import com.google.common.util.concurrent.Futures; |
|||
import com.google.common.util.concurrent.ListenableFuture; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.rule.engine.api.TbNodeException; |
|||
import org.thingsboard.server.common.data.HasTenantId; |
|||
import org.thingsboard.server.common.data.alarm.AlarmId; |
|||
import org.thingsboard.server.common.data.id.*; |
|||
|
|||
@Slf4j |
|||
public class TbGetTenantAttributeNode extends TbEntityGetAttrNode<TenantId> { |
|||
|
|||
@Override |
|||
protected ListenableFuture<TenantId> findEntityAsync(TbContext ctx, EntityId originator) { |
|||
|
|||
switch (originator.getEntityType()) { |
|||
case TENANT: |
|||
return Futures.immediateFuture((TenantId) originator); |
|||
case CUSTOMER: |
|||
return getTenantAsync(ctx.getCustomerService().findCustomerByIdAsync((CustomerId) originator)); |
|||
case USER: |
|||
return getTenantAsync(ctx.getUserService().findUserByIdAsync((UserId) originator)); |
|||
case RULE: |
|||
return getTenantAsync(ctx.getRuleService().findRuleByIdAsync((RuleId) originator)); |
|||
case PLUGIN: |
|||
return getTenantAsync(ctx.getPluginService().findPluginByIdAsync((PluginId) originator)); |
|||
case ASSET: |
|||
return getTenantAsync(ctx.getAssetService().findAssetByIdAsync((AssetId) originator)); |
|||
case DEVICE: |
|||
return getTenantAsync(ctx.getDeviceService().findDeviceByIdAsync((DeviceId) originator)); |
|||
case ALARM: |
|||
return getTenantAsync(ctx.getAlarmService().findAlarmByIdAsync((AlarmId) originator)); |
|||
case RULE_CHAIN: |
|||
return getTenantAsync(ctx.getRuleChainService().findRuleChainByIdAsync((RuleChainId) originator)); |
|||
default: |
|||
return Futures.immediateFailedFuture(new TbNodeException("Unexpected originator EntityType " + originator)); |
|||
} |
|||
} |
|||
|
|||
private <T extends HasTenantId> ListenableFuture<TenantId> getTenantAsync(ListenableFuture<T> future) { |
|||
return Futures.transform(future, (AsyncFunction<HasTenantId, TenantId>) in -> Futures.immediateFuture(in.getTenantId())); |
|||
} |
|||
|
|||
} |
|||
@ -0,0 +1,44 @@ |
|||
/** |
|||
* Copyright © 2016-2018 The Thingsboard Authors |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0
|
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package org.thingsboard.rule.engine.metadata; |
|||
|
|||
import org.junit.Test; |
|||
import org.junit.runner.RunWith; |
|||
import org.mockito.Mock; |
|||
import org.mockito.runners.MockitoJUnitRunner; |
|||
import org.thingsboard.rule.engine.api.TbContext; |
|||
import org.thingsboard.server.common.msg.TbMsg; |
|||
|
|||
import static org.junit.Assert.*; |
|||
|
|||
@RunWith(MockitoJUnitRunner.class) |
|||
public class TbGetCustomerAttributeNodeTest { |
|||
|
|||
//todo-vp: return to this after fixing compilation problem with protobuff
|
|||
// private TbGetCustomerAttributeNode node;
|
|||
//
|
|||
// @Mock
|
|||
// private TbContext ctx;
|
|||
//
|
|||
// private TbMsg msg;
|
|||
//
|
|||
// @Test
|
|||
// public void customerAttributeAddedInMetadata() {
|
|||
// msg = new TbMsg();
|
|||
// node.onMsg(ctx, );
|
|||
// }
|
|||
|
|||
} |
|||
Loading…
Reference in new issue