Browse Source

Merge branch 'master' of github.com:Squidex/squidex

pull/528/head
Sebastian 6 years ago
parent
commit
b191d8405c
  1. 4
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs
  2. 9
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs
  3. 110
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs
  4. 11
      backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs
  5. 2
      backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj
  6. 11
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs
  7. 72
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Operations/QueryContentsByQuery.cs
  8. 25
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/Queries/LimitExtensions.cs
  9. 7
      backend/src/Squidex.Infrastructure/Queries/CompareFilter.cs
  10. 4
      backend/src/Squidex.Infrastructure/Queries/FilterNode.cs
  11. 8
      backend/src/Squidex.Infrastructure/Queries/LogicalFilter.cs
  12. 7
      backend/src/Squidex.Infrastructure/Queries/NegateFilter.cs
  13. 17
      backend/src/Squidex.Infrastructure/Queries/Query.cs
  14. 10
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/MongoDb/ContentsQueryFixture.cs
  15. 50
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/MongoDb/ContentsQueryTests.cs
  16. 1
      backend/tests/Squidex.Domain.Apps.Entities.Tests/Squidex.Domain.Apps.Entities.Tests.csproj
  17. 64
      backend/tests/Squidex.Infrastructure.Tests/Queries/QueryTests.cs

4
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaAction.cs

@ -40,5 +40,9 @@ namespace Squidex.Extensions.Actions.Kafka
[DataType(DataType.MultilineText)]
[Formattable]
public string Headers { get; set; }
[Display(Name = "Schema (Optional)", Description = "Define a specific AVRO schema in JSON format.")]
[DataType(DataType.MultilineText)]
public string Schema { get; set; }
}
}

9
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaActionHandler.cs

@ -54,7 +54,8 @@ namespace Squidex.Extensions.Actions.Kafka
TopicName = action.TopicName,
MessageKey = key,
MessageValue = value,
Headers = ParseHeaders(action.Headers, @event)
Headers = ParseHeaders(action.Headers, @event),
Schema = action.Schema
};
return (Description, ruleJob);
@ -105,13 +106,13 @@ namespace Squidex.Extensions.Actions.Kafka
}
}
await kafkaProducer.Send(job.TopicName, message);
await kafkaProducer.Send(job.TopicName, message, job.Schema);
return Result.Success($"Event pushed to {job.TopicName} kafka topic.");
}
catch (Exception ex)
{
return Result.Failed(ex, "Push to Kafka failed.");
return Result.Failed(ex, $"Push to Kafka failed: {ex}");
}
}
}
@ -125,5 +126,7 @@ namespace Squidex.Extensions.Actions.Kafka
public string MessageValue { get; set; }
public Dictionary<string, string> Headers { get; set; }
public string Schema { get; set; }
}
}

110
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducer.cs

@ -5,20 +5,34 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avro;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using GraphQL.Types;
using Microsoft.Extensions.Options;
using Squidex.Infrastructure.Json;
using Squidex.Infrastructure.Json.Objects;
using Squidex.Infrastructure.Log;
namespace Squidex.Extensions.Actions.Kafka
{
public sealed class KafkaProducer
{
private readonly IProducer<string, string> producer;
private readonly IProducer<string, string> textProducer;
private readonly IProducer<string, GenericRecord> avroProducer;
private readonly ISchemaRegistryClient schemaRegistry;
private readonly IJsonSerializer jsonSerializer;
public KafkaProducer(IOptions<KafkaProducerOptions> options, ISemanticLog log)
public KafkaProducer(IOptions<KafkaProducerOptions> options, ISemanticLog log, IJsonSerializer jsonSerializer)
{
producer = new ProducerBuilder<string, string>(options.Value)
this.jsonSerializer = jsonSerializer;
textProducer = new ProducerBuilder<string, string>(options.Value)
.SetErrorHandler((p, error) =>
{
LogError(log, error);
@ -30,6 +44,24 @@ namespace Squidex.Extensions.Actions.Kafka
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(Serializers.Utf8)
.Build();
if (options.Value.IsSchemaRegistryConfigured())
{
schemaRegistry = new CachedSchemaRegistryClient(options.Value.SchemaRegistry);
avroProducer = new ProducerBuilder<string, GenericRecord>(options.Value)
.SetErrorHandler((p, error) =>
{
LogError(log, error);
})
.SetLogHandler((p, message) =>
{
LogMessage(log, message);
})
.SetKeySerializer(Serializers.Utf8)
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, options.Value.AvroSerializer))
.Build();
}
}
private static void LogMessage(ISemanticLog log, LogMessage message)
@ -77,14 +109,80 @@ namespace Squidex.Extensions.Actions.Kafka
.WriteProperty("reason", error.Reason));
}
public async Task<DeliveryResult<string, string>> Send(string topicName, Message<string, string> message)
public async Task<DeliveryResult<string, string>> Send(string topicName, Message<string, string> message, string schema)
{
return await producer.ProduceAsync(topicName, message);
if (!string.IsNullOrWhiteSpace(schema))
{
var value = CreateAvroRecord(message.Value, schema);
var avroMessage = new Message<string, GenericRecord> { Key = message.Key, Headers = message.Headers, Value = value };
await avroProducer.ProduceAsync(topicName, avroMessage);
}
return await textProducer.ProduceAsync(topicName, message);
}
private GenericRecord CreateAvroRecord(string json, string avroSchema)
{
var schema = (RecordSchema)Avro.Schema.Parse(avroSchema);
var jsonObject = jsonSerializer.Deserialize<JsonObject>(json);
var result = (GenericRecord)GetValue(jsonObject, schema);
return result;
}
public void Dispose()
{
producer?.Dispose();
textProducer?.Dispose();
avroProducer?.Dispose();
}
private object GetValue(IJsonValue value, Avro.Schema schema)
{
switch (value)
{
case JsonString s:
return s.Value;
case JsonNumber n:
return n.Value;
case JsonBoolean b:
return b.Value;
case JsonObject o:
{
var recordSchema = (RecordSchema)schema;
var result = new GenericRecord(recordSchema);
foreach (var (key, childValue) in o)
{
if (recordSchema.TryGetField(key, out var field))
{
result.Add(key, GetValue(childValue, field.Schema));
}
}
return result;
}
case JsonArray a:
{
var arraySchema = (ArraySchema)schema;
var result = new List<object>();
foreach (var item in a)
{
result.Add(GetValue(item, arraySchema.ItemSchema));
}
return result.ToArray();
}
}
return null;
}
}
}

11
backend/extensions/Squidex.Extensions/Actions/Kafka/KafkaProducerOptions.cs

@ -6,14 +6,25 @@
// ==========================================================================
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
namespace Squidex.Extensions.Actions.Kafka
{
public class KafkaProducerOptions : ProducerConfig
{
public SchemaRegistryConfig SchemaRegistry { get; set; }
public AvroSerializerConfig AvroSerializer { get; set; }
public bool IsProducerConfigured()
{
return !string.IsNullOrWhiteSpace(BootstrapServers);
}
public bool IsSchemaRegistryConfigured()
{
return !string.IsNullOrWhiteSpace(SchemaRegistry?.Url);
}
}
}

2
backend/extensions/Squidex.Extensions/Squidex.Extensions.csproj

@ -9,7 +9,9 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Algolia.Search" Version="6.5.1" />
<PackageReference Include="Confluent.Apache.Avro" Version="1.7.7.7" />
<PackageReference Include="Confluent.Kafka" Version="1.3.0" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes" Version="1.3.0" />
<PackageReference Include="CoreTweet" Version="1.0.0.483" />
<PackageReference Include="Datadog.Trace" Version="1.14.2" />
<PackageReference Include="Elasticsearch.Net" Version="7.6.1" />

11
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository.cs

@ -85,16 +85,9 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
return ResultList.Create<IAssetEntity>(assetCount.Result, assetItems.Result);
}
catch (MongoQueryException ex)
catch (MongoQueryException ex) when (ex.Message.Contains("17406"))
{
if (ex.Message.Contains("17406"))
{
throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items.");
}
else
{
throw;
}
throw new DomainException("Result set is too large to be retrieved. Use $take parameter to reduce the number of items.");
}
}
}

72
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/Operations/QueryContentsByQuery.cs

@ -7,8 +7,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using Squidex.Domain.Apps.Entities.Apps;
using Squidex.Domain.Apps.Entities.Contents;
@ -22,9 +25,21 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Operations
{
internal sealed class QueryContentsByQuery : OperationBase
{
private static readonly PropertyPath DefaultOrderField = "mt";
private readonly DataConverter converter;
private readonly ITextIndex indexer;
[BsonIgnoreExtraElements]
internal sealed class IdOnly
{
[BsonId]
[BsonElement("_id")]
[BsonRepresentation(BsonType.String)]
public Guid Id { get; set; }
public MongoContentEntity[] Joined { get; set; }
}
public QueryContentsByQuery(DataConverter converter, ITextIndex indexer)
{
this.converter = converter;
@ -71,12 +86,7 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Operations
var filter = CreateFilter(schema.Id, fullTextIds, query);
var contentCount = Collection.Find(filter).CountDocumentsAsync();
var contentItems =
Collection.Find(filter)
.QueryLimit(query)
.QuerySkip(query)
.QuerySort(query)
.ToListAsync();
var contentItems = FindContentsAsync(query, filter);
await Task.WhenAll(contentItems, contentCount);
@ -87,17 +97,53 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents.Operations
return ResultList.Create<IContentEntity>(contentCount.Result, contentItems.Result);
}
catch (MongoQueryException ex)
catch (MongoCommandException ex) when (ex.Code == 96)
{
if (ex.Message.Contains("17406"))
{
throw new DomainException("Result set is too large to be retrieved. Use $top parameter to reduce the number of items.");
}
else
throw new DomainException("Result set is too large to be retrieved. Use $take parameter to reduce the number of items.");
}
catch (MongoQueryException ex) when (ex.Message.Contains("17406"))
{
throw new DomainException("Result set is too large to be retrieved. Use $take parameter to reduce the number of items.");
}
}
private async Task<List<MongoContentEntity>> FindContentsAsync(ClrQuery query, FilterDefinition<MongoContentEntity> filter)
{
if (query.Skip > 0 && !IsSatisfiedByIndex(query))
{
var projection = Projection.Include("_id");
foreach (var field in query.GetAllFields())
{
throw;
projection = Projection.Include(field);
}
var joined =
await Collection.Aggregate()
.Match(filter)
.Project<IdOnly>(projection)
.QuerySort(query)
.QuerySkip(query)
.QueryLimit(query)
.Lookup<IdOnly, MongoContentEntity, IdOnly>(Collection, x => x.Id, x => x.Id, x => x.Joined)
.ToListAsync();
return joined.Select(x => x.Joined[0]).ToList();
}
var result =
Collection.Find(filter)
.QuerySort(query)
.QueryLimit(query)
.QuerySkip(query)
.ToListAsync();
return await result;
}
private static bool IsSatisfiedByIndex(ClrQuery query)
{
return query.Sort?.Any(x => x.Path == DefaultOrderField && x.Order == SortOrder.Descending) == true;
}
private static FilterDefinition<MongoContentEntity> CreateFilter(Guid schemaId, ICollection<Guid>? ids, ClrQuery? query)

25
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/Queries/LimitExtensions.cs

@ -12,6 +12,16 @@ namespace Squidex.Infrastructure.MongoDb.Queries
{
public static class LimitExtensions
{
public static IAggregateFluent<T> QueryLimit<T>(this IAggregateFluent<T> cursor, ClrQuery query)
{
if (query.Take < long.MaxValue)
{
cursor = cursor.Limit((int)query.Take);
}
return cursor;
}
public static IFindFluent<T, T> QueryLimit<T>(this IFindFluent<T, T> cursor, ClrQuery query)
{
if (query.Take < long.MaxValue)
@ -22,6 +32,16 @@ namespace Squidex.Infrastructure.MongoDb.Queries
return cursor;
}
public static IAggregateFluent<T> QuerySkip<T>(this IAggregateFluent<T> cursor, ClrQuery query)
{
if (query.Skip > 0)
{
cursor = cursor.Skip((int)query.Skip);
}
return cursor;
}
public static IFindFluent<T, T> QuerySkip<T>(this IFindFluent<T, T> cursor, ClrQuery query)
{
if (query.Skip > 0)
@ -36,5 +56,10 @@ namespace Squidex.Infrastructure.MongoDb.Queries
{
return cursor.Sort(query.BuildSort<T>());
}
public static IAggregateFluent<T> QuerySort<T>(this IAggregateFluent<T> cursor, ClrQuery query)
{
return cursor.Sort(query.BuildSort<T>());
}
}
}

7
backend/src/Squidex.Infrastructure/Queries/CompareFilter.cs

@ -5,6 +5,8 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
namespace Squidex.Infrastructure.Queries
{
public sealed class CompareFilter<TValue> : FilterNode<TValue>
@ -28,6 +30,11 @@ namespace Squidex.Infrastructure.Queries
Value = value;
}
public override void AddFields(HashSet<string> fields)
{
fields.Add(Path.ToString());
}
public override T Accept<T>(FilterNodeVisitor<T, TValue> visitor)
{
return visitor.Visit(this);

4
backend/src/Squidex.Infrastructure/Queries/FilterNode.cs

@ -5,12 +5,16 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
namespace Squidex.Infrastructure.Queries
{
public abstract class FilterNode<TValue>
{
public abstract T Accept<T>(FilterNodeVisitor<T, TValue> visitor);
public abstract void AddFields(HashSet<string> fields);
public abstract override string ToString();
}
}

8
backend/src/Squidex.Infrastructure/Queries/LogicalFilter.cs

@ -25,6 +25,14 @@ namespace Squidex.Infrastructure.Queries
Type = type;
}
public override void AddFields(HashSet<string> fields)
{
foreach (var filter in Filters)
{
filter.AddFields(fields);
}
}
public override T Accept<T>(FilterNodeVisitor<T, TValue> visitor)
{
return visitor.Visit(this);

7
backend/src/Squidex.Infrastructure/Queries/NegateFilter.cs

@ -5,6 +5,8 @@
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
namespace Squidex.Infrastructure.Queries
{
public sealed class NegateFilter<TValue> : FilterNode<TValue>
@ -18,6 +20,11 @@ namespace Squidex.Infrastructure.Queries
Filter = filter;
}
public override void AddFields(HashSet<string> fields)
{
Filter.AddFields(fields);
}
public override T Accept<T>(FilterNodeVisitor<T, TValue> visitor)
{
return visitor.Visit(this);

17
backend/src/Squidex.Infrastructure/Queries/Query.cs

@ -26,6 +26,23 @@ namespace Squidex.Infrastructure.Queries
public List<SortNode> Sort { get; set; } = new List<SortNode>();
public HashSet<string> GetAllFields()
{
var result = new HashSet<string>();
if (Sort != null)
{
foreach (var sorting in Sort)
{
result.Add(sorting.Path.ToString());
}
}
Filter?.AddFields(result);
return result;
}
public override string ToString()
{
var parts = new List<string>();

10
backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/MongoDb/ContentsQueryFixture.cs

@ -9,6 +9,7 @@ using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using LoremNET;
using MongoDB.Bson;
using MongoDB.Driver;
using Newtonsoft.Json;
@ -102,13 +103,14 @@ namespace Squidex.Domain.Apps.Entities.Contents.MongoDb
{
for (var i = 0; i < numValues; i++)
{
var value = i.ToString();
var data =
new IdContentData()
.AddField(1,
new ContentFieldData()
.AddJsonValue(JsonValue.Create(value)));
.AddJsonValue(JsonValue.Create(i)))
.AddField(2,
new ContentFieldData()
.AddJsonValue(JsonValue.Create(Lorem.Paragraph(200, 20))));
var content = new MongoContentEntity
{
@ -198,7 +200,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.MongoDb
{
var schemaDef =
new Schema("my-schema")
.AddField(Fields.String(1, "value", Partitioning.Invariant));
.AddField(Fields.Number(1, "value", Partitioning.Invariant));
var schema =
Mocks.Schema(

50
backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/MongoDb/ContentsQueryTests.cs

@ -62,11 +62,11 @@ namespace Squidex.Domain.Apps.Entities.Contents.MongoDb
[Fact]
public async Task Should_query_contents_by_filter()
{
var filter = F.Eq("data.value.iv", _.RandomValue());
var filter = F.Eq("data.value.iv", 12);
var contents = await _.ContentRepository.QueryIdsAsync(_.RandomAppId(), _.RandomSchemaId(), filter);
Assert.NotNull(contents);
Assert.NotEmpty(contents);
}
[Fact]
@ -84,7 +84,23 @@ namespace Squidex.Domain.Apps.Entities.Contents.MongoDb
var contents = await QueryAsync(query);
Assert.NotNull(contents);
Assert.NotEmpty(contents);
}
[Fact]
public async Task Should_query_contents_with_large_skip()
{
var query = new ClrQuery
{
Sort = new List<SortNode>
{
new SortNode("data.value.iv", SortOrder.Ascending)
}
};
var contents = await QueryAsync(query, 1000, 9000);
Assert.NotEmpty(contents);
}
[Fact]
@ -105,19 +121,33 @@ namespace Squidex.Domain.Apps.Entities.Contents.MongoDb
{
var query = new ClrQuery
{
Filter = F.Eq("data.value.iv", _.RandomValue())
Filter = F.Eq("data.value.iv", 200)
};
var contents = await QueryAsync(query);
var contents = await QueryAsync(query, 1000, 0);
Assert.NotNull(contents);
Assert.NotEmpty(contents);
}
private async Task<IResultList<IContentEntity>> QueryAsync(ClrQuery clrQuery)
private async Task<IResultList<IContentEntity>> QueryAsync(ClrQuery clrQuery, int take = 1000, int skip = 100)
{
clrQuery.Top = 1000;
clrQuery.Skip = 100;
clrQuery.Sort = new List<SortNode> { new SortNode("LastModified", SortOrder.Descending) };
if (clrQuery.Take == long.MaxValue)
{
clrQuery.Take = take;
}
if (clrQuery.Skip == 0)
{
clrQuery.Skip = skip;
}
if (clrQuery.Sort.Count == 0)
{
clrQuery.Sort = new List<SortNode>
{
new SortNode("LastModified", SortOrder.Descending)
};
}
var contents = await _.ContentRepository.QueryAsync(_.RandomApp(), _.RandomSchema(), clrQuery, SearchScope.All);

1
backend/tests/Squidex.Domain.Apps.Entities.Tests/Squidex.Domain.Apps.Entities.Tests.csproj

@ -20,6 +20,7 @@
<PackageReference Include="FakeItEasy" Version="6.0.0" />
<PackageReference Include="FluentAssertions" Version="5.10.2" />
<PackageReference Include="GraphQL" Version="2.4.0" />
<PackageReference Include="Lorem.Universal.Net" Version="3.0.64" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="3.1.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="RefactoringEssentials" Version="5.6.0" PrivateAssets="all" />

64
backend/tests/Squidex.Infrastructure.Tests/Queries/QueryTests.cs

@ -0,0 +1,64 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Collections.Generic;
using Xunit;
namespace Squidex.Infrastructure.Queries
{
public class QueryTests
{
[Fact]
public void Should_add_fields_from_sorting()
{
var query = new ClrQuery
{
Sort = new List<SortNode>
{
new SortNode("field1", SortOrder.Ascending),
new SortNode("field1", SortOrder.Ascending),
new SortNode("field2", SortOrder.Ascending)
}
};
var fields = query.GetAllFields();
var expected = new HashSet<string>
{
"field1",
"field2"
};
Assert.Equal(expected, fields);
}
[Fact]
public void Should_add_fields_from_filters()
{
var query = new ClrQuery
{
Filter =
ClrFilter.And(
ClrFilter.Not(
ClrFilter.Eq("field1", 1)),
ClrFilter.Or(
ClrFilter.Eq("field2", 2),
ClrFilter.Eq("field2", 4)))
};
var fields = query.GetAllFields();
var expected = new HashSet<string>
{
"field1",
"field2"
};
Assert.Equal(expected, fields);
}
}
}
Loading…
Cancel
Save