Loading

  1. Configuration
  2. Handler documentation and Hello World

UDP Plugin

HTTP Plugin is build on top of HTTP protocol. UDP is a low layer plugin, designed for really custom protocols.

For better performances, we strongly advise you to use SHM extension to build buffers for your incoming data over UDP.

TCP Text plugin and UDP plugin are really close: TCP Text plugin triggers WarpScript on \n in the stream. UDP plugin triggers WarpScript on the datagram end. The input of the macro is not a string, but a byte array (BYTES).

This plugin was designed for high rate data processing with a complex GTS model in memory...

Configuration

Your custom configurations can be stored in a new etc/conf.d/99-myWarp10.conf file. Your custom configurations will override all the predefined Warp 10 configuration.

// activate the UDP plugin
warp10.plugin.udp = io.warp10.plugins.udp.UDPWarp10Plugin
// Directory where UDP 'spec' files will be located
udp.dir = ${standalone.home}/UDP_plugin
// Period (in ms) at which to rescan 'udp.dir' for spec files changes (defaults to 60s)
udp.period = 5000

Create /opt/warp10/UDP_plugin, and make sure warp10 user has the rights to read it. You can create sub directories if you have several applications, the plugin will crawl them too.

Restart your Warp 10 instance to activate the plugin.

Handler documentation and Hello World

To run test below, you need to activate the Debug Extension to write in Warp 10 logs.

For each port you want to listen on, create a new WarpScript file in your /opt/warp10/UDP_plugin. It will contain the queue configuration and the WarpScript macro to execute when a new datagram is received.

For example, create UDPport45711.mc2, and make sure warp10 user can read it :

{
  'mode' 'server'
  'host' '0.0.0.0'
  'port' 45711
  'timeout' 5000    // Optional, ms, defaults to 0 (no timeout)
                    //  when timeout is reached, the macro will be called anyway, 
                    //  with NULL on top of the stack if nothing was received in the meantime
  'parallelism' 4   // Optional, defaults to 1
  'qsize' 2048      // Optional, defaults to 1024
  'partitioner'     // Optional, the macro must return a LONG which will be a queue number.
  <% 
    // on the stack, you will find the packet content : 
    //  from bottom to top: IP (STRING), Source port (LONG), and datagram content  (BYTES)
    // you may partition up to content received.
    SNAPSHOT 'udpDatagram' STORE
    'UDP partitionner input stack snapshot=' $udpDatagram  + STDOUT
    RAND 4 * TOLONG //return a random number.  
  %> 
  'maxClients' 10    // Optional, defaults to 1
  'maxMessages' 1024 // Optional, defaults to 1024
  'macro' <%  // the macro executed on new lines received 
    // on top of the stack, there is a list of packet.
    'datagramLIST' STORE
    <% $datagramLIST ISNULL %>
    <%
      'timeout reached, nothing received' STDOUT
    %>
    <%
      $datagramLIST
      <%
        'datagram' STORE
        //each datagram is a list of three item: IP (STRING), Source port (LONG), and datagram content (BYTES)
        'UDP: line from IP:' $datagram 0 GET + ' port:' + $datagram 1 GET TOSTRING + 
        ' content=' + $datagram 2 GET 'utf-8' BYTES-> + 
        STDOUT
      %> FOREACH
    %> IFTE
  %>
}


Wait for the refresh time (udp.period), then check that the 43211 port is opened on your system:

sudo nmap -sU -p 45711 localhost

Then, tail the warp10.log to display STDOUT in real time:

tail -F /opt/warp10/logs/warp10.log

You should see 4 timeout messages every 5second (parallelism = 4, timeout = 5000).

In another terminal, send data to the opened port:

printf 'Hallo Welt!\n\nDaß is ein UDP datagram!' |  nc -u localhost 45711 -s 127.1.2.3 -p 42000

You may need to exit the process manually with nc in udp mode (Ctrl+C).

You should see one message for each datagram received:

timeout reached, nothing received
timeout reached, nothing received
UDP partitionner input stack snapshot='127.1.2.3' 42000 'H54gQ5wVKqKgS1391ZGWktwVPMBVOL_i74K3J10ZNMGWOr8WQH3' OPB64-> 
TCP: line from IP:127.0.0.1 port:46483 content=Hallo Welt!

Daß is ein UDP datagram!
timeout reached, nothing received
timeout reached, nothing received
timeout reached, nothing received

To avoid blowing up your log file, don't forget to comment out the timeout STDOUT every five second!

The parallelism level is the number of worker thread. The partitionner can add several queues to distribute the workload differently, up to line content. The schematic below explains the configuration:

UDP config explained