Credit-based Flow Control
pieterh wrote on 19 Jul 2011 14:58
Using network buffers - such as 0MQ's queues - has a disadvantage when you are sending data to many readers: your writer will block when a buffer fills up. To avoid this, you can use non-blocking writes and poll for sockets that are 'ready for writing'. But here is an alternative that dispenses with high-water marks and blocking writes altogether. It's called "credit-based flow control".
The example below runs on 0MQ using the CZMQ API. The principles are portable to any messaging system. I'll let the code speak for itself.
//
// Credit based flow control example
//
// We start some clients that talk to a single server via a DEALER
// to ROUTER setup. Clients say hello to the server and then start
// to receive random data. The server sends as fast as it can, but
// only within the credit window granted by the client.
//
#include "czmq.h"
#define NBR_CLIENTS 1
// The TRANSIT_TOTAL size defines the total data in transit,
// covering 0MQ send and recv queues, TCP send and recv buffers,
// and packets in flight on the network. The client starts by
// sending TRANSIT_TOTAL credit to the server, and thereafter
// sends TRANSIT_SLICE credit after receiving TRANSIT_SLICE bytes.
#define TRANSIT_TOTAL (1024 * 1024)
#define TRANSIT_SLICE (TRANSIT_TOTAL / 4)
// We assert that the flow-control mechanism works by setting a
// HWM on the server send queue, and sequencing messages. If the
// queue hits HWM for any client, 0MQ will drop messages, as this
// is the exception strategy for ROUTER sockets. The client can
// detect this and abort.
//
// Difficulty: 0MQ counts messages, not bytes. So HWM is not an
// accurate measure. To solve this we batch small messages, and
// fragment larger messages into blocks of FRAGMENT_SIZE octets.
//
// For this example we simply generate messages of roughly FRAGMENT_SIZE
// octets. In a more cynical test we would batch and fragment on sending.
// But, once flow control works, we don't need the HWM at all.
#define FRAGMENT_SIZE 65536
// Knowing the TRANSIT_TOTAL and the FRAGMENT_SIZE, we can set the
// HWM to be (TRANSIT_TOTAL / FRAGMENT_SIZE).
#define SERVER_HWM (TRANSIT_TOTAL / FRAGMENT_SIZE)
// -------------------------------------------------------------------
// Put a long integer to a network buffer
#define PUT_LONG(buffer, value) { \
    (buffer) [0] = (byte) (((value) >> 24) & 255); \
    (buffer) [1] = (byte) (((value) >> 16) & 255); \
    (buffer) [2] = (byte) (((value) >> 8) & 255); \
    (buffer) [3] = (byte) (((value)) & 255); \
}
// Put a long long integer to the network buffer
#define PUT_LLONG(buffer, value) { \
    (buffer) [0] = (byte) (((value) >> 56) & 255); \
    (buffer) [1] = (byte) (((value) >> 48) & 255); \
    (buffer) [2] = (byte) (((value) >> 40) & 255); \
    (buffer) [3] = (byte) (((value) >> 32) & 255); \
    (buffer) [4] = (byte) (((value) >> 24) & 255); \
    (buffer) [5] = (byte) (((value) >> 16) & 255); \
    (buffer) [6] = (byte) (((value) >> 8) & 255); \
    (buffer) [7] = (byte) (((value)) & 255); \
}
// Get a long integer from a network buffer
#define GET_LONG(buffer) \
    (((buffer) [0] << 24) \
    + ((buffer) [1] << 16) \
    + ((buffer) [2] << 8) \
    +  (buffer) [3])
// Get a long long integer from the network buffer
#define GET_LLONG(buffer) \
    (((int64_t) ((buffer) [0]) << 56) \
    + ((int64_t) ((buffer) [1]) << 48) \
    + ((int64_t) ((buffer) [2]) << 40) \
    + ((int64_t) ((buffer) [3]) << 32) \
    + ((int64_t) ((buffer) [4]) << 24) \
    + ((int64_t) ((buffer) [5]) << 16) \
    + ((int64_t) ((buffer) [6]) << 8) \
    +  (int64_t) ((buffer) [7]))
// -------------------------------------------------------------------
// Client task
static void *
client_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *dealer = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (dealer, "tcp://127.0.0.1:10001");

    // Start by sending TRANSIT_TOTAL credit to server
    zframe_t *frame = zframe_new (NULL, 4);
    PUT_LONG (zframe_data (frame), TRANSIT_TOTAL);
    zframe_send (&frame, dealer, 0);

    // Now consume and verify incoming messages and refresh
    // credit asynchronously as needed
    int64_t expected_seq = 0;
    int received = 0;
    while (TRUE) {
        zmsg_t *msg = zmsg_recv (dealer);
        if (!msg)
            break;
        // Message has two frames, sequence number and body
        zframe_t *sequence = zmsg_pop (msg);
        zframe_t *content = zmsg_pop (msg);
        assert (content);
        int64_t current_seq = GET_LLONG (zframe_data (sequence));
        if (current_seq != expected_seq) {
            printf ("E: server dropped %d messages, exit (%d/%d)\n",
                (int) (current_seq - expected_seq),
                (int) current_seq, (int) expected_seq);
            exit (1);
        }
        expected_seq++;

        // Count received data, send top-up credit if needed
        received += zframe_size (content);
        if (received > TRANSIT_SLICE) {
            received -= TRANSIT_SLICE;
            zframe_t *frame = zframe_new (NULL, 4);
            PUT_LONG (zframe_data (frame), TRANSIT_SLICE);
            zframe_send (&frame, dealer, 0);
        }
        zframe_destroy (&sequence);
        zframe_destroy (&content);
        zmsg_destroy (&msg);

        // Sleep for some random interval up to 10 msecs
        zclock_sleep (randof (10));
    }
    zctx_destroy (&ctx);
    return NULL;
}
// -------------------------------------------------------------------
// Server task
// Clients are represented by this data structure
typedef struct {
    zframe_t *identity;
    int credit;
    int64_t sequence;
} client_t;
static void *
server_task (void *args)
{
    zctx_t *ctx = zctx_new ();
    void *router = zsocket_new (ctx, ZMQ_ROUTER);
    zsockopt_set_hwm (router, SERVER_HWM);
    zsocket_bind (router, "tcp://*:10001");

    // We'll hold the clients on a simple list
    zlist_t *clients = zlist_new ();

    // We're purely driven by input events
    while (1) {
        zmsg_t *msg = zmsg_recv (router);
        if (!msg)
            break;

        // PROCESS CLIENT CREDITS
        // -------------------------------------------------------
        // Only message we accept from clients is a credit message
        // - frame data is amount of credit as 4-byte net longint
        zframe_t *client_frame = zmsg_pop (msg);
        zframe_t *credit_frame = zmsg_pop (msg);
        assert (credit_frame);
        int credit = GET_LONG (zframe_data (credit_frame));
        zframe_destroy (&credit_frame);

        // Look for pre-existing client with this identity
        client_t *client = (client_t *) zlist_first (clients);
        while (client) {
            if (zframe_eq (client->identity, client_frame))
                break;
            client = (client_t *) zlist_next (clients);
        }
        // If this client is new, create an object and save it
        if (client == NULL) {
            client = (client_t *) zmalloc (sizeof (client_t));
            client->identity = client_frame;
            zlist_append (clients, client);
        }
        else
            zframe_destroy (&client_frame);

        // Accumulate credit for this client
        client->credit += credit;
        zmsg_destroy (&msg);

        // DISPATCH TO CLIENTS
        // -------------------------------------------------------
        // We now stream data to all clients with available credit
        // until their credit is used up. We then wait for clients
        // to send us new credit.

        // Process entire client list in turn
        client = (client_t *) zlist_first (clients);
        while (client) {
            while (client->credit >= FRAGMENT_SIZE) {
                int msgsize = FRAGMENT_SIZE + randof (1000) - randof (1000);
                zframe_t *sequence = zframe_new (NULL, 8);
                zframe_t *content = zframe_new (NULL, msgsize);
                PUT_LLONG (zframe_data (sequence), client->sequence);
                client->sequence++;

                // Send fragment of data to the client
                zframe_send (&client->identity, router,
                             ZFRAME_MORE | ZFRAME_REUSE);
                zframe_send (&sequence, router, ZFRAME_MORE);
                zframe_send (&content, router, 0);

                // Discount credit
                client->credit -= msgsize;
            }
            client = (client_t *) zlist_next (clients);
        }
    }
    zctx_destroy (&ctx);
    return NULL;
}
int main (void)
{
    // Create threads
    zctx_t *ctx = zctx_new ();
    printf ("I: starting server...\n");
    zthread_new (server_task, NULL);
    printf ("I: starting %d clients...\n", NBR_CLIENTS);
    int client_nbr;
    for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
        zthread_new (client_task, NULL);

    while (!zctx_interrupted)
        sleep (1);
    zctx_destroy (&ctx);
    return 0;
}
Comments
I need such a vehicle for a viewing pipeline (an interpreter outputs graphics primitives, consumed by a remote UI); however the consumer might pause (fine - credit stays at zero while paused), or exit unexpectedly altogether (not so fine - credit stays at zero forever; am I right that this case can't be detected in the above example?).
My current idea would be to combine client-originated heartbeating with the current credit tacked on; the producer could detect a vanished consumer by timing out on the heartbeat/credit update.
Does this make sense, or am I missing something?
It makes sense, though the only way to know is to build it, try it, and then improve it until it works as you want.
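For what it's worth, here is a rough sketch of one way the producer side could detect a vanished consumer, reusing the client_t structure from the example above. HEARTBEAT_TTL and the helper names (client_refresh, clients_reap) are invented for illustration: a credit frame of zero acts as a pure heartbeat, the server stamps an expiry on every credit it receives, and the receive loop polls with a timeout (rather than blocking in zmsg_recv) so it can periodically reap silent clients.

// Sketch only: the client_t from the example gains an expiry timestamp,
// and HEARTBEAT_TTL is an invented constant
#define HEARTBEAT_TTL 3000          // Forget a client silent for 3 seconds

typedef struct {
    zframe_t *identity;
    int credit;
    int64_t sequence;
    int64_t expiry;                 // When we consider the client gone
} client_t;

// Call whenever a credit (or zero-credit heartbeat) message arrives
static void
client_refresh (client_t *client, int credit)
{
    client->credit += credit;
    client->expiry = zclock_time () + HEARTBEAT_TTL;
}

// Call periodically to drop clients that have stopped sending credit
static void
clients_reap (zlist_t *clients)
{
    client_t *client = (client_t *) zlist_first (clients);
    while (client) {
        if (zclock_time () > client->expiry) {
            zlist_remove (clients, client);
            zframe_destroy (&client->identity);
            free (client);
            // Restart the scan after modifying the list
            client = (client_t *) zlist_first (clients);
        }
        else
            client = (client_t *) zlist_next (clients);
    }
}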
Portfolio
I think this idea would work quite well. However, I think it is conceptually simpler to work in terms of messages (whole messages, not just message parts/frames).
The reason is that, IME, most applications that would use a service like 0mq are unconcerned with bytes. The "currency" of my application isn't bytes but messages that contain semantic meaning.
Your example is counting bytes but it could just as easily count messages. I would prefer that.
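Here is a rough sketch of what message-counted credit could look like, assuming the PUT_LONG macro and the DEALER-to-ROUTER setup from the example above; WINDOW_MSGS, WINDOW_SLICE, and the function names are invented for illustration.

// Sketch only: credit counted in whole messages rather than bytes
#define WINDOW_MSGS  256                    // Messages allowed in transit
#define WINDOW_SLICE (WINDOW_MSGS / 4)      // Top up after this many arrive

static void
send_credit (void *dealer, int messages)
{
    zframe_t *frame = zframe_new (NULL, 4);
    PUT_LONG (zframe_data (frame), messages);
    zframe_send (&frame, dealer, 0);
}

// Client receive loop: count messages, not bytes
static void
client_receive_loop (void *dealer)
{
    send_credit (dealer, WINDOW_MSGS);      // Grant the initial window
    int received = 0;
    while (TRUE) {
        zmsg_t *msg = zmsg_recv (dealer);
        if (!msg)
            break;                          // Interrupted
        zmsg_destroy (&msg);
        if (++received >= WINDOW_SLICE) {
            received = 0;
            send_credit (dealer, WINDOW_SLICE);
        }
    }
}
// On the server, the dispatch test becomes "while (client->credit > 0)"
// and each message sent costs exactly one credit, so the HWM can be set
// directly to WINDOW_MSGS.

The attraction is that credit and HWM are then measured in the same unit, so the difficulty noted in the example (0MQ counts messages, not bytes) goes away; the trade-off is that the window no longer bounds the bytes in transit unless message sizes are themselves bounded.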
Do you know the appropriate terminology in relation to TCP flow control's 'sliding windows'? The above is basically a simplified, simplex form of that, right?
Yes, this is equivalent to sliding windows, but conceptually simpler.
Portfolio
I noted the simplicity. I like it! Your thoughts on using this for UDP?