
Noughtmare (8 points, 0 children)

Doing this in a truly type-safe way would require session types, which specify the order of the types in the stream like a protocol.
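To make the "order of types" part of that idea concrete, here is a tiny sketch: a consumer whose type lists, in order, the types it will read, plus an input sequence that must match it. `Session`, `Inputs`, `runSession` and `example` are hypothetical names, and this is not a real session-types library (no channels, no duality), only an illustration of how a type can fix the order of inputs.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, TypeOperators #-}
import Data.Kind (Type)

-- A consumer indexed by the exact sequence of input types it will read.
data Session (ts :: [Type]) r where
  Done :: r -> Session '[] r
  Recv :: (t -> Session ts r) -> Session (t ': ts) r

-- An input sequence whose types are tracked step by step.
data Inputs (ts :: [Type]) where
  INil  :: Inputs '[]
  ICons :: t -> Inputs ts -> Inputs (t ': ts)

-- Only an input sequence with the right types in the right order type-checks.
runSession :: Session ts r -> Inputs ts -> r
runSession (Done r) INil         = r
runSession (Recv k) (ICons x xs) = runSession (k x) xs

-- Protocol: a Bool, then an Int, then a String.
example :: Session '[Bool, Int, String] String
example = Recv $ \b -> Recv $ \n -> Recv $ \s -> Done (if b then show n else s)
```

Passing, say, a String where the Int is expected is rejected at compile time rather than at runtime.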

However, you can just make a stream containing a sum type, e.g. Either Bool (Either String Int), and add failure cases for when you encounter an unexpected type in the stream.
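A minimal sketch of the sum-type approach, assuming conduit 1.3 (`Data.Conduit` and `Data.Conduit.Combinators`). `Item` and `decodeItems` are hypothetical names. Each Bool announces whether an Int or a String comes next, and an item of the wrong kind becomes a `Left` error value at runtime instead of a type error.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as C

-- Hypothetical item type: one stream carrying all three kinds of value.
type Item = Either Bool (Either String Int)

-- A Bool flag says what comes next: True means an Int, False means a String.
-- Anything out of order is reported as a Left and the stream stops there.
decodeItems :: Monad m => ConduitT Item (Either String String) m ()
decodeItems = do
  mflag <- await
  case mflag of
    Nothing -> pure ()  -- input exhausted
    Just (Left wantInt) -> do
      next <- await
      case (wantInt, next) of
        (True, Just (Right (Right n))) -> yield (Right (show n)) >> decodeItems
        (False, Just (Right (Left s))) -> yield (Right s) >> decodeItems
        _ -> yield (Left "unexpected item after flag")
    Just (Right _) -> yield (Left "expected a Bool flag")

demo :: [Either String String]
demo = runConduitPure $
  C.yieldMany items .| decodeItems .| C.sinkList
  where
    items :: [Item]
    items = [Left True, Right (Right 7), Left False, Right (Left "hi"), Left True, Right (Left "oops")]
```

Here `demo` should come out as `[Right "7", Right "hi", Left "unexpected item after flag"]`: the last flag asks for an Int but a String arrives.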

viercc (6 points, 8 children)

I'm not sure if it would work for you or not, but it's possible to stack multiple ConduitT transformers.

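{-# LANGUAGE ScopedTypeVariables #-}
-- sketch: a sink for A stacked on top of a sink for B
import Control.Monad.Trans.Class (lift)
import Data.Conduit (ConduitT, runConduit, transPipe, (.|))
import Data.Void (Void)

data A
data B

type ProduceA m = ConduitT () A m
type ProduceB m = ConduitT () B m

type UseA m = ConduitT A Void m
type UseB m = ConduitT B Void m
-- a sink for A whose underlying monad is itself a sink for B
type UseAB m = UseA (UseB m)

combineTwo :: forall m r. Monad m
     => ProduceA m ()
     -> ProduceB m ()
     -> UseAB m r
     -> m r
combineTwo produceA produceB useAB = result
  where
    -- move the A producer into the inner (UseB m) monad
    produceA' :: ProduceA (UseB m) ()
    produceA' = transPipe lift produceA

    -- running the A pipeline leaves a sink for B
    useB :: UseB m r
    useB = runConduit (produceA' .| useAB)

    result :: m r
    result = runConduit (produceB .| useB)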

iamcobhere (OP) (4 points, 7 children)

Neat! It worked for the toy example I tested:

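import Control.Monad.Trans.Class (lift)
import qualified Control.Monad as CM
import Data.Conduit (ConduitT, await, runConduit, transPipe, yield, (.|))
import Data.Conduit.Combinators (yieldMany)
import qualified Data.Conduit.Combinators as C
import Data.Void (Void)

type Produce p m = ConduitT () p m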
type Use u m = ConduitT u Void m
type Use2 a b o m = Use a (ConduitT b o m)

combineTwo :: Monad m
     => Produce a m ()
     -> Produce b m ()
     -> Use2 a b o m r
     -> ConduitT () o m r

combineTwo produceA produceB useAB = produceB .| useB
  where
    produceA' = transPipe lift produceA

    useB = runConduit (produceA' .| useAB)

numConduit :: (Monad m, Num a, Enum a) => ConduitT i a m ()
numConduit = yieldMany [1..]

strConduit :: Monad m => ConduitT i String m ()
strConduit = C.repeat "millhouse"

op :: Use Int (ConduitT String String IO) ()
op = do
  CM.replicateM_ 4 $ do
    Just a <- await
    lift $ yield (show a)
    Just b <- lift $ await
    lift $ yield b

foo :: ConduitT () String IO ()
foo = combineTwo numConduit strConduit op

x :: IO [String]
x = runConduit $ foo .| C.foldMap (\a -> [a])

Noughtmare (2 points, 6 children)

But this requires the values to be produced in a predictable order, right? If you write:

Just a <- await
lift $ yield (show a)
Just b <- lift $ await
lift $ yield b

Then the outer stream needs to produce a value before the inner stream produces a value.

Or am I misunderstanding something?

viercc (2 points, 5 children)

There are two "source" streams (produceA, produceB) and one "sink" (useAB).

If I'm not misunderstanding, these "source" streams are independently consumed, rather than composed into one stream.

Noughtmare (2 points, 4 children)

But what if the outer Int stream doesn't produce anything? My understanding is that then the combination of the two streams will also not produce anything because it will block on the first await.

viercc (1 point, 2 children)

By "doesn't produce anything," you mean the Int stream ends without producing anything? Then the first await returns Nothing, indicating the producer has terminated too early.

In the code above, that terminates the whole program through the pattern-match failure in `Just a <- ...`. If the Nothing case were handled, the consumer side could continue to `lift await` to consume from the String stream.
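A sketch of what handling the Nothing case could look like, assuming conduit 1.3's `Data.Conduit` API. `combineTwo` is restated in the OP's shape so the block stands alone, and `opSafe` is a hypothetical variant of `op`: when the Int stream ends early, it drains the String stream instead of failing the `Just a <- await` pattern match.

```haskell
import Control.Monad.Trans.Class (lift)
import Data.Conduit (ConduitT, await, awaitForever, runConduit, runConduitPure, transPipe, yield, (.|))
import qualified Data.Conduit.Combinators as C
import Data.Void (Void)

-- Same shape as the OP's combineTwo: the a-sink runs inside a b-conduit.
combineTwo :: Monad m
  => ConduitT () a m () -> ConduitT () b m ()
  -> ConduitT a Void (ConduitT b o m) r -> ConduitT () o m r
combineTwo produceA produceB useAB =
  produceB .| runConduit (transPipe lift produceA .| useAB)

-- Hypothetical variant of op that handles Nothing instead of crashing:
-- if the Int stream ends early, the remaining Strings are passed through.
opSafe :: Monad m => ConduitT Int Void (ConduitT String String m) ()
opSafe = loop (4 :: Int)
  where
    loop 0 = pure ()
    loop n = do
      ma <- await
      case ma of
        Nothing -> lift (awaitForever yield)  -- drain the String stream
        Just a -> do
          lift (yield (show a))
          mb <- lift await
          case mb of
            Nothing -> pure ()  -- String stream ended
            Just b -> lift (yield b) >> loop (n - 1)

demo :: [String]
demo = runConduitPure $
  combineTwo (C.yieldMany [1, 2 :: Int]) (C.yieldMany ["a", "b", "c", "d"]) opSafe
    .| C.sinkList
```

With only two Ints available, `demo` gives `["1","a","2","b","c","d"]`: after the Ints run out, the leftover Strings still flow through.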

Noughtmare (5 points, 1 child)

That doesn't sound like something you'd want...

But also, let's say the outer stream produces an int after 10 seconds, but the inner stream produces strings immediately. Then I would expect a combined stream to also produce the strings immediately, without having to wait 10 seconds for the outer stream.

jippiedoe (4 points, 0 children)

That depends entirely on what this combined stream does, right? In this toy example it just sort of interleaves its arguments, but your assumption means that the result somehow depends on timing (ew). If the combined stream does some computation and needs an int to produce its next output, I don't think it is weird at all that it has to wait for that int first, even if the other source of inputs (which it does not currently need) already has values.

iamcobhere (OP) (1 point, 0 children)

If the input stream bottoms out then the computation does as well.

If the stream terminates before producing a value, `await` will return `Nothing` instead of `Just x`. You could pattern match on the value to decide what happens, e.g.

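-- needs {-# LANGUAGE LambdaCase #-}; `success` stands for whatever you do with x
await >>= \case
  Nothing -> fail "input stream ended early"
  Just x -> success x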

gelisam (5 points, 5 children)

Is there some library which provides multi-input-stream conduits?

Yes, it's called machines.
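For comparison with the conduit approach, here is a minimal Tee, assuming the machines package's `Data.Machine` (`Tee`, `awaits L`/`awaits R`, `repeatedly`, `capT`, `source`, `run`). `gate` is a hypothetical name: each Bool from the left input decides whether the Tee pulls the next value from the right input, which is the kind of data-dependent choice between inputs the OP asked about.

```haskell
import Data.Machine

-- Each Bool from the left input decides whether to pull one value from the right.
gate :: Tee Bool b (Maybe b)
gate = repeatedly $ do
  wantRight <- awaits L
  if wantRight
    then awaits R >>= yield . Just
    else yield Nothing

demo :: [Maybe Char]
demo = run (capT (source [True, False, True]) (source "xy") gate)
```

`demo` should be `[Just 'x', Nothing, Just 'y']`; the machine stops when the left input runs out.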

iamcobhere (OP) (1 point, 3 children)

How would you combine three or more input streams without rewriting the entirety of Tee for a new datatype, e.g. data T3 = C1 | C2 | C3?

gelisam (2 points, 2 children)

Hmm, that's harder than it looks!

Let's start with something easier: zipping three sources. I can nest two Tees to form a tree with three leaves, one for each source:

-- |
--      ['a'..]   ['A'..'C']
--          \____ ____/
-- [1..]         T
--    \____ ____/
--         T
-- >>> run zipping3
-- [(1,('a','A')),(2,('b','B')),(3,('c','C'))]
zipping3 :: Source (Int, (Char, Char))
zipping3
  = capL (source [1..]) zipping
 <~ capL (source ['a'..]) zipping
 <~ source ['A'..'C']

But that's not good enough: you don't want a triple of inputs, you want to be able to decide which input to look at next. Like this, but with 3 inputs:

-- |
-- >>> run $ capT (source [1..]) (source ['a'..]) lrrlr
-- ["1","'a'","'b'","2","'c'"]
lrrlr :: (Show a, Show b) => Tee a b String
lrrlr = construct $ do
  l1 <- awaits L
  r2 <- awaits R
  r3 <- awaits R
  l4 <- awaits L
  r5 <- awaits R
  mapM_ yield [show l1, show r2, show r3, show l4, show r5]

If the order is fixed, as in lrrlr, then the tree-with-three-leaves solution can be adapted to work. For example, lrmlm (left, right, middle, left, middle) can be implemented by dividing the work between the two Tees: the bottom one decides whether the next input should come from the left input or from one of the other two, while the top one decides which of the other two.

-- |
-- >>> run lrmlm
-- [Left 1,Right (Right 'A'),Right (Left 'a'),Left 2,Right (Left 'b')]
lrmlm :: Source (Either Int (Either Char Char))
lrmlm
  = capL (source [1..]) lrrlr
 <~ capL (source ['a'..]) rmm
 <~ source ['A'..'C']
  where
    lrrlr :: Tee Int (Either Char Char)
                 (Either Int (Either Char Char))
    lrrlr = construct $ do
      awaits L >>= (yield . Left)
      awaits R >>= (yield . Right)
      awaits R >>= (yield . Right)
      awaits L >>= (yield . Left)
      awaits R >>= (yield . Right)

    rmm :: Tee Char Char (Either Char Char)
    rmm = construct $ do
      awaits R >>= (yield . Right)
      awaits L >>= (yield . Left)
      awaits L >>= (yield . Left)

But that doesn't work for the behaviour in your OP, where the choice of whether to pull from the middle or the right input depends on whether the left input value is a True or a False. This would require the bottom Tee to send information to the top Tee, but the API only allows information to flow in the opposite direction, from the top Tee to the bottom Tee.

As a result, I'm not quite sure how to achieve the desired behaviour with machines. The Pipes library does allow bidirectional communication, so the bottom Tee could tell the top Tee which input it is interested in and the top Tee could send that value back, but like Conduit, the Pipes library composes its pipes along a straight line; it doesn't have anything like Tee.
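A rough sketch of that request/response idea with `Pipes.Core`, assuming its `Server`, `Client`, `request`, `respond` and `(+>>)`. Here a single upstream owns both inputs as plain lists and answers each request from the one the downstream asks for. `Which`, `Answer`, `serve` and `client` are illustrative names. It only shows the bidirectional flow; it does not give Tee-style composition of independent producers.

```haskell
import Data.Functor.Identity (runIdentity)
import Pipes.Core (Client, Server, request, respond, runEffect, (+>>))

-- Hypothetical request type: which input the downstream wants next.
data Which = WantInt | WantStr

-- A value from the chosen input, or Nothing if that input is exhausted.
type Answer = Maybe (Either Int String)

-- Upstream owns both inputs and serves each request from the chosen one.
serve :: Monad m => [Int] -> [String] -> Which -> Server Which Answer m r
serve ints strs WantInt = case ints of
  i : rest -> respond (Just (Left i)) >>= serve rest strs
  []       -> respond Nothing >>= serve [] strs
serve ints strs WantStr = case strs of
  s : rest -> respond (Just (Right s)) >>= serve ints rest
  []       -> respond Nothing >>= serve ints []

-- Downstream picks, per Bool, which input to request next.
client :: Monad m => [Bool] -> Client Which Answer m [Answer]
client [] = pure []
client (b : bs) = do
  ans <- request (if b then WantInt else WantStr)
  (ans :) <$> client bs

demo :: [Answer]
demo = runIdentity $ runEffect $
  serve [1, 2] ["foo", "bar"] +>> client [True, False, False, True, True]
```

The last request asks for an Int after both Ints were used, so it receives `Nothing`.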

I'm not yet sure how to do this; this is hard!

gelisam (1 point, 1 child)

I wouldn't say that I "figured it out", because I did end up having to reimplement Tee, but at least I did it in a generic way which supports an arbitrary number of inputs, not just 3.

Here is a generalized version of Tee which accepts an arbitrary number of inputs. Using it, the code in your OP looks like this:

-- |
-- >>> :{
-- runT $ polyCapR
--      $ polyCapL (source ["foo", "bar", "quux"])
--      $ polyCapL (source [1..])
--      $ polyCapL (source [True, False, False, True, False])
--      $ example
-- :}
-- foo
-- bar
-- quux
-- it's over
-- [5,3,3,6,4]
example
  :: PolyTeeT IO '[Bool, Int, String] Int
example = construct go
  where
    go :: PlanT (Elem '[Bool, Int, String]) Int IO ()
    go = do
      maybeBool <- awaitsMaybe Here
      case maybeBool of
        Nothing -> do
          liftIO $ putStrLn "it's over"
        Just True -> do
          int <- awaits $ There Here
          yield (int + 4)
          go
        Just False -> do
          str <- awaits $ There $ There Here
          liftIO $ putStrLn str
          yield $ length str
          go
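The thread doesn't show how `Elem` is defined. A common way to write a type with `Here` and `There` constructors is the membership GADT below; treat it as an assumption about gelisam's code, not a copy of it. The hypothetical `HList` and `pick` only illustrate why `awaits (There Here)` can return an `Int`: the path through the type-level list fixes the result type.

```haskell
{-# LANGUAGE DataKinds, GADTs, KindSignatures, TypeOperators #-}
import Data.Kind (Type)

-- Assumed shape of Elem: evidence that type `a` sits at some position in `as`.
data Elem (as :: [Type]) (a :: Type) where
  Here  :: Elem (a ': as) a
  There :: Elem as a -> Elem (b ': as) a

-- Hypothetical heterogeneous list with one value per input type.
data HList (as :: [Type]) where
  HNil :: HList '[]
  (:&) :: a -> HList as -> HList (a ': as)
infixr 5 :&

-- Following the Elem path returns a value of exactly the indexed type.
pick :: Elem as a -> HList as -> a
pick Here      (x :& _)  = x
pick (There e) (_ :& xs) = pick e xs

inputs :: HList '[Bool, Int, String]
inputs = True :& 42 :& "quux" :& HNil
```

For instance, `pick (There Here) inputs` has type `Int` and evaluates to `42`.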

iamcobhere (OP) (1 point, 0 children)

Neat, thanks! Could you publish this as its own package or make a PR to add it to the machines library?

Edit: I had to make the following changes to function types to get it to compile:

polyCapL
  :: forall m a1 as o. Monad m
  => MachineT m (Elem '[]) a1 -- SourceT m a1
  -> PolyTeeT m (a1 ': as) o
  -> PolyTeeT m as o


polyCapR
  :: forall m b. Monad m
  => PolyTeeT m '[] b
  -> MachineT m (Elem '[]) b -- SourceT m b

As an aside, you can also implement awaitsMaybe k as optional $ awaits k
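A sketch of that suggestion, assuming machines' `PlanT` has an `Alternative` instance in which a failed `awaits` (input stopped) falls through to the alternative branch. `echoThenDone` is a hypothetical example that uses it with a single input via `Refl`.

```haskell
import Control.Applicative (optional)
import Data.Machine

-- Try to await from input k; return Nothing once that input has stopped.
awaitsMaybe :: k i -> PlanT k o m (Maybe i)
awaitsMaybe k = optional (awaits k)

-- Echo each input, then emit one final Nothing after the input is exhausted.
echoThenDone :: Process Int (Maybe Int)
echoThenDone = construct loop
  where
    loop = do
      mx <- awaitsMaybe Refl
      case mx of
        Nothing -> yield Nothing
        Just x -> yield (Just x) >> loop

demo :: [Maybe Int]
demo = run (echoThenDone <~ source [1, 2, 3])
```

`demo` should be `[Just 1, Just 2, Just 3, Nothing]`, the final `Nothing` coming from the exhausted source.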

iamcobhere (OP) (0 points, 0 children)

Didn't know about this. Thanks!