Skip to content

Commit a9e4a96

Browse files
committed
Implement differenceBySorted API
Fix after rebase; remove UnionBySorted; fix Werror; add benchmark; fix review comments
1 parent 5e7fb87 commit a9e4a96

File tree

5 files changed

+123
-6
lines changed

5 files changed

+123
-6
lines changed

benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,11 @@ o_n_heap_buffering value =
461461
$ joinWith Internal.intersectBy sqrtVal
462462
, benchIOSrc1 "intersectBySorted"
463463
$ joinMapWith (Internal.intersectBySorted compare) halfVal
464+
-- XXX The differenceBy benchmark hangs forever, so it is disabled for now.
465+
--, benchIOSrc1 "differenceBy"
466+
-- $ joinMapWith (Internal.differenceBy (==)) halfVal
467+
, benchIOSrc1 "differenceBySorted"
468+
$ joinMapWith (Internal.differenceBySorted compare) sqrtVal
464469
]
465470
]
466471

core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
143143
, splitInnerBy
144144
, splitInnerBySuffix
145145
, intersectBySorted
146+
, differenceBySorted
146147
)
147148
where
148149

@@ -487,6 +488,25 @@ mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
487488
-- Intersection of sorted streams
488489
-------------------------------------------------------------------------------
489490

491+
-- Records which of two input streams, if any, has run out of elements.
--
-- NOTE(review): no use of this type is visible in this part of the diff;
-- confirm it is still required after the UnionBySorted removal.
data StreamEmptyNess
    = LeftEmpty
    | RightEmpty
    | BothEmpty
    | NoneEmpty
    deriving (Eq, Show)
497+
498+
-- Phase tags for a step function that merges two streams.
--
-- NOTE(review): the meaning of the individual phases cannot be determined
-- from this part of the diff, and no use of the type is visible here; confirm
-- it is still required. "FastFarwardRun" looks like a misspelling of
-- "FastForwardRun"; renaming it requires updating every use site.
data RunOrder
    = LeftRun
    | RightRun
    | CompareRun
    | CompareDupRun
    | FastFarwardRun
    | RightDupRun
    | BuffPrepare
    | BuffPair
    | BuffReset
    deriving (Eq, Show)
509+
490510
-- Assuming the streams are sorted in ascending order
491511
{-# INLINE_NORMAL intersectBySorted #-}
492512
intersectBySorted :: Monad m
@@ -2466,3 +2486,67 @@ splitInnerBySuffix splitter joiner (Stream step1 state1) =
24662486

24672487
step _ (SplitYielding x next) = return $ Yield x next
24682488
step _ SplitFinishing = return Stop
2489+
2490+
-------------------------------------------------------------------------------
2491+
-- Difference of sorted streams -----------------------------------------------
2492+
-------------------------------------------------------------------------------
2493+
-- | Multiset difference of two streams that are both sorted in ascending
-- order with respect to the supplied comparison function. Each element of the
-- second stream cancels at most one equal element of the first stream; the
-- remaining elements of the first stream are emitted in their original order.
--
-- For list-like inputs this agrees with @(\\\\)@ from "Data.List" applied to
-- the sorted lists.
--
-- The internal state is @(sa, sb, a, b)@:
--
-- * @sa@, @sb@: the remaining state of the first and second stream, or
--   'Nothing' once that stream has stopped.
-- * @a@, @b@: the element currently buffered from the first and second
--   stream, or 'Nothing' if a new one has to be pulled.
{-# INLINE_NORMAL differenceBySorted #-}
differenceBySorted :: (Monad m) =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
differenceBySorted cmp (Stream stepa ta) (Stream stepb tb) =
    Stream step (Just ta, Just tb, Nothing, Nothing)

    where

    {-# INLINE_LATE step #-}

    -- No element of the first stream is buffered: pull one. Once the first
    -- stream stops there is nothing left to emit, so the result stops too,
    -- without touching the second stream again.
    step gst (Just sa, sb, Nothing, b) = do
        r <- stepa gst sa
        return $ case r of
            Yield a sa' -> Skip (Just sa', sb, Just a, b)
            Skip sa' -> Skip (Just sa', sb, Nothing, b)
            Stop -> Stop

    -- An element of the first stream is waiting and the second stream is
    -- still running: pull the element to compare it against.
    step gst (sa, Just sb, Just a, Nothing) = do
        r <- stepb gst sb
        return $ case r of
            Yield b sb' -> Skip (sa, Just sb', Just a, Just b)
            Skip sb' -> Skip (sa, Just sb', Just a, Nothing)
            Stop -> Skip (sa, Nothing, Just a, Nothing)

    -- Both elements are available: compare them.
    step _ (sa, sb, Just a, Just b) =
        return $ case cmp a b of
            -- b is smaller than everything left in the first stream, drop it.
            GT -> Skip (sa, sb, Just a, Nothing)
            -- Nothing in the second stream can cancel a, emit it and keep b.
            LT -> Yield a (sa, sb, Nothing, Just b)
            -- b cancels a: discard both and refill through the steps above,
            -- which handle Skip and Stop of either stream uniformly.
            EQ -> Skip (sa, sb, Nothing, Nothing)

    -- The second stream is exhausted: pass the first stream through.
    step _ (sa, Nothing, Just a, Nothing) =
        return $ Yield a (sa, Nothing, Nothing, Nothing)

    -- The first stream is exhausted and nothing is buffered from it.
    step _ (Nothing, _, Nothing, _) = return Stop

src/Streamly/Internal/Data/Stream/IsStream/Top.hs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module Streamly.Internal.Data.Stream.IsStream.Top
3030
, intersectBy
3131
, intersectBySorted
3232
, differenceBy
33-
, mergeDifferenceBy
33+
, differenceBySorted
3434
, unionBy
3535
, mergeUnionBy
3636

@@ -632,11 +632,14 @@ differenceBy eq s1 s2 =
632632
--
633633
-- Space: O(1)
634634
--
635-
-- /Unimplemented/
636-
{-# INLINE mergeDifferenceBy #-}
637-
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
635+
-- /Pre-release/
636+
{-# INLINE differenceBySorted #-}
differenceBySorted :: (IsStream t, MonadIO m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
differenceBySorted cmp left =
    -- Convert the second stream to StreamD, run the StreamD implementation
    -- against the already converted first stream, and convert the result back.
    IsStream.fromStreamD . diffFromLeft . IsStream.toStreamD

    where

    diffFromLeft = StreamD.differenceBySorted cmp (IsStream.toStreamD left)
640643

641644
-- | This is essentially an append operation that appends all the extra
642645
-- occurrences of elements from the second stream that are not already present

test/Streamly/Test/Prelude/Top.hs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module Main (main) where
22

3-
import Data.List (elem, intersect, nub, sort)
3+
import Data.List (elem, intersect, nub, sort, (\\))
44
import Data.Maybe (isNothing)
55
import Streamly.Prelude (SerialT)
66
import Test.QuickCheck
@@ -196,6 +196,25 @@ intersectBy srt intersectFunc cmp =
196196
let v2 = ls0 `intersect` ls1
197197
assert (sort v1 == sort v2)
198198

199+
-- Property: for two sorted lists of Ints, streaming them through
-- 'Top.differenceBySorted' with 'compare' gives the same result as the list
-- difference @(\\\\)@ of the sorted lists.
differenceBySorted :: Property
differenceBySorted =
    forAll intList $ \xs ->
        forAll intList $ \ys ->
            monadicIO $ check (sort xs) (sort ys)

    where

    intList = listOf (chooseInt (min_value, max_value))

    check sortedXs sortedYs = do
        actual <-
            run
                . S.toList
                $ Top.differenceBySorted
                    compare
                    (S.fromList sortedXs)
                    (S.fromList sortedYs)
        let expected = sort (sortedXs \\ sortedYs)
        assert (actual == expected)
217+
199218
-------------------------------------------------------------------------------
200219
-- Main
201220
-------------------------------------------------------------------------------
@@ -219,3 +238,4 @@ main = hspec $ do
219238
(intersectBy id Top.intersectBy (==))
220239
prop "intersectBySorted"
221240
(intersectBy sort Top.intersectBySorted compare)
241+
prop "differenceBySorted" Main.differenceBySorted

test/streamly-tests.cabal

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,11 @@ test-suite Prelude.Top
390390
type: exitcode-stdio-1.0
391391
main-is: Streamly/Test/Prelude/Top.hs
392392

393+
test-suite Data.Stream.Top
394+
import: test-options
395+
type: exitcode-stdio-1.0
396+
main-is: Streamly/Test/Data/Stream/Top.hs
397+
393398
test-suite Prelude.WAsync
394399
import: test-options
395400
type: exitcode-stdio-1.0

0 commit comments

Comments
 (0)