{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE Safe #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module BroadcastChan (
BroadcastChan
, Direction(..)
, In
, Out
, newBroadcastChan
, newBChanListener
, readBChan
, writeBChan
, closeBChan
, isClosedBChan
, getBChanContents
, Action(..)
, Handler(..)
, parMapM_
, parFoldMap
, parFoldMapM
, foldBChan
, foldBChanM
) where
import Control.Exception
(SomeException(..), mask, throwIO, try, uninterruptibleMask_)
import Control.Monad (liftM)
import Control.Monad.IO.Unlift
(MonadUnliftIO, UnliftIO(..), askUnliftIO, withRunInIO)
import Data.Foldable as F (Foldable(..), foldlM, forM_)
import BroadcastChan.Extra
import BroadcastChan.Internal
-- | Acquire a value, run an action, and run a cleanup handler only when the
-- action throws.
--
-- * @before@ is run with asynchronous exceptions masked.
-- * @thing@ is run in the caller's monad (via 'withRunInIO') with the
--   masking state restored. Note that it does not receive the value
--   produced by @before@.
-- * If @thing@ throws any exception, @after@ is applied to the acquired
--   value under 'uninterruptibleMask_'; any exception raised by @after@
--   itself is discarded and the original exception is rethrown.
-- * If @thing@ succeeds, @after@ is not run and the result is returned.
bracketOnError :: MonadUnliftIO m => IO a -> (a -> IO b) -> m c -> m c
bracketOnError before after thing = withRunInIO $ \run -> mask $ \restore -> do
    x <- before
    res1 <- try . restore . run $ thing
    case res1 of
        Left (SomeException exc) -> do
            -- Cleanup failures are deliberately ignored so that the
            -- exception from @thing@ is the one the caller observes.
            _ :: Either SomeException b <- try . uninterruptibleMask_ $ after x
            throwIO exc
        Right y -> return y
-- | Apply a monadic action to every element of a 'F.Foldable', delegating
-- the scheduling to 'runParallel_'.
--
-- The handler and the worker are both translated to 'IO' with the
-- function obtained from 'askUnliftIO'; the elements are fed in with
-- @'forM_' input@. The 'Bracket' returned by 'runParallel_' is then
-- executed through 'bracketOnError', so its @cleanup@ only runs when
-- @action@ throws.
--
-- NOTE(review): the @Int@ is presumably the number of worker threads; its
-- exact meaning is defined by 'runParallel_' in "BroadcastChan.Extra".
parMapM_
    :: (F.Foldable f, MonadUnliftIO m)
    => Handler m a
    -> Int
    -> (a -> m ())
    -> f a
    -> m ()
parMapM_ hndl threads workFun input = do
    UnliftIO runInIO <- askUnliftIO

    Bracket{allocate,cleanup,action} <- runParallel_
        (mapHandler runInIO hndl)
        threads
        (runInIO . workFun)
        (forM_ input)

    bracketOnError allocate cleanup action
-- | Variant of 'parFoldMapM' that takes a pure folding function.
--
-- The pure function @f@ is lifted into the monad with 'return' and every
-- argument is passed on to 'parFoldMapM'; the initial accumulator and the
-- input container are supplied by eta-reduction.
parFoldMap
    :: (F.Foldable f, MonadUnliftIO m)
    => Handler m a
    -> Int
    -> (a -> m b)
    -> (r -> b -> r)
    -> r
    -> f a
    -> m r
parFoldMap hndl threads work f =
    parFoldMapM hndl threads work (\x y -> return (f x y))
-- | Map a monadic action over a 'F.Foldable' via 'runParallel' and fold
-- the results with a monadic function, starting from accumulator @z@.
--
-- The handler and the worker are translated to 'IO' with the function
-- obtained from 'askUnliftIO'. The 'Bracket' returned by 'runParallel' is
-- executed through 'bracketOnError', so its @cleanup@ only runs when
-- @action@ throws.
--
-- NOTE(review): the @Int@ is presumably the number of worker threads, and
-- results still outstanding once the input is exhausted are presumably
-- folded by 'runParallel' using the @Right f@ argument; confirm against
-- "BroadcastChan.Extra".
parFoldMapM
    :: forall a b f m r
     . (F.Foldable f, MonadUnliftIO m)
    => Handler m a
    -> Int
    -> (a -> m b)
    -> (r -> b -> m r)
    -> r
    -> f a
    -> m r
parFoldMapM hndl threads workFun f z input = do
    UnliftIO runInIO <- askUnliftIO

    Bracket{allocate,cleanup,action} <- runParallel
        (Right f)
        (mapHandler runInIO hndl)
        threads
        (runInIO . workFun)
        body

    bracketOnError allocate cleanup action
  where
    -- Walks the input while tracking how many elements were handed over
    -- with @send@; only the accumulator is returned.
    body :: (a -> m ()) -> (a -> m (Maybe b)) -> m r
    body send sendRecv = snd `liftM` foldlM wrappedFoldFun (0, z) input
      where
        -- While the counter differs from @threads@ an element is only
        -- sent and the counter incremented. Once the counter equals
        -- @threads@, each element goes through @sendRecv@ and a returned
        -- 'Just' result is folded into the accumulator with @f@.
        wrappedFoldFun :: (Int, r) -> a -> m (Int, r)
        wrappedFoldFun (i, x) a
            | i == threads = liftM (i,) $ sendRecv a >>= maybe (return x) (f x)
            | otherwise = const (i+1, x) `liftM` send a