Skip to content

Commit

Permalink
Fix arguments and types for wait all
Browse files Browse the repository at this point in the history
  • Loading branch information
MattEdwardsWaggleBee committed Dec 2, 2024
1 parent ea7729d commit c5d8ca8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 60 deletions.
2 changes: 0 additions & 2 deletions src/Hyperbee.Pipeline/Binders/HookBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,13 @@ await middleware(
var arg = Parameter( typeof( TInput ), "arg" );
var nextExpression = Lambda<FunctionAsync<TOutput, TInput>>(
BlockAsync(
[function],
Await( Invoke( Middleware, ctx, arg, function ), configureAwait: false )
),
parameters: [ctx, arg]
);

return Lambda<MiddlewareAsync<TInput, TOutput>>(
BlockAsync(
[context, argument],
Await( Invoke( middleware, context, argument, nextExpression ), configureAwait: false )
),
parameters: [context, argument, function] );
Expand Down
80 changes: 23 additions & 57 deletions src/Hyperbee.Pipeline/Binders/WaitAllBlockBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ await items.ForEachAsync( async item =>
*/

var results = Variable( typeof( WaitAllResult[] ), "results" );
var result = Variable( typeof( TNext ), "result" );
var result = Variable( typeof( TOutput ), "result" );
var innerContext = Variable( typeof( IPipelineContext ), "innerContext" );

var indexedItem = Parameter( typeof( (FunctionAsync<TOutput, object>, int) ), "indexedItem" );
Expand All @@ -114,9 +114,8 @@ await items.ForEachAsync( async item =>

Assign( innerContext, Call( context, methodInfo, [Constant( false )] ) ),

Assign( result, Await( ProcessStatementAsync<TNext>( item, innerContext, nextArgument, "NAME" ), configureAwait: false ) ),
Assign( result, Await( ProcessStatementAsync( item, innerContext, Convert( nextArgument, typeof( TOutput ) ), "NAME" ), configureAwait: false ) ),
Assign( ArrayAccess( results, index ), New( typeof( WaitAllResult ).GetConstructors()[0], innerContext, Convert( result, typeof( object ) ) ) )

),
parameters: [indexedItem]
);
Expand All @@ -125,11 +124,10 @@ await items.ForEachAsync( async item =>
var innerForEach = ForEachAsync(
items,
forEachBody,
Constant( Environment.ProcessorCount ) ); //Constant( 1 )
Constant( Environment.ProcessorCount ) );

var reducerResult = Variable( typeof( TNext ), "reducerResult" );

// TODO: reducerResult shouldn't be required, using/block should return last expression
return BlockAsync(
[results, reducerResult],
Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName );
Expand All @@ -139,10 +137,9 @@ await items.ForEachAsync( async item =>
Assign( results, NewArrayBounds( typeof( WaitAllResult ), ArrayLength( nexts ) ) ),
Assign( items, SelectIndexItem<FunctionAsync<TOutput, object>>( nexts ) ),

//Invoke( LoggerExpression.Log( "forEachBody.items" ), Convert( items, typeof( object ) ) ),

Await( innerForEach, configureAwait: false ),
Assign( reducerResult, Invoke( reducer, context, nextArgument, results ) )

Assign( reducerResult, Convert( Invoke( reducer, context, nextArgument, results ), typeof( TNext ) ) )
)
),
reducerResult
Expand Down Expand Up @@ -181,7 +178,7 @@ private Expression SelectIndexItem<T>( Expression nexts )
);
}

protected virtual Expression ProcessStatementAsync<TNext>(
protected virtual Expression ProcessStatementAsync(
Expression nextFunction,
ParameterExpression context,
Expression nextArgument,
Expand All @@ -203,9 +200,11 @@ protected virtual Expression ProcessStatementAsync<TNext>(
if ( Middleware == null )
return BlockAsync(
Convert(
Await( Invoke( nextFunction, context, nextArgument ), configureAwait: false ),
typeof( TNext ) )
);
Await(
Invoke( nextFunction, context, nextArgument ),
configureAwait: false ),
typeof( TOutput ) )
);

// async ( ctx, arg ) => await nextFunction( ctx, (TOutput) arg ).ConfigureAwait( false )
var ctx = Parameter( typeof( IPipelineContext ), "ctx" );
Expand All @@ -228,7 +227,7 @@ protected virtual Expression ProcessStatementAsync<TNext>(
middlewareNext
),
configureAwait: false ),
typeof( TNext ) )
typeof( TOutput ) )
);
}

Expand Down Expand Up @@ -267,54 +266,42 @@ public static Expression ForEachAsync<TSource>(
var getPartitionsCall = Call(
createPartitionerCall,
getPartitionsMethod,
partitionCount ); // returns IList<IEnumerator<(TSource, int)>>
partitionCount );

//var partition = Variable( typeof( IEnumerator<(TSource, int)> ), "partition" );
var temp = Variable( typeof( int ), "_nh_temp" );
var temp = Variable( typeof( int ), "temp" );
var counter = Variable( typeof( int ), "counter" );

var partitionList = Variable( typeof( IList<IEnumerator<(TSource, int)>> ), "partitionList" );
var partitionAccess = MakeIndex( partitionList, typeof( IList<IEnumerator<(TSource, int)>> ).GetProperty( "Item" ), [temp] );
var enumerator = Variable( typeof( IEnumerator<(TSource, int)> ), "enumerator" );

// Task.Run( async () => {
// using var enumerator = partition;
// using var enumerator = partitionList[temp];
// while ( enumerator.MoveNext() ) await function( enumerator.Current )
// }
var moveNext = Call( enumerator, typeof( IEnumerator ).GetMethod( nameof( IEnumerator.MoveNext ) ) );
var current = Property( enumerator, nameof( IEnumerator<TSource>.Current ) );
var taskRun = Call(
typeof( Task ).GetMethod( nameof( Task.Run ), [typeof( Func<Task> )] )!,
Lambda<Func<Task>>(
BlockAsync( // SM<11>
BlockAsync(
[enumerator],
Assign( enumerator, partitionAccess ), // var enumerator = partitionList[temp]
Assign( enumerator, partitionAccess ),

//Invoke( LoggerExpression.Log( "taskRun.enumerator" ), Convert( enumerator, typeof( object ) ) ),
//Invoke( LoggerExpression.Log( "taskRun.counter" ), Convert( counter, typeof( object ) ) ),
//Invoke( LoggerExpression.Log( "taskRun.temp" ), Convert( temp, typeof( object ) ) ),

//Using( Convert( enumerator, typeof( IDisposable ) ), // TODO: Shouldn't need to convert
Using( Convert( enumerator, typeof( IDisposable ) ),
While(
moveNext,
Await( Invoke( function, current ), configureAwait: false )
)
//)
)
)
)
);

/*
* List<Task> tasks = new List<Task>();
* foreach( var partition in partitions )
* {
* tasks.Add( Task.Run( async () => { ... } ) ) );
* }
* for( int counter = 0; counter < 2: counter++ )
* for( int counter = 0; counter < partitionList.Count: counter++ )
* {
*
* await..
*
* var temp = counter; ---
* tasks.Add( Task.Run( async () => { temp ... } ) ) );
* }
Expand All @@ -324,48 +311,27 @@ public static Expression ForEachAsync<TSource>(
var addTask = Block(
[temp],
Assign( temp, counter ),
//Invoke( LoggerExpression.Log( "addtask.temp" ), Convert( temp, typeof( object ) ) ),
//Invoke( LoggerExpression.Log( "addtask.counter" ), Convert( counter, typeof( object ) ) ),
Call( tasks, typeof( List<Task> ).GetMethod( nameof( List<Task>.Add ) )!, taskRun )
);


var counterInit = Assign( counter, Constant( 0 ) );

var condition = LessThan( counter, Constant( 2 ) ); // TODO: partitionList.Count
var condition = LessThan( counter, Property( Convert( partitionList, typeof( ICollection ) ), "Count" ) );
var iteration = PostIncrementAssign( counter );

var forExpr = For( counterInit, condition, iteration, addTask );


//var foreachPartition = ForEach( getPartitionsCall, partition, addTask );

// await Task.WhenAll( tasks );
var taskWhenAll = Await( Call(
typeof( Task ).GetMethod( nameof( Task.WhenAll ), [typeof( IEnumerable<Task> )] ),
tasks )
);

return BlockAsync( // SM<10> // temp, counter, partitionList, enumerator, tasks
return BlockAsync(
[tasks, counter, partitionList],
initalizeTasks,
Assign( partitionList, getPartitionsCall ),
//Invoke( LoggerExpression.Log( "forEachAsync.partitionList" ), Convert( partitionList, typeof( object ) ) ),
forExpr, //foreachPartition,
forExpr,
taskWhenAll
);
}
}

public static class LoggerExpression
{
public static Expression<Action<object>> Log( string message )
{
return arg1 => Log( message, arg1 );
}

public static void Log( string message, object arg1 )
{
Console.WriteLine( $"{message} value: {arg1}" );
}
}
3 changes: 2 additions & 1 deletion src/Hyperbee.Pipeline/Binders/WrapBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public Expression<FunctionAsync<TInput, TOutput>> Bind( Expression<FunctionAsync
Using( //using var _ = contextControl.CreateFrame( context, Configure, frameName );
ContextImplExtensions.CreateFrameExpression( context, Configure, frameName ),
Invoke( next, context, argument )
) );
),
parameters: [context, argument] );
}

var ctx = Parameter( typeof( IPipelineContext ), "ctx" );
Expand Down

0 comments on commit c5d8ca8

Please sign in to comment.