Skip to content

Commit 4d0985f

Browse files
committed
CSHARP-994: fix thread starvation in Mapper
1 parent aa4f36e commit 4d0985f

File tree

7 files changed

+215
-23
lines changed

7 files changed

+215
-23
lines changed

src/Cassandra/Cassandra.csproj

+8-5
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
<VersionPrefix>3.22.0</VersionPrefix>
99
<IncludeSourceRevisionInInformationalVersion>false</IncludeSourceRevisionInInformationalVersion>
1010
<Authors>DataStax</Authors>
11-
<TargetFrameworks Condition="'$(BuildCoreOnly)' != 'True'">net452;netstandard2.0</TargetFrameworks>
12-
<TargetFramework Condition="'$(BuildCoreOnly)' == 'True'">netstandard2.0</TargetFramework>
11+
<TargetFrameworks Condition="'$(BuildCoreOnly)' != 'True'">net452;netstandard2.0;net8.0</TargetFrameworks>
12+
<TargetFrameworks Condition="'$(BuildCoreOnly)' == 'True'">netstandard2.0;net8.0</TargetFrameworks>
1313
<NoWarn>$(NoWarn);1591</NoWarn>
1414
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
1515
<WarningsNotAsErrors>NU1901;NU1902;NU1903;NU1904</WarningsNotAsErrors>
@@ -24,16 +24,17 @@
2424
<PackageLicenseFile>LICENSE.md</PackageLicenseFile>
2525
<RepositoryUrl>https://github.com/datastax/csharp-driver</RepositoryUrl>
2626
<PackageProjectUrl>https://github.com/datastax/csharp-driver</PackageProjectUrl>
27-
<LangVersion>7.1</LangVersion>
28-
<NuGetAudit>false</NuGetAudit>
27+
<LangVersion Condition="$(TargetFramework) != 'net8.0'">7.1</LangVersion>
28+
<LangVersion Condition="$(TargetFramework) == 'net8.0'">12.0</LangVersion>
29+
<NuGetAudit>false</NuGetAudit>
2930
</PropertyGroup>
3031
<PropertyGroup Condition="$([System.Text.RegularExpressions.Regex]::IsMatch('$(TargetFramework)', '^net4\d'))">
3132
<DefineConstants>$(DefineConstants);NETFRAMEWORK</DefineConstants>
3233
</PropertyGroup>
3334
<PropertyGroup Condition="$([System.Text.RegularExpressions.Regex]::IsMatch('$(TargetFramework)', '^net\d$'))">
3435
<DefineConstants>$(DefineConstants);NETCOREAPP</DefineConstants>
3536
</PropertyGroup>
36-
37+
3738
<ItemGroup>
3839
<None Include="..\..\LICENSE.md" Pack="true" PackagePath="LICENSE.md" />
3940
</ItemGroup>
@@ -44,6 +45,8 @@
4445
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="1.0.0" />
4546
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" NoWarn="NU1903" />
4647
<PackageReference Include="System.Management" Version="4.7.0" />
48+
</ItemGroup>
49+
<ItemGroup Condition="'$(TargetFramework)' != 'net8.0'">
4750
<PackageReference Include="System.Runtime.InteropServices.RuntimeInformation" Version="4.0.0" />
4851
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.6.0" />
4952
</ItemGroup>

src/Cassandra/Data/Linq/CqlConditionalCommand.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ protected internal override string GetCql(out object[] values)
7676
Cql.New(cql, values).WithExecutionProfile(executionProfile)).ConfigureAwait(false);
7777
this.CopyQueryPropertiesTo(stmt);
7878
var rs = await session.ExecuteAsync(stmt, executionProfile).ConfigureAwait(false);
79-
return AppliedInfo<TEntity>.FromRowSet(_mapperFactory, cql, rs);
79+
return AppliedInfo<TEntity>.FromRowSetAsync(_mapperFactory, cql, rs);
8080
}
8181

8282
/// <summary>
+110
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright (C) DataStax Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#if NET8_0_OR_GREATER && !NET10_OR_GREATER // These methods are implemented in .NET 10.
16+
using System.Collections.Generic;
17+
using System.Threading;
18+
using System.Threading.Tasks;
19+
20+
// ReSharper disable once CheckNamespace
21+
namespace System.Linq;
22+
23+
internal static class AsyncEnumerable
24+
{
25+
public static async ValueTask<TSource> FirstAsync<TSource>(
26+
this IAsyncEnumerable<TSource> source,
27+
CancellationToken cancellationToken = default)
28+
{
29+
await using var e = source.GetAsyncEnumerator(cancellationToken);
30+
31+
if (!await e.MoveNextAsync())
32+
{
33+
throw new InvalidOperationException("Sequence contains no elements");
34+
}
35+
36+
return e.Current;
37+
}
38+
39+
public static async ValueTask<TSource> FirstOrDefaultAsync<TSource>(
40+
this IAsyncEnumerable<TSource> source,
41+
CancellationToken cancellationToken = default)
42+
{
43+
await using var e = source.GetAsyncEnumerator(cancellationToken);
44+
return await e.MoveNextAsync() ? e.Current : default;
45+
}
46+
47+
public static async ValueTask<TSource> SingleAsync<TSource>(
48+
this IAsyncEnumerable<TSource> source,
49+
CancellationToken cancellationToken = default)
50+
{
51+
await using var e = source.GetAsyncEnumerator(cancellationToken);
52+
53+
if (!await e.MoveNextAsync())
54+
{
55+
throw new InvalidOperationException("Sequence contains no elements");
56+
}
57+
58+
TSource result = e.Current;
59+
if (await e.MoveNextAsync())
60+
{
61+
throw new InvalidOperationException("Sequence contains more than one element");
62+
}
63+
64+
return result;
65+
}
66+
67+
public static async ValueTask<TSource> SingleOrDefaultAsync<TSource>(
68+
this IAsyncEnumerable<TSource> source,
69+
CancellationToken cancellationToken = default)
70+
{
71+
await using var e = source.GetAsyncEnumerator(cancellationToken);
72+
73+
if (!await e.MoveNextAsync())
74+
{
75+
return default;
76+
}
77+
78+
TSource result = e.Current;
79+
if (await e.MoveNextAsync())
80+
{
81+
throw new InvalidOperationException("Sequence contains more than one element");
82+
}
83+
84+
return result;
85+
}
86+
87+
public static async IAsyncEnumerable<TResult> Select<TSource, TResult>(
88+
this IAsyncEnumerable<TSource> source,
89+
Func<TSource, TResult> selector)
90+
{
91+
await foreach (TSource element in source)
92+
{
93+
yield return selector(element);
94+
}
95+
}
96+
97+
public static async ValueTask<List<TSource>> ToListAsync<TSource>(
98+
this IAsyncEnumerable<TSource> source,
99+
CancellationToken cancellationToken = default)
100+
{
101+
List<TSource> list = [];
102+
await foreach (TSource element in source.WithCancellation(cancellationToken))
103+
{
104+
list.Add(element);
105+
}
106+
107+
return list;
108+
}
109+
}
110+
#endif

src/Cassandra/Mapping/AppliedInfo.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//
1616

1717
using System.Linq;
18+
using System.Threading.Tasks;
1819

1920
namespace Cassandra.Mapping
2021
{
@@ -54,9 +55,13 @@ public AppliedInfo(T existing)
5455
/// <summary>
5556
/// Adapts a LWT RowSet and returns a new AppliedInfo
5657
/// </summary>
57-
internal static AppliedInfo<T> FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs)
58+
internal static async Task<AppliedInfo<T>> FromRowSetAsync(MapperFactory mapperFactory, string cql, RowSet rs)
5859
{
60+
#if NET8_0_OR_GREATER
61+
var row = await rs.FirstOrDefaultAsync();
62+
#else
5963
var row = rs.FirstOrDefault();
64+
#endif
6065
const string appliedColumn = "[applied]";
6166
if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue<bool>(appliedColumn))
6267
{

src/Cassandra/Mapping/Mapper.cs

+64-14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
using System;
1818
using System.Collections.Generic;
1919
using System.Linq;
20+
using System.Threading;
2021
using System.Threading.Tasks;
2122

2223
using Cassandra.Mapping.Statements;
@@ -71,11 +72,11 @@ internal Mapper(ISession session, MapperFactory mapperFactory, StatementFactory
7172
/// <summary>
7273
/// Executes asynchronously and uses the delegate to adapt the RowSet into the return value.
7374
/// </summary>
74-
private async Task<TResult> ExecuteAsyncAndAdapt<TResult>(Cql cql, Func<Statement, RowSet, TResult> adaptation)
75+
private async Task<TResult> ExecuteAsyncAndAdapt<TResult>(Cql cql, Func<Statement, RowSet, Task<TResult>> adaptation)
7576
{
7677
var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false);
7778
var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false);
78-
return adaptation(stmt, rs);
79+
return await adaptation(stmt, rs);
7980
}
8081

8182
/// <inheritdoc />
@@ -98,9 +99,37 @@ public Task<IEnumerable<T>> FetchAsync<T>(Cql cql)
9899
return ExecuteAsyncAndAdapt(cql, (s, rs) =>
99100
{
100101
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
101-
return rs.Select(mapper);
102+
return Task.FromResult(Enumerable.Select(rs, mapper));
102103
});
103104
}
105+
#if NET8_0_OR_GREATER
106+
107+
/// <inheritdoc />
108+
public IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(CqlQueryOptions options = null)
109+
{
110+
return FetchAsAsyncEnumerable<T>(Cql.New(string.Empty, [], options ?? CqlQueryOptions.None));
111+
}
112+
113+
/// <inheritdoc />
114+
public IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(string cql, params object[] args)
115+
{
116+
return FetchAsAsyncEnumerable<T>(Cql.New(cql, args, CqlQueryOptions.None));
117+
}
118+
119+
/// <inheritdoc />
120+
public async IAsyncEnumerable<T> FetchAsAsyncEnumerable<T>(Cql cql)
121+
{
122+
//Use ExecuteAsyncAndAdapt with a delegate to handle the adaptation from RowSet to IEnumerable<T>
123+
_cqlGenerator.AddSelect<T>(cql);
124+
var stmt = await _statementFactory.GetStatementAsync(_session, cql).ConfigureAwait(false);
125+
var rs = await ExecuteStatementAsync(stmt, cql.ExecutionProfile).ConfigureAwait(false);
126+
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
127+
await foreach (var row in rs)
128+
{
129+
yield return mapper(row);
130+
}
131+
}
132+
#endif
104133

105134
/// <inheritdoc />
106135
public Task<IPage<T>> FetchPageAsync<T>(Cql cql)
@@ -111,10 +140,14 @@ public Task<IPage<T>> FetchPageAsync<T>(Cql cql)
111140
}
112141
cql.AutoPage = false;
113142
_cqlGenerator.AddSelect<T>(cql);
114-
return ExecuteAsyncAndAdapt<IPage<T>>(cql, (stmt, rs) =>
143+
return ExecuteAsyncAndAdapt<IPage<T>>(cql, async (stmt, rs) =>
115144
{
116145
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
117-
return new Page<T>(rs.Select(mapper), stmt.PagingState, rs.PagingState);
146+
#if NET8_0_OR_GREATER
147+
return new Page<T>(await AsyncEnumerable.Select(rs, mapper).ToListAsync(), stmt.PagingState, rs.PagingState);
148+
#else
149+
return new Page<T>(Enumerable.Select(rs, mapper).ToList(), stmt.PagingState, rs.PagingState);
150+
#endif
118151
});
119152
}
120153

@@ -140,10 +173,15 @@ public Task<T> SingleAsync<T>(string cql, params object[] args)
140173
public Task<T> SingleAsync<T>(Cql cql)
141174
{
142175
_cqlGenerator.AddSelect<T>(cql);
143-
return ExecuteAsyncAndAdapt(cql, (s, rs) =>
176+
return ExecuteAsyncAndAdapt(cql, async (s, rs) =>
144177
{
178+
#if NET8_0_OR_GREATER
179+
var row = await rs.SingleAsync();
180+
#else
181+
var row = rs.Single();
182+
#endif
145183
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
146-
return mapper(rs.Single());
184+
return mapper(row);
147185
});
148186
}
149187

@@ -157,9 +195,13 @@ public Task<T> SingleOrDefaultAsync<T>(string cql, params object[] args)
157195
public Task<T> SingleOrDefaultAsync<T>(Cql cql)
158196
{
159197
_cqlGenerator.AddSelect<T>(cql);
160-
return ExecuteAsyncAndAdapt(cql, (s, rs) =>
198+
return ExecuteAsyncAndAdapt(cql, async (s, rs) =>
161199
{
200+
#if NET8_0_OR_GREATER
201+
var row = await rs.SingleOrDefaultAsync();
202+
#else
162203
var row = rs.SingleOrDefault();
204+
#endif
163205
// Map to return type
164206
if (row == null)
165207
{
@@ -180,9 +222,13 @@ public Task<T> FirstAsync<T>(string cql, params object[] args)
180222
public Task<T> FirstAsync<T>(Cql cql)
181223
{
182224
_cqlGenerator.AddSelect<T>(cql);
183-
return ExecuteAsyncAndAdapt(cql, (s, rs) =>
225+
return ExecuteAsyncAndAdapt(cql, async (s, rs) =>
184226
{
227+
#if NET8_0_OR_GREATER
228+
var row = await rs.FirstAsync();
229+
#else
185230
var row = rs.First();
231+
#endif
186232
// Map to return type
187233
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
188234
return mapper(row);
@@ -199,9 +245,13 @@ public Task<T> FirstOrDefaultAsync<T>(string cql, params object[] args)
199245
public Task<T> FirstOrDefaultAsync<T>(Cql cql)
200246
{
201247
_cqlGenerator.AddSelect<T>(cql);
202-
return ExecuteAsyncAndAdapt(cql, (s, rs) =>
248+
return ExecuteAsyncAndAdapt(cql, async (s, rs) =>
203249
{
250+
#if NET8_0_OR_GREATER
251+
var row = await rs.FirstOrDefaultAsync();
252+
#else
204253
var row = rs.FirstOrDefault();
254+
#endif
205255
// Map to return type
206256
if (row == null)
207257
{
@@ -311,7 +361,7 @@ public Task<AppliedInfo<T>> InsertIfNotExistsAsync<T>(T poco, string executionPr
311361

312362
return ExecuteAsyncAndAdapt(
313363
cqlInstance,
314-
(stmt, rs) => AppliedInfo<T>.FromRowSet(_mapperFactory, cql, rs));
364+
(stmt, rs) => AppliedInfo<T>.FromRowSetAsync(_mapperFactory, cql, rs));
315365
}
316366

317367
/// <inheritdoc />
@@ -360,7 +410,7 @@ public Task<AppliedInfo<T>> UpdateIfAsync<T>(string cql, params object[] args)
360410
public Task<AppliedInfo<T>> UpdateIfAsync<T>(Cql cql)
361411
{
362412
_cqlGenerator.PrependUpdate<T>(cql);
363-
return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo<T>.FromRowSet(_mapperFactory, cql.Statement, rs));
413+
return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo<T>.FromRowSetAsync(_mapperFactory, cql.Statement, rs));
364414
}
365415

366416
/// <inheritdoc />
@@ -492,7 +542,7 @@ public Task<AppliedInfo<T>> DeleteIfAsync<T>(string cql, params object[] args)
492542
public Task<AppliedInfo<T>> DeleteIfAsync<T>(Cql cql)
493543
{
494544
_cqlGenerator.PrependDelete<T>(cql);
495-
return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo<T>.FromRowSet(_mapperFactory, cql.Statement, rs));
545+
return ExecuteAsyncAndAdapt(cql, (stmt, rs) => AppliedInfo<T>.FromRowSetAsync(_mapperFactory, cql.Statement, rs));
496546
}
497547

498548
/// <inheritdoc />
@@ -788,7 +838,7 @@ public async Task<AppliedInfo<T>> ExecuteConditionalAsync<T>(ICqlBatch batch, st
788838
//Use the concatenation of cql strings as hash for the mapper
789839
var cqlString = string.Join(";", batch.Statements.Select(s => s.Statement));
790840
var rs = await ExecuteStatementAsync(batchStatement, executionProfile).ConfigureAwait(false);
791-
return AppliedInfo<T>.FromRowSet(_mapperFactory, cqlString, rs);
841+
return await AppliedInfo<T>.FromRowSetAsync(_mapperFactory, cqlString, rs);
792842
}
793843

794844
/// <inheritdoc />

src/Cassandra/Mapping/Page.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public bool IsReadOnly
3838
get { return true; }
3939
}
4040

41-
internal Page(IEnumerable<T> items, byte[] currentPagingState, byte[] pagingState)
41+
internal Page(List<T> items, byte[] currentPagingState, byte[] pagingState)
4242
{
43-
_list = new List<T>(items);
43+
_list = items;
4444
CurrentPagingState = currentPagingState;
4545
PagingState = pagingState;
4646
}

0 commit comments

Comments
 (0)