San dataflow engine
Data flow concept

The proposed San language is a dynamic data flow language. A simple way to understand the difference between procedural programs and data-flow programs is to look at the following flow chart fragment.

           _____      _____
          |     |    |     |
    ...-->|  A  |--->|  B  |--->...
          |_____|    |_____|

               Diagram 1.

If this fragment is from the flow chart of a procedural program it signifies that the program first executes block A and then block B. Procedural programs execute sequentially; the flow chart represents the flow of control. If, however, it is from the flow chart of a data flow program, blocks A and B are executed concurrently. Data flow programs execute in parallel; the flow chart represents the flow of data. (Parallel in principle – actual implementations are necessarily messier.)

In the San model blocks have 0 or more input ports and 0 or more output ports. Here is an example:

    [Diagram 2. Data flow chart with five blocks: source S, blocks A, B, and C, and sink D, each with numbered ports.]

Block S is a source, i.e., it autonomously produces output. Blocks A and B have two input ports and one output port whereas block C has one input port and two output ports. The output from port 1 in block C tees, i.e., it is sent both to block A and block D. Finally, block D is a sink, i.e., it has inputs but no outputs.

One way to think about data flow programming is that it inverts traditional imperative programming. In typical imperative programs an application is a single main thread of control with the program structure embedded in the program code. The application makes calls into entry points in the OS and into subsidiary packages. From the perspective of the application, the application is unitary and the packages are fragmented. In dataflow programming the dataflow engine is the main thread of control.
The program structure is distinct from the application code; instead it is given by the data flow description. The user application is fragmented into many small fragments that act as mini-threads. These fragments accept inputs and produce outputs. The engine controls where the inputs come from and where the outputs go. A data flow language is dynamic if blocks and their connections can be created and deleted during the course of execution. In other words, the data flow chart for the program is not fixed in advance; it changes over time.
San engine architecture

The San engine software is a preliminary implementation of the data flow engine to be used by a San language interpreter. Although it was created to be part of a San interpreter, it can be used as a standalone C package. The San engine software is consistent with use in an implementation distributed across multiple processes and computers; however, the current code is written for a single process environment.

One way to think about the engine architecture is that it inverts traditional imperative programming. In ordinary imperative programs the application is a single main thread of control that is supported by various packages. The application makes calls into entry points in the package. From the perspective of the application, the application is unitary and the packages are fragmented. In the San dataflow model the dataflow engine is the main thread of control. The user application is fragmented into many small functions that do not talk directly to each other. Instead each fragment accepts inputs and produces outputs. The engine controls where the inputs come from and where the outputs go.

The upper level of the engine structure consists of one or more common resource frames. A common resource frame (CRF) contains a tree of agents and the resources used by the engine to maintain the agents.

    common resource frame
    |
    |--------> resources (e.g., free lists, storage pools)
    |
    |--------> tree of agents

An agent contains administrative data used by the engine; user supplied initialization, autonomous source, and input response mini-threads; and global data shared by the user mini-threads.

    agent
    |
    |--------> administrative data
    |--------> shared global data
    |--------> initialization mini thread
    |--------> autonomous source mini thread
    |--------> input response mini thread
Within an agent there is a separate application supplied response function for each input port. (By default it is a no-op, i.e., an empty function.) An agent can also have application supplied source functions that execute autonomously. The agents receive inputs and emit outputs; however they do not “know” where the inputs come from and where the outputs go. This information is contained in separate connection tables. Here is the connection table for the data flow chart (schematic) shown in diagram 2.

    From     To
    S 1      A 2
    A 1      B 1
    B 1      C 1
    C 1      A 1
    C 1      D 1
    C 2      B 2
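The connection table above can be written down directly as data. The following is a minimal sketch, not the engine's own representation: agents are identified by single letters, and the names conn_s, diagram2, and find_destinations are hypothetical, introduced only for illustration. It shows how an emission on a given output port can be fanned out to every receiver listed for that port.

```c
#include <assert.h>
#include <stddef.h>

/* One row of the connection table: emitting agent/port -> receiving agent/port.
   Illustrative only; the engine uses agent references, not letters. */
typedef struct {
    char from_agent;
    int  from_port;
    char to_agent;
    int  to_port;
} conn_s;

/* Connection table for diagram 2, row for row. */
static const conn_s diagram2[] = {
    { 'S', 1, 'A', 2 },
    { 'A', 1, 'B', 1 },
    { 'B', 1, 'C', 1 },
    { 'C', 1, 'A', 1 },
    { 'C', 1, 'D', 1 },   /* C.1 tees: it feeds both A and D */
    { 'C', 2, 'B', 2 },
};
static const size_t diagram2_len = sizeof diagram2 / sizeof diagram2[0];

/* Find every row whose emitter matches (agent, port).
   Stores up to max matches in out and returns the total number found. */
static size_t find_destinations(const conn_s *table, size_t n,
                                char agent, int port,
                                const conn_s **out, size_t max)
{
    size_t found = 0;
    assert(table != NULL);
    for (size_t i = 0; i < n; i++) {
        if (table[i].from_agent == agent && table[i].from_port == port) {
            if (out != NULL && found < max)
                out[found] = &table[i];
            found++;
        }
    }
    return found;
}
```

Looking up ('C', 1) yields two rows, one for A and one for D, which is the tee described for diagram 2. Block D never appears as an emitter, which is what makes it a sink.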
Application superstructure

Most of the interface entry points into the engine are functions that manipulate the data flow superstructure, e.g., functions that create and delete agents, functions that create and delete connections, and functions to load application code. However, they are only called when the superstructure is actually being altered.

Application supplied mini-threads

Input response functions are called when there is new data in an agent’s input ports. Usually these functions perform some operations on their input, emit some output, and exit. They have the syntactic structure of event handlers.

Agents can have sources; these are autonomous data generators and are executed once each scheduler cycle. Each source has an associated source function that generates the source output data. A source function can be associated with more than one source.

Most application code only uses two engine API functions – san_emit and san_get_agent_data. The former is the output function – it sends data to an output port. The engine is responsible for seeing that the data gets to its proper destination. San_get_agent_data is a utility for getting metadata for the agent that the application code is associated with.

Agent state data

An agent’s persistent data structure ties the agent’s associated functions together. They can pass information back and forth via the persistent data structure. The declaration of the persistent data structure must be visible to the code for the associated functions. Space for the structure is allocated with the san_make_globals and san_allocate functions. San_make_globals allocates space for the structure itself; san_allocate is used for any further allocations needed to flesh out the structure. The creation of an agent’s persistent data structure must be done in the birth function.
San engine interface

The Interface overview section has, not surprisingly, a general overview of the engine interface. The Interface details section provides a description of each interface function.
Interface overview

There are two categories of entry points into the engine. The first are the setup functions; these are the functions that connect the engine to the main program. The second are the user application interface functions; these are the functions that connect the user application code to the engine.

The setup functions are listed in table I. Typically these functions are called in “main” to initialize the engine and to load the initial user code.

Table I - Setup functions

    san_ctl_init_sysinfo      Initializes the system information struct
    san_ctl_create_crframe    Creates a common resource frame
    san_ctl_scheduler         Handles scheduling of execution 'threads'

The prototypes for the setup functions are in a separate include file, san_ctl.h, and are meant to only be used by main. San_ctl_init_sysinfo calls the optproc options processing package; it can be replaced provided that the replacement produces the SYSINFO struct. Here is a sample main program:

    #include

The first three functions called are initialization routines. Trace_init initializes a calling sequence trace utility. San_ctl_init_sysinfo processes the command line options. The options currently available are:

    OPTION     TYPE           DESCRIPTION
    context    context        Default context
    err        error file     Error file
    log        output file    Log file
    d          flag           Debug flag
    ev         flag           Event log flag
    def        flag           Use defaults flag
    rpt        flag           Write a terminal report
    term       int            Exiting epoch number

The CRF creation function, san_ctl_create_crframe, creates a common resource frame and initializes it. This includes creating a root agent for the tree of agents in the CRF. The second argument is the function that initializes the master (root) agent. This user defined function creates the initial agents and their connections. It also loads user application code into the agents.

The scheduler (san_ctl_scheduler) is the heart of the engine.
The general idea is that user code elements emit data that is sent to other user code elements as specified by the connection tables. The scheduler takes care of moving data from emitters to receivers; it also schedules the activation of agent source functions and agent input response functions as needed.

The majority of the application interface functions alter the schematic (the data flow structure) either by creating or deleting agents, by altering connections, or by altering user application code. There are three output functions: san_emit, san_event_logger, and san_rpt_agent. San_emit emits output from an agent output port. One of the program options is the ability to create an event log; the event logger writes messages to the event log. Finally, the agent reporter writes an agent report.

Table II - engine/application interface routines

    san_allocate          Allocates agent space during initialization
    san_clone_agent       Creates a clone of an existing agent
    san_connect           Creates a pipe connection
    san_create_child      Creates a child agent
    san_create_sibling    Creates a sibling agent
    san_delete_agent      Deletes an agent
    san_delete_source     Deletes a source element
    san_disconnect        Eliminates a pipe connection
    san_emit              Emits a data packet
    san_event_logger      Logs an event in the event log
    san_get_agent_data    Gets user visible agent data
    san_get_child_no      Gets child ref for a given index
    san_get_source_no     Gets source ref for a given index
    san_load_inport       Loads an execution function into an inport
    san_load_source       Adds a source element to an agent
    san_make_globals      Creates global space for a new agent
    san_rpt_agent         Creates an agent report

Internally the data flow engine uses a package of utility services. These need not be used by application code; however they are available. The utilities are:

    errexit.c     Entry points for an instrumented error exit
    errmgr.c      Entry points for error recording
    getfline.c    Reads variable length lines from a file
    getspace.c    Storage allocation package
    listpkg.c     List management package
    optproc.c     Options processing package
    stgpool.c     Specialized storage management package
    urtree.c      Key/value data storage and retrieval
Interface details There are currently seventeen application interface functions. Most of these functions are used to alter the connectivity of the data flow schematic or to initialize the loading of application functions. In many applications they are only called during initialization. The function interfaces are described below, along with a brief description of their usage. Links to the application interface functions descriptions
Function: san_allocate

    sigil_s    Identifying sigil for the calling code
    size_t     Size in bytes of the requested space

Function san_allocate allocates space. The space is taken from the agent’s storage pool. It should not be deallocated; attempting to deallocate it is an error that is probably fatal. This function can only be called during the execution of an agent’s birth function. It typically is used to get space for pointers in an agent’s global space.

Function: san_clone_agent

    sigil_s     Identifying sigil for the calling code
    agent_rs    Reference for the agent being cloned

Function san_clone_agent creates a clone of the agent referenced in the argument list. The agent creating the clone is called the self agent. The agent being cloned cannot be the master agent; otherwise it can be the self agent, a child of the self agent, or a sibling of the self agent. If the agent being cloned is the self agent or a sibling of the self agent the clone is added to the self agent’s parent’s list of children. If it is a child of the self agent it is added to the self agent’s list of children. San_clone_agent returns a reference to the newly created clone. If the cloning operation was unsuccessful a nil reference {0,0} is returned. The clone is newly born, i.e., it has the state that the donor agent had when it was born. Changes made to the donor agent’s state during its lifetime will not be present in the clone.

Function: san_connect

    sigil_s     Identifying sigil for the calling code
    agent_rs    Reference for the emitting agent
    agent_rs    Reference for the receiving agent
    port_v      Emitting port number
    port_v      Receiving port number

This function creates a data flow connection between an output port of one agent and an input port of another agent. The agent creating the connection is called the self agent. The source and destination agents must satisfy one of the following cases:

Function: san_create_child

    sigil_s    Identifying sigil for the calling code
    birth_f    The user function called when the child is created
    death_f    The function to be called when the child is deleted

Function san_create_child creates a child agent and returns a reference to the created agent. The birth function will be executed when the child agent is created.

Function: san_create_sibling

    sigil_s    Identifying sigil for the calling code
    birth_f    The user function called when the sibling is created
    death_f    The function to be called when the sibling is deleted

Function san_create_sibling creates a sibling agent and returns a reference to the created agent. The birth function will be executed when the sibling agent is created. Different functions are used for creating children and siblings because the parents are different.

Function: san_delete_agent

    sigil_s     Identifying sigil for the calling code
    agent_rs    Reference for the agent being deleted

Function san_delete_agent is used to delete agents. An agent can only delete itself and its children; however it cannot delete itself in its death function. Also, the master agent cannot delete itself. The children of a deleted agent become the children of the deleted agent’s parent.

Function: san_delete_source

    sigil_s      Identifying sigil for the calling code
    source_rs    Reference for the source function being deleted

Function san_delete_source is used to delete source elements (generators) from agents. An agent can delete sources from itself or from its children.

Function: san_disconnect

    sigil_s     Identifying sigil for the calling code
    agent_rs    Reference for the emitting agent
    agent_rs    Reference for the receiving agent
    port_v      Emitting port number
    port_v      Receiving port number

This function removes a data flow connection between an output port of one agent and an input port of another agent. The agent removing the connection is called the self agent. The source and destination agents must satisfy one of the following cases:

Function: san_emit

    sigil_s    Identifying sigil for the calling code
    port_v     Output port number
    size_t     Size of data packet being sent
    void *     Pointer to the data packet

Function san_emit attempts to emit data at the specified port. It will fail if nothing is connected to the port. If either the data size or the data pointer is zero an empty packet will be sent; otherwise it is assumed that the size is correct.
Function: san_event_logger
Function: san_get_agent_data
The idea is that user code can walk through the lists of children and sources,
using san_get_child_no and san_get_source_no to access items by index.
Function: san_get_child_no
Function: san_get_source_no
Function: san_load_inport
Function: san_load_source
Function: san_make_globals
Function: san_rpt_agent
Sigils and References

All user code is embedded within agents, either as initialization code, as termination code, as input port response code, or as autonomous data source code. When user code is activated it is passed a sigil. This is a struct that acts as a password token. User code interacts with the engine via interface functions. The first argument to an interface routine always is the sigil. A fundamental requirement is that the presented sigil be valid, i.e., the data in the sigil is legitimate. In this implementation the sigil is not encrypted but it could be in a future implementation.

The sigil isolates user code from the engine code. Instead of containing pointers to engine data, sigils contain opaque references. There are three user visible handles. They are:

    CRFID        This identifies the current common resource frame. It is a size_t integer; it actually is an index into a table of pointers to common resource frames; however the table is not visible to the user.
    agent_rs     This is a robust handle for agents. It is a struct containing two size_t integers.
    source_rs    This is a handle for a source (an autonomous data generator). This is a struct containing an agent reference and a source serial number. It points to a struct defining a source, where a source is code that autonomously generates output.

Scheduling

This implementation uses a simple scheduling model. Each cycle of the scheduler loop represents a unit of time called an epoch. In each epoch the emissions from the previous epoch are processed and each source is executed once.

There are two termination conditions for the main loop. They are:
The main loop has three major sub loops. They are:
The scheduler maintains two lists, a list of items to be processed in the current cycle and a list of items to be processed in the next cycle. The lists contain instances of a struct called a delivery that looks like this:

    struct delivery_s {
        agent_rs    agent_r;    /* Agent receiving delivery        */
        port_v      port;       /* Port receiving delivery         */
        size_t      seqno;      /* Expected inport serial number   */
        inport_s *  inport;     /* Pointer to agent inport struct  */
    };

The agent_r field is a reference for the receiving agent; port is the input port number. Seqno is a verification sequence number. Finally, inport is a pointer to a structure in the agent that holds the input port. The items being delivered are held in a queue maintained in inport. The items in the queue are structures called packages that look like this:

    struct package_s {
        size_t       len;      /* Size of package                 */
        char *       data;     /* Contents of package             */
        emitstg_s *  xpool;    /* Storage pool supplying space    */
    };

Len and data specify the serialized data being transmitted; xpool specifies the storage pool being used.

Loop 1

The emissions created are added to list crf->emissions. Here is the definition for an emission:

    struct emission_s {
        package_s     pkg;    /* Data package emitted     */
        delivery_s    d;      /* Delivery package data    */
    };

Pkg contains the data to be transmitted. D contains the delivery struct as it was at the time of emission; nothing is loaded into the inport queue at this time.
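The current-cycle and next-cycle lists can be illustrated with a small sketch. This is not the engine's scheduler: plain ints stand in for deliveries, the lists have a fixed capacity, and every name here (worklist_s, sched_s, sched_post, sched_run_epoch, countdown) is hypothetical. It shows only the key property that anything produced while processing one epoch is held back until the next.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_ITEMS 16

/* A fixed-capacity list of pending items (ints stand in for deliveries). */
typedef struct {
    int    items[MAX_ITEMS];
    size_t count;
} worklist_s;

/* Two lists: one being processed this epoch, one filling up for the next. */
typedef struct {
    worklist_s lists[2];
    int        cur;      /* index of the list for the current cycle */
    size_t     epoch;    /* number of epochs run so far             */
} sched_s;

typedef void (*handler_f)(sched_s *, int);

static void sched_init(sched_s *s)
{
    s->lists[0].count = 0;
    s->lists[1].count = 0;
    s->cur = 0;
    s->epoch = 0;
}

/* Queue an item for the next cycle. Returns 0 on success, -1 if the list is full. */
static int sched_post(sched_s *s, int item)
{
    worklist_s *next = &s->lists[1 - s->cur];
    if (next->count >= MAX_ITEMS)
        return -1;
    next->items[next->count++] = item;
    return 0;
}

/* Run one epoch: the list filled during the previous epoch becomes current
   and is processed; items posted by the handler land in the other list.
   Returns the number of items processed. */
static size_t sched_run_epoch(sched_s *s, handler_f handler)
{
    assert(handler != NULL);
    s->cur = 1 - s->cur;
    worklist_s *cur = &s->lists[s->cur];
    size_t n = cur->count;
    for (size_t i = 0; i < n; i++)
        handler(s, cur->items[i]);
    cur->count = 0;
    s->epoch++;
    return n;
}

/* Example handler: re-emit item - 1 until the value reaches zero. */
static void countdown(sched_s *s, int item)
{
    if (item > 0)
        (void)sched_post(s, item - 1);
}
```

Posting the value 2 and then running epochs with countdown processes one item in each of the next three epochs and none in the fourth, because each re-emitted value waits in the next-cycle list.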
Loop 2
Loop 3

Constraints on interface functions

All of the interface functions require a valid sigil as an argument. Sigils have embedded within them a reference for the agent containing the user code currently being executed. The containing agent is called the self agent. The sigil is the only argument for the following functions:

    san_create_child:      Create a child agent
    san_create_sibling:    Create a sibling agent
    san_get_agent_data:    Gets user visible agent data

There are two functions that have a data array index argument; they return an agent datum by index from an array. Each returns a reference; if the index is out of range they return a null reference. The functions are:

    san_get_child_no:     Gets child ref for a given index
    san_get_source_no:    Gets source ref for a given index

There are two deletion functions. Each has an argument specifying the entity (source or agent) to be deleted. The specified agent (or containing agent if a source is being deleted) must either be the agent itself or a child agent. The functions are:

    san_delete_agent:     Deletes an agent
    san_delete_source:    Deletes a source element

There are two load functions that associate a user supplied function with an agent, san_load_inport and san_load_source. The first two arguments for each are the sigil and the agent being supplied with a user function. The last argument is the user function. San_load_inport has an additional argument, the input port number. The specified agent must either be the agent itself or a child. The supplied function pointer cannot be a null function pointer. The functions are:

    san_load_inport:    Loads an execution function into an inport
    san_load_source:    Loads a generator (source) function into an agent

There are two connection management functions, one to create a connection and one to delete a connection. Each has source and destination agents as arguments along with the output and input port numbers. The source and destination agents must satisfy one of the following cases:

    san_connect:       Creates a data flow connection
    san_disconnect:    Deletes a data flow connection

The san_emit function emits data from an output port. The port must be a valid port; i.e., it must be part of a data flow connection. The san_rpt_agent function writes an agent report to the report file; it must be passed a valid agent. These functions are:

    san_emit:         Emits an emission
    san_rpt_agent:    Creates an agent report
User supplied code

The engine recognizes four kinds of user supplied code: initialization code, termination code, autonomous data generation code, and input port response code. User code is connected to the engine via user written functions that plug into the engine. The user supplied function pointers are arguments to engine API functions. The typedefs for these functions are given in san.h; they are:

    typedef void (*birth_f)      (sigil_s);
    typedef void (*death_f)      (sigil_s);
    typedef void (*source_exec_f)(sigil_s, void *);
    typedef void (*inport_exec_f)(sigil_s, void *, size_t, void *);

The first argument for a user supplied function is the sigil that must be used when calling engine API functions.
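To make the shape of an input response function concrete, here is a minimal sketch that compiles without the engine. Several things in it are assumptions and not taken from san.h: the sigil_s struct is a placeholder for the real opaque type, demo_emit is a stand-in that records a packet instead of calling san_emit, the port is a plain int, and the meaning of the last three inport_exec_f arguments is guessed to be (agent globals, data length, data pointer). Only the inport_exec_f typedef itself is copied from the text above.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Placeholder: the real sigil_s comes from san.h and is opaque to user code. */
typedef struct { int token; } sigil_s;

/* Typedef as listed for san.h. */
typedef void (*inport_exec_f)(sigil_s, void *, size_t, void *);

/* Stand-in for san_emit so the sketch compiles on its own.
   It records the last packet instead of routing it anywhere. */
static int    demo_last_port = -1;
static size_t demo_last_len;
static char   demo_last_data[64];

static void demo_emit(sigil_s sigil, int port, size_t len, void *data)
{
    (void)sigil;
    assert(len <= sizeof demo_last_data);
    demo_last_port = port;
    demo_last_len  = len;
    if (len > 0 && data != NULL)
        memcpy(demo_last_data, data, len);
}

/* Hypothetical agent globals shared by the agent's functions. */
typedef struct { long total; } demo_globals_s;

/* Input response function in the event-handler style: take the input,
   update the agent state, emit a result, and return.
   ASSUMPTION: arguments are (sigil, globals, length, data). */
static void demo_accumulate(sigil_s sigil, void *globals, size_t len, void *data)
{
    demo_globals_s *g = globals;
    int value;
    if (len != sizeof value || data == NULL)
        return;                         /* ignore malformed packets */
    memcpy(&value, data, sizeof value);
    g->total += value;
    demo_emit(sigil, 2, sizeof g->total, &g->total);
}
```

The handler never names its receiver; it only names an output port. In the engine the connection tables decide where that output goes, which is the separation between application code and program structure described earlier.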
Birth and death

    agent_rs san_clone_agent    (sigil_s sigil, agent_rs agent);
    agent_rs san_create_child   (sigil_s sigil, birth_f birth, death_f death);
    agent_rs san_create_sibling (sigil_s sigil, birth_f birth, death_f death);

Birth

The birth function has two major uses. The first is to create the structure holding the agent globals and to populate it. The API has two functions for this purpose, san_make_globals and san_allocate. San_make_globals allocates space for the structure holding the globals; san_allocate allocates space for sub structures. This space does not have to be freed; the engine takes care of freeing it. (It actually comes from the agent’s storage pool.) The second is to create program structure, i.e., to create child agents and specify their connections. In particular, the birth function for the master agent of a common resource frame creates a program superstructure.
Death
Execution functions

Each source is executed once during an epoch. (Different sources may share a source function.) The inport_exec function is executed whenever there is content in an agent’s input queues. Each port has its own queue and a pointer to an inport_exec function. The pointer may be nil; if it is, any input to that port will be lost. Different inports can share an inport_exec function. Each agent has a list of input port structures called inports. An inport looks like this:

    struct inport_s {
        port_v           port;       /* Input port number              */
        listhdr_s        queue;      /* List of queued input data      */
        void *           globals;    /* Ptr to agent globals           */
        size_t           n_feeds;    /* No. of connections to port     */
        size_t           seqno;      /* Verification sequence number   */
        inport_exec_f    exec;       /* Function to execute            */
    };

Conceptually connection tables belong to the parent agent; as a matter of convenience they are stored with the emitting agents. Entries in the connection tables look like this:

    struct connex_s {
        port_v        outport;      /* Port no in the emitter           */
        port_v        port;         /* Port no in the receiver          */
        agent_rs      agent_ref;    /* Ref for receiving agent          */
        inport_s *    dest;         /* Ptr to receiver inport struct    */
    };

If san_load_inport or san_connect references an inport that does not yet exist for a receiving agent and port, one is created. When an emitter generates an emission it scans the emitter’s connection table to find all entries for that output port. It checks the agent reference to make sure that the agent still exists. If it does, the emission is appended to a central emissions list that is used by the scheduler.

The n_feeds field in the INPORT struct holds the number of connections to the agent input port. It is initialized to 0 when the inport is created. It is incremented each time a connection is made to the port and decremented each time a connection to the port is disconnected. When the count reaches zero during a disconnect the inport struct is moved from the agent inport list to the resource frame inport free list. The disconnect routine also removes the connection struct (CONNEX) from the CONNEX table in the emitting agent.

When an agent is deleted its list of inports is added to the resource frame inport free list. The corresponding entries in the CONNEX table are not deleted at that time. They are deleted later when an attempt is made to send data to the deleted agent.

Input and output ports 0 are reserved for communication with the parent agent; input and output ports 1 are reserved for communication with child agents. Ports 2 and greater are not reserved; they are available for ordinary connections. When an agent is created two connections are made:

    parent.1 -> agent.0
    agent.0 -> parent.1

These connections cannot be deleted. Emissions from a parent to output port 1 are broadcast, i.e., they go to all of the parent’s children. In addition a parent can create an ordinary connection to a child’s port 0; this connection is not broadcast and can be deleted.
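The n_feeds bookkeeping described above amounts to a reference count on each inport. The sketch below illustrates that idea only; demo_inport_s and the demo_ functions are hypothetical names, and a boolean flag stands in for moving the struct between the agent inport list and the resource frame free list.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Cut-down inport: just the port number and its feed count. */
typedef struct {
    int    port;
    size_t n_feeds;   /* number of connections feeding this port       */
    bool   in_use;    /* false once returned to the (notional) free list */
} demo_inport_s;

/* A new inport starts with no feeds. */
static void demo_inport_init(demo_inport_s *p, int port)
{
    p->port    = port;
    p->n_feeds = 0;
    p->in_use  = true;
}

/* Each new connection to the port bumps the count. */
static void demo_connect(demo_inport_s *p)
{
    assert(p->in_use);
    p->n_feeds++;
}

/* Each disconnect drops the count; at zero the inport is released.
   Returns true if this call released the inport. */
static bool demo_disconnect(demo_inport_s *p)
{
    assert(p->in_use && p->n_feeds > 0);
    if (--p->n_feeds == 0) {
        p->in_use = false;
        return true;
    }
    return false;
}
```

With two connections feeding one port, the first disconnect leaves the inport in place and the second releases it.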
Inter agent data transfer – storage

The San engine uses a copy data protocol for inter agent data transfer rather than a data reference protocol. In other words when an agent emits data the emit code makes a copy of the data for each port to which the data will be delivered. Transfer by reference is faster than transfer by copy, but it is dangerous unless it can be guaranteed that (a) the emitter will not alter the data after sending it, and (b) the receiver(s) do not alter the data after receiving it.

The code uses a double storage pool technique so as to avoid excessive storage allocation and deallocation. The pools hold queued data that has not yet been processed. There are two pools, old and current. No space is taken from the old pool; all space for emitted items is taken from the current pool. When space taken from a pool is released the count of items in the pool is decremented. When a pool count reaches 0 the pool is cleared. At the end of a scheduling cycle the pool count for the old pool is checked. If it is 0 the old pool is cleared, the current pool becomes the old pool, and a new current pool is created.

Function get_xpool_space is the gateway for getting a block of space from the current xpool. Each call into get_xpool_space increments the current xpool usage count by 1. Note that the block length can be zero. There are four places where get_xpool_space can be called. In emit it is called twice, once to get space for an emission (em) and once to get space for the data in the emission package (em->pkg->data). There are two additional potential calls in function update_delivery. The first gets space for a delivery. The timeliness of the delivery is then checked. If it is timely, an additional call is made to get space for the package being delivered. If it is not timely the xpool count is decreased by 3 (2 for the emission, 1 for the delivery) and the delivery is discarded. (This function can be tightened up.) When a package is finally delivered to exec_inport it represents 4 blocks from the xpool. Exec_inport decrements the xpool count by 4.
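The old/current rotation can be sketched with usage counters alone. The code below is an illustration under simplifying assumptions, not the engine's stgpool or xpool code: no memory is actually handed out, the two pools live in a two-element array, and all names (demo_pool_s, demo_pools_s, demo_get_space, demo_release, demo_end_cycle) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

/* A pool reduced to its bookkeeping: how many blocks are still outstanding. */
typedef struct {
    size_t   in_use;        /* blocks taken and not yet released      */
    unsigned generation;    /* changes each time the slot is recycled */
} demo_pool_s;

typedef struct {
    demo_pool_s pools[2];
    int         cur;        /* index of the current pool; the other is old */
    unsigned    next_gen;
} demo_pools_s;

static void demo_pools_init(demo_pools_s *p)
{
    p->pools[0].in_use = 0;  p->pools[0].generation = 0;
    p->pools[1].in_use = 0;  p->pools[1].generation = 1;
    p->cur = 1;
    p->next_gen = 2;
}

/* Take one block: always from the current pool, never from the old one.
   The returned pointer identifies the pool so the block can be released later. */
static demo_pool_s *demo_get_space(demo_pools_s *p)
{
    demo_pool_s *pool = &p->pools[p->cur];
    pool->in_use++;
    return pool;
}

/* Release n blocks back to the pool they came from. */
static void demo_release(demo_pool_s *pool, size_t n)
{
    assert(pool->in_use >= n);
    pool->in_use -= n;
}

/* End of a scheduling cycle: if the old pool has drained, recycle it as the
   new current pool and let the previous current pool become old.
   Returns 1 if the pools rotated, 0 otherwise. */
static int demo_end_cycle(demo_pools_s *p)
{
    demo_pool_s *old = &p->pools[1 - p->cur];
    if (old->in_use != 0)
        return 0;
    old->generation = p->next_gen++;
    p->cur = 1 - p->cur;
    return 1;
}
```

Because blocks are only ever taken from the current pool, the old pool's count can only fall, so it drains once the data queued before the last rotation has been delivered or discarded.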
Tracking source structs

A source (in the sense of sources and sinks) is an autonomous generator of data streams. An agent can host several sources. A source is represented by a source_s struct defined as follows:

    struct source_s {
        sigil_s          sigil;    /* Sigil for the host agent           */
        source_exec_f    exec;     /* Function executed by the source    */
        size_t           index;    /* Index in the long list array       */
        size_t           srcno;    /* Id # relative to the host agent    */
        source_rs        s_ref;    /* Object tag used in user code       */
        source_s *       link;     /* Link used in linked lists          */
    };

Each agent that hosts sources has a linked list of source_s structs corresponding to the hosted sources. These lists are the only locations where source_s structs are stored. In addition to the source structs themselves there is an array of pointers to the entire suite of source structs. This array is used internally by the scheduler to schedule source activation. The source structs are not visible to user code. User code accesses sources via source_rs references. The array of struct pointers is not user accessible.

When a new source is added to an agent the storage for the struct is taken from a free list. Each agent maintains a counter that supplies a unique id for each source struct. The new source struct is linked into the existing agent source list and a pointer to the struct is added to the end of the pointer array.

When a source is deleted from an agent things are slightly more complicated. The agent’s list of sources is scanned and the source is snipped from the list. (The assumption is that agent source lists will be small.) The deleted source is added to the free list of sources. While the pointer array is being scanned it is divided into two segments, processed and unprocessed. If the deleted source is unprocessed the last pointer in the array replaces the deleted pointer and the index in the struct is changed to point to the new location. If the deleted source is processed two replacements are done; the last processed pointer replaces the deleted pointer, and the last pointer of the array replaces the last processed pointer. The index fields in the structs pointed to by the moved pointers are updated to reflect the new locations.
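The two deletion cases can be sketched as follows. This is an illustrative reconstruction of the technique described above, not the engine's code: demo_source_s keeps only an id and the index field, the array has a fixed capacity, and the names demo_srclist_s, demo_add, demo_move, and demo_remove are hypothetical. The processed segment is taken to be slots [0, n_done) and the unprocessed segment slots [n_done, n).

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_MAX 8

/* Cut-down source struct: an id plus its slot in the pointer array. */
typedef struct {
    int    id;
    size_t index;
} demo_source_s;

typedef struct {
    demo_source_s *ptrs[DEMO_MAX];
    size_t         n;         /* pointers in use                      */
    size_t         n_done;    /* slots [0, n_done) already processed  */
} demo_srclist_s;

/* Append a pointer at the end of the array and record its slot. */
static void demo_add(demo_srclist_s *l, demo_source_s *src)
{
    assert(l->n < DEMO_MAX);
    src->index = l->n;
    l->ptrs[l->n++] = src;
}

/* Move the pointer in slot src to slot dst and fix up its index field.
   Does nothing when the two slots are the same. */
static void demo_move(demo_srclist_s *l, size_t dst, size_t src)
{
    if (dst != src) {
        l->ptrs[dst] = l->ptrs[src];
        l->ptrs[dst]->index = dst;
    }
}

/* Remove the pointer in slot i while keeping both segments contiguous. */
static void demo_remove(demo_srclist_s *l, size_t i)
{
    assert(i < l->n);
    size_t last = l->n - 1;
    if (i < l->n_done) {
        /* Processed: last processed fills the hole, then the last pointer
           of the array fills the slot the last processed one vacated. */
        size_t last_done = l->n_done - 1;
        demo_move(l, i, last_done);
        demo_move(l, last_done, last);
        l->n_done--;
    } else {
        /* Unprocessed: the last pointer of the array fills the hole. */
        demo_move(l, i, last);
    }
    l->n--;
}
```

Both cases run in constant time and never move a pointer across the processed/unprocessed boundary in the wrong direction, so a scan in progress neither skips a source nor runs one twice. The guard in demo_move covers the corner cases where the deleted slot is itself the last processed slot or the last slot of the array.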
Suffix tags

    _s     Jointly used by structures and their typedefs
    _f     Function typedef
    _v     Variable typedef
    _rs    Reference structure typedef
Things on the to do list

This section is a list of things to do and things to think about.
This page was last updated June 11, 2009.