Using TChan with Timeout

Use registerDelay, an STM function, to signal a TVar when the timeout is reached. You can then use the orElse function or the Alternative operator <|> to select between the next TChan value or the timeout.

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
  let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
  forM_ xs $ \ x -> do
    threadDelay x
    atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
  delay <- registerDelay timeoutAfter
  atomically $
        Just <$> readTChan pktChannel
    <|> Nothing <$ fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
  res <- readTChanTimeout timeoutAfter pktChannel
  case res of
    Nothing -> putStrLn "timeout"
    Just val -> do
      putStrLn $ "packet: " ++ show val
      readLoop timeoutAfter pktChannel

main :: IO ()
main = do
  let timeoutAfter = 1000000

  -- spin up a packet writer simulation
  pktChannel <- newTChanIO
  tid <- forkIO $ packetWriter timeoutAfter pktChannel

  readLoop timeoutAfter pktChannel

  killThread tid

The thumb rule of concurrency is: if adding a sleep in some point inside an IO action matters, your program is not safe.

To understand why the code timeout 1000000 $ atomically $ readTChan pktChannel does not work, consider the following alternative implementation of atomically:

atomically' :: STM a -> IO a
atomically' action = do
  result <- atomically action
  threadDelay someTimeAmount
  return result

The above is equal to atomically, but for an extra innocent delay. Now it is easy to see that if timeout kills the thread during the threadDelay, the atomic action has completed (consuming a message from the channel), yet timeout will return Nothing.

A simple fix to timeout n $ atomically ... could be the following

smartTimeout :: Int -> STM a -> IO (Maybe a)
smartTimeout n action = do
   v <- atomically $ newEmptyTMvar
   _ <- timeout n $ atomically $ do
          result <- action
          putTMvar v result
   atomically $ tryTakeTMvar v

The above uses an extra transactional variable v to do the trick. The result value of the action is stored into v inside the same atomic block in which the action is run. The return value of timeout is not trusted, since it does not tell us if action was run or not. After that, we check the TMVar v, which will be full if and only if action was run.