Handling large amounts of data in Flex

I have wanted to look into loading and processing large amounts of data for some time, but never got around to it. It was not a requirement in my work so far, so I kept postponing it. I came close once: I loaded some start-up data for a project and the GUI became unresponsive when the data was passed to the views. As it turned out, that was just a testing scenario pushed to the extreme; in production a DB filter limited the amount of data sent, so large amounts never got through. On the other hand, I see lots of people running into this issue on both the Flexcoders Yahoo group and the Adobe forums. If I decide to answer, I usually start by asking “Do you really need all the data at once?”. Normally the answer should be no, so data paging will be your friend here (there are some good resources out there, James Ward’s post to mention just one).

However, there may be reasons why you need all the data in one call. Flex will load the data and let you display a busy cursor. Parsing the data is a different beast though. If you are lucky and don’t get the “#1502 A script has executed for longer than the default timeout period of 15 seconds.” error, the GUI will still freeze during parsing, which prevents the user from doing anything until processing completes. But you probably know this story already…

What I am doing here is basically reinventing the wheel and taking advantage of what other people have come up with so far (see references at the end of the post). I think an extra example will not hurt anyone. Outside the Flash Player world (Java, .NET), several threads (callables; I am not really a Java dev) would be given the task of processing chunks of data in parallel (taking advantage of multiple cores) and reporting back with the results, while another thread handles GUI updates. Due to the single-threaded nature of the ActionScript execution model, this is not possible in Flex. What is possible, though, is to break the processing into several chunks and run them over time (across frames), giving the GUI the chance to stay responsive in between. The terms pseudo-threading and green threading frequently pop up in this context.

The following example will handle the loading and parsing of CSV files, and at the same time display an animated progress overlay on top of the GUI. I came up with the idea of parsing a CSV file when I wanted to be able to put my telephone bill details on a chart, to have a better idea of what I am being charged for. In doing that it seemed like a good opportunity to add a couple of extra lines (700000) into the file to see how Flex behaves.

The running application is below, and the .fxp file is also available for DOWNLOAD. The code was built for demo purposes, so it is “not quite” production-ready; the usual caveats apply. I am using a generated CSV file with 700000 lines. A stripped-down version with 50K lines is also AVAILABLE (the app asks you to load the CSV from your local hard drive).

Load the csv file.

ICSVFileService is a wrapper over the lower-level FileReference class. The app holds a reference typed to the interface rather than to a concrete class, so it can easily swap local loading for remote loading if necessary. The service grabs the file, starts loading it into memory, dispatches progress events (optional), and signals the completion of the load operation. The data is then available in the csvData property.

ICSVFileService

package ro.a223.csv.infrastructure.service
{
	import flash.events.IEventDispatcher;

	public interface ICSVFileService extends IEventDispatcher
	{
		function getCSVFile():void;
		function get csvData():String;
	}
}

//usage
private var _csvService:ICSVFileService = new LocalCSVFileService();
_csvService.addEventListener(Event.INIT, onLoadStarted);
_csvService.addEventListener(ProgressEvent.PROGRESS, onProgress);
_csvService.addEventListener(Event.COMPLETE, onCSVData);
_csvService.getCSVFile();

//LocalCSVFileService is inside the .fxp file
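
For readers without the .fxp at hand, here is a minimal sketch of what a local implementation of ICSVFileService could look like. It is not the class from the project: the name SketchLocalCSVFileService and its internals are assumptions, built only on the standard FileReference calls (browse(), load(), data) and on the three events the usage snippet listens for.

```actionscript
package ro.a223.csv.infrastructure.service
{
	import flash.events.Event;
	import flash.events.EventDispatcher;
	import flash.events.ProgressEvent;
	import flash.net.FileFilter;
	import flash.net.FileReference;

	// Hypothetical sketch, NOT the LocalCSVFileService shipped in the .fxp
	public class SketchLocalCSVFileService extends EventDispatcher implements ICSVFileService
	{
		private var _file:FileReference;
		private var _csvData:String;

		// Has to be called from a user gesture (e.g. a button click),
		// otherwise Flash Player will not open the browse dialog
		public function getCSVFile():void
		{
			_file = new FileReference();
			_file.addEventListener(Event.SELECT, onSelect);
			_file.browse([new FileFilter("CSV files", "*.csv")]);
		}

		public function get csvData():String
		{
			return _csvData;
		}

		// The user picked a file: announce the start and load it into memory
		private function onSelect(event:Event):void
		{
			_file.removeEventListener(Event.SELECT, onSelect);
			_file.addEventListener(ProgressEvent.PROGRESS, onProgress);
			_file.addEventListener(Event.COMPLETE, onComplete);
			dispatchEvent(new Event(Event.INIT));
			_file.load(); // requires Flash Player 10 or later
		}

		// Forward the loading progress to whoever listens on the service
		private function onProgress(event:ProgressEvent):void
		{
			dispatchEvent(event.clone());
		}

		// The bytes are in memory: decode them as UTF-8 text and signal completion
		private function onComplete(event:Event):void
		{
			_file.removeEventListener(ProgressEvent.PROGRESS, onProgress);
			_file.removeEventListener(Event.COMPLETE, onComplete);
			_csvData = _file.data.readUTFBytes(_file.data.length);
			_file = null;
			dispatchEvent(new Event(Event.COMPLETE));
		}
	}
}
```

The sketch ignores the case where the user cancels the dialog (Event.CANCEL) and any I/O errors; a real service would probably want to handle both.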

Parse the csv file.
Now that you have the data, it needs to be parsed. A generic loop for parsing the CSV file looks similar to this:

//HUGE_NUMBER is the number of CSV lines
for (var i:uint = 0; i < HUGE_NUMBER; i++)
{
	parse(line[i]);
}

The idea is to break out of that iteration after some time, e.g. when some exit condition is met, save the index that the current iteration has reached and reiterate from that index next time.

//HUGE_NUMBER is the number of CSV lines
//_breakIndex lives outside the function so that it survives between calls
var _breakIndex:uint;
for (var i:uint = _breakIndex; i < HUGE_NUMBER; i++)
{
	if (exitCondition)
	{
		_breakIndex = i;
		return;
	}

	parse(line[i]);

}

//set the work as complete

There are two kinds of exit condition: time based, where the loop runs for a specified amount of time and then breaks out, or based on the number of iterations. The choice is ultimately up to the developer and the use case. There is also more than one way to break out of the loop and resume later; mainly 3 techniques, all of them explained here.
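
To make the two kinds of exit condition concrete, here is a small sketch. The class ExitConditions and both function names are made up for illustration; the only real API used is flash.utils.getTimer(), which returns the milliseconds elapsed since the player started.

```actionscript
package
{
	import flash.utils.getTimer;

	// Hypothetical helper, only meant to illustrate the two kinds of exit condition
	public class ExitConditions
	{
		// Time based: true once the deadline (in getTimer() milliseconds) has passed
		public static function timeExpired(deadline:int):Boolean
		{
			return getTimer() >= deadline;
		}

		// Iteration based: true once maxIterations items were handled in this chunk
		public static function chunkDone(index:uint, chunkStart:uint, maxIterations:uint):Boolean
		{
			return index - chunkStart >= maxIterations;
		}
	}
}
```

Inside the loop, the check would read either `if (ExitConditions.timeExpired(deadline))` or `if (ExitConditions.chunkDone(i, _breakIndex, 5000))`, where 5000 is an arbitrary chunk size. The time-based variant adapts to slow machines on its own, while the iteration-based one gives the same chunk boundaries on every run.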

My choice was to use the ENTER_FRAME technique and to allow the iteration a percentage of the duration of each frame. Initially all the code lived in one class, but I soon realized that I also needed to generate a large CSV file, which calls for a different algorithm. I split the code into 2 classes: one responsible for managing time and tracking progress (AsyncThreadCaller), and the other focused on performing the work (RunnableCSVParser). The worker class implements the IRunnable interface, so different work can be performed by different implementations managed by the same class. Since the approach uses a time-based exit condition, the worker gets its exit condition from the manager class (AsyncThreadCaller). The share of the frame that a worker is allowed to run is specified as a property, runningTimeShare, on the IRunnable, so different workers can have different running times. AsyncThreadCaller gets a reference to the main stage and registers a listener for the Event.ENTER_FRAME event on that reference. Inside the handler function (run()), it computes the exit time and then delegates processing to the worker. The worker runs until the time slice for the current frame is used up or it completes the work.
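
As a rough worked example of the time slice, the sketch below repeats the arithmetic AsyncThreadCaller performs in its run() handler. The class and function names (TimeSlice, budgetMs) are only for illustration.

```actionscript
package
{
	// Hypothetical helper mirroring the time slice arithmetic of AsyncThreadCaller
	public class TimeSlice
	{
		// Milliseconds a worker may run within one frame
		public static function budgetMs(frameRate:Number, runningTimeShare:Number):Number
		{
			// whole milliseconds available per frame, e.g. 41 at 24 fps
			var frameDuration:Number = Math.floor(1000 / frameRate);
			return frameDuration * runningTimeShare;
		}
	}
}
```

At 24 fps a frame lasts about 41 ms, so a runningTimeShare of .5 leaves the worker 20.5 ms per frame and the rest to the player for rendering and event handling. The budget is a target rather than a guarantee, because the exit condition is only checked between two iterations.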

The IRunnable interface defines the API for a generic runner. The names are self-explanatory.

package ro.a223.csv.infrastructure.pseudothreading
{
	/**
	 * This is called by the manager/caller class repeatedly to perform work until
	 * the time share expires or the amount of work is completed
	 **/
	public interface IRunnable extends IData
	{
		/**
		 * The function performing the actual work
		 * The implementor class will supply the processing logic
		 **/
		function run():void;

		/**
		 * Called by the manager class to inspect the completion of the processing operation
		 * @return  boolean true/false indicates whether the processing operation completed
		 **/
		function get isComplete():Boolean;

		/**
		 * Called by the manager class to inspect the progress of the completion
 		 * @return uint total amount of work done so far
		 **/
		function get progress():uint;

		/**
		 * Called by the manager class to get a hold of the total amount of work that needs to
		 * be carried out
		 * @return uint the total amount of work to be carried out
		 **/
		function get total():uint;

		/**
		 * time share allocated for this thread, as a fraction of the frame duration (e.g. .5)
		 **/
		function get runningTimeShare():Number;

		/**
		 * This reference is set by the manager class so that the implementing class
		 * can ask the manager whether it is still allowed to run or needs to exit
		 *
		 * @param val reference to the caller's exit condition computing function
		 **/
		function set needToExit(val:Function):void;

	}
}
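
To show how little a second worker needs, here is a sketch of an IRunnable that generates a CSV string instead of parsing one. It is not the generator from the project: the class name, the x,y column layout and the random values are made up, and it assumes that IData (not listed in this post) declares nothing more than a data getter.

```actionscript
package ro.a223.csv.infrastructure.pseudothreading.impl
{
	import ro.a223.csv.infrastructure.pseudothreading.IRunnable;

	// Hypothetical worker: builds a CSV string with random x,y values
	public class SketchCSVGenerator implements IRunnable
	{
		private var _lines:Array = ["x,y"]; // header line first
		private var _total:uint;
		private var _progress:uint;
		private var _isComplete:Boolean;
		private var _runningTimeShare:Number;
		private var _needToExit:Function;
		private var _data:String;

		public function SketchCSVGenerator(total:uint, runningTimeShare:Number = .5)
		{
			_total = total;
			_runningTimeShare = runningTimeShare;
		}

		// Adds lines until the manager says the time slice is used up
		public function run():void
		{
			while (_progress < _total)
			{
				if (_needToExit())
				{
					return; // _progress already marks where to resume
				}
				_lines.push(Math.random() + "," + Math.random());
				_progress++;
			}

			// all lines generated: join them with CR+LF, the parser's default separator
			_data = _lines.join("\r\n");
			_lines = null;
			_needToExit = null;
			_isComplete = true;
		}

		public function set needToExit(val:Function):void { _needToExit = val; }
		public function get isComplete():Boolean { return _isComplete; }
		public function get progress():uint { return _progress; }
		public function get total():uint { return _total; }
		public function get runningTimeShare():Number { return _runningTimeShare; }

		// Assumed to be the member that IData declares
		public function get data():* { return _data; }
	}
}
```

Note that the final join() runs in one go, so for a very large number of lines that last step can still take longer than its time slice.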

RunnableCSVParser performs the parsing work within the run() method, asking the caller on every iteration whether it is still allowed to run (the _needToExit() check at the top of the loop). Upon exiting, it updates its progress property so that the caller can use it to dispatch progress events. When it finishes the work, it sets its isComplete property to true so that the caller stops calling it.

package ro.a223.csv.infrastructure.pseudothreading.impl
{
	import ro.a223.csv.infrastructure.pseudothreading.IRunnable;
	import ro.a223.csv.infrastructure.pseudothreading.IAsyncThreadCaller;

	public class RunnableCSVParser implements IRunnable
	{
		/**
		 * @private
		 * Percent of the frame rate allowed for this thread to run
		 **/
		private var _runningTimeShare:Number;

		/**
		 * @private
		 * CSV line separator defaults to RETURN
		 **/
		private var _lineSeparator:String;

		/**
		 * @private
		 * CSV column separator defaults to comma
		 **/
		private var _columnSeparator:String;

		/**
		 * @private
		 * reference to the caller's needToExit function, used to
		 * query whether there is still time left for running
		 **/
		private var _needToExit:Function;

		/**
		 * @private
		 * holds information about the completion of the parsing operation
		 **/
		private var _isComplete:Boolean;

		/**
		 * @private
		 * holds information about the progress of the parsing operation
		 **/
		private var _progress:uint;

		/**
		 * @private
		 * holds information about the total amount of work that needs to
		 * be performed, in this case the number of CSV lines inside the file
		 **/
		private var _total:uint;

		/**
		 * @private
		 * start index for each loop
		 **/
		private var _breakIndex:uint;

		/**
		 * @private
		 * holds the column names, i.e. the info from the first CSV line
		 **/
		private var _properties:Array;

		/**
		 * @private
		 * the remaining CSV lines, without the header line
		 **/
		private var _rawColumns:Array;

		/**
		 * @private
		 * result data
		 **/
		private var _data:Array = [];

		/**
		 * Constructor - invokes the preprocess function, which makes some adjustments to the CSV content
		 * to prepare it for the asynchronous parsing
		 * @param content the CSV file content
		 * @param runningTimeShare the allowed time share
		 * @param lineSeparator the line separator
		 * @param columnSeparator separator for the data inside a line
		 **/
		public function RunnableCSVParser(content:String, runningTimeShare:Number = .5, lineSeparator:String = null, columnSeparator:String = null)
		{
			_runningTimeShare = runningTimeShare;
			this.lineSeparator = lineSeparator;
			this.columnSeparator = columnSeparator;
			preprocess(content);
		}

		/**
		 * the worker function
		 * iterates through the lines and parse them into an Object
		 * the result gets pushed into the data array
		 **/
		public function run():void
		{
			var i:uint;
			var propsLen:uint = _properties.length;

			for (i = _breakIndex; i < _total; i++)
			{
				if (_needToExit())
				{
					_breakIndex = i;
					_progress = i;
					return;
				}

				// split on the configured column separator rather than a hard-coded comma
				var lineItems:Array = _rawColumns[i].split(_columnSeparator);

				if (lineItems.length == propsLen)
				{
					var obj:Object = {};
					var j:uint = 0;
					for (j = 0; j < propsLen; j++)
					{
						obj[_properties[j]] = lineItems[j];
					}
					_data.push(obj);
				}
			}

			_needToExit = null;
			_rawColumns = null;
			_properties = null;

			_isComplete = true;
		}

		/**
		 * caller needToExit setter,
		 * this allows the caller to set the exit condition function so that the
		 * worker can invoke needToExit on the caller, which handles the time management
		 *
		 * @param function reference;
		 **/
		public function set needToExit(val:Function):void
		{
			_needToExit = val;
		}

		/**
		 * Indicate whether the parsing operation has completed;
		 **/
		public function get isComplete():Boolean
		{
			return _isComplete;
		}

		/**
		 * indicates the progress of the operation in this case the
		 * current line that is being parsed
		 **/
		public function get progress():uint
		{
			return _progress;
		}

		/**
		 * indicates the total amount of work that needs to be performed
		 * in this case the number of csv lines
		 **/
		public function get total():uint
		{
			return _total;
		}

		/**
		 * holds the output data array
		 **/
		public function get data():*
		{
			return _data;
		}

		public function get runningTimeShare():Number
		{
			return _runningTimeShare;
		}

		/**
		 * @private
		 * CSV line separator defaults to return
		 **/
		protected function set lineSeparator(value:String):void
		{
			_lineSeparator = value ? value : String.fromCharCode(13,10);
		}

		/**
		 * @private
		 * CSV column separator defaults to comma
		 **/
		protected function set columnSeparator(value:String):void
		{
			_columnSeparator = value ? value : ",";
		}

		/**
		 * used to strip the header of the CSV file and retrieve the column names
		 * @param The csv file
		 **/
		private function preprocess(content:String):void
		{
			_rawColumns = content.split(_lineSeparator);
			_properties = _rawColumns[0].split(_columnSeparator);
			_rawColumns.splice(0,1);
			_total = _rawColumns.length;
		}

	}
}

AsyncThreadCaller receives the runner as an input parameter and sets the runner’s needToExit property to its own needToExit function reference (in the constructor). This is how the runner is able to ask its caller whether it is still allowed to run. It dispatches complete and progress events based on information exposed by the worker (at the end of run()).

package ro.a223.csv.infrastructure.pseudothreading
{
	import flash.display.Stage;
	import flash.events.Event;
	import flash.events.EventDispatcher;
	import flash.events.IEventDispatcher;
	import flash.events.ProgressEvent;
	import flash.utils.getTimer;

	[Event(name="complete", type="flash.events.Event")]
	[Event(name="progress", type="flash.events.ProgressEvent")]
	public class AsyncThreadCaller extends EventDispatcher implements IAsyncThreadCaller
	{

		/**
		 * holds the computed exit time
		 **/
		private var _workerExitTime:Number;

		/**
		 * worker reference, typed to the interface and released in dispose()
		 **/
		private var _runnableWorker:IRunnable;

		/**
		 * stage reference
		 **/
		private var _stageRef:Stage;

		/**
		 * @private
		 * processed data storage
		 **/
		private var _data:*;

		/**
		 * @private
		 * the number of times the worker is called
		 **/
		private var _runs:uint;

		/**
		 * Constructor
		 * @param reference to the main stage
		 * @param reference to the runnable worker
		 **/
		public function AsyncThreadCaller(stageRef:Stage, runnable:IRunnable)
		{
			super();
			_stageRef = stageRef;
			_runnableWorker = runnable;
			_runnableWorker.needToExit = this.needToExit;
		}

		/**
		 * Used to start the runner
		 * @param none
		 * @return none
		 **/
		public function start():void
		{
			_stageRef.addEventListener(Event.ENTER_FRAME, run, false, 100);
			run(null);
		}

		/**
		 * @param none
		 * @return Boolean indicates whether the worker run method needs to exit
		 **/
		public function needToExit():Boolean
		{
			return (getTimer() >= _workerExitTime);
		}

		/**
		 * cycle function, runs on every enter frame and
		 * commands the runnable to perform some work.

		 **/
		protected function run(event:Event):void
		{
			// duration of one frame in milliseconds
			var frameDuration:Number = Math.floor(1000 / _stageRef.frameRate);
			_workerExitTime = getTimer() + frameDuration * _runnableWorker.runningTimeShare;

			_runnableWorker.run();
			_runs++;

			if (_runnableWorker.isComplete)
			{
				_data = _runnableWorker.data;
				dispose();
				dispatchEvent(new Event(Event.COMPLETE));
			}
			else
			{
				dispatchEvent(new ProgressEvent(ProgressEvent.PROGRESS, false, false, _runnableWorker.progress, _runnableWorker.total));
			}
		}

		/**
		 * performs cleanup as the processing operation has completed
		 * @param none
		 * @return none
		 **/
		protected function dispose():void
		{
			_stageRef.removeEventListener(Event.ENTER_FRAME, run);
			_stageRef = null;
			_runnableWorker = null;
		}

		/**
		 * expose public access for data
		 **/
		public function get data():*
		{
			return _data;
		}

		public function get runs():uint
		{
			return _runs;
		}

	}
}

Usage

var parser:IRunnable = new RunnableCSVParser(_csvService.csvData, .6);
_asyncCaller = new AsyncThreadCaller(systemManager.stage, parser);
_asyncCaller.addEventListener(Event.COMPLETE, onParseComplete);
_asyncCaller.addEventListener(ProgressEvent.PROGRESS, onProgress);
_asyncCaller.start();

The parsed data is available in the data property of the AsyncThreadCaller.
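
The two handlers registered in the usage snippet are not shown in this post, so here is a sketch of what they might do. It assumes _asyncCaller is declared as AsyncThreadCaller (so that runs is reachable), that flash.events.Event and flash.events.ProgressEvent are imported, and it uses trace() in place of whatever progress overlay the application displays.

```actionscript
// Hypothetical handlers, members of the same component as the usage snippet

private function onProgress(event:ProgressEvent):void
{
	// AsyncThreadCaller passes progress and total as bytesLoaded and bytesTotal
	var percent:Number = event.bytesTotal > 0 ? 100 * event.bytesLoaded / event.bytesTotal : 0;
	trace("parsed " + Math.round(percent) + "%");
}

private function onParseComplete(event:Event):void
{
	_asyncCaller.removeEventListener(Event.COMPLETE, onParseComplete);
	_asyncCaller.removeEventListener(ProgressEvent.PROGRESS, onProgress);

	// the parser produces an Array with one Object per CSV line
	var rows:Array = _asyncCaller.data as Array;
	trace("parsed " + rows.length + " rows in " + _asyncCaller.runs + " runs");

	// release the caller so that it can be garbage collected
	_asyncCaller = null;
}
```

Since the worker only updates its progress when it leaves the loop, a progress event arrives at most once per frame, which is usually frequent enough to drive a progress bar.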

Render the data.

Now that the data has been loaded and parsed, it needs to be displayed on the screen. The DataGrid component does a really good job of handling such a large amount of data at once. But if you need the data to be displayed visually, it is a bit of a problem. The code uses the Bitmap Based Charting described by Andrew Trice here. Basically, you draw the points on the screen yourself. As he states there, the Flex chart components will be able to handle a couple of thousand records. If you use the drawing API, you will be able to render a few tens of thousands. For larger amounts you need to go low level and paint the screen using the setPixel() method of the BitmapData class. I went one step further and modified the drawing loop to run over frames, the same way parsing does. As a result, rendering time increases considerably, but the application stays responsive while the data is rendered.

Below is the RunnableDataRenderer class. It takes the parsed data, the width and height of the display surface, and the running time share (runningTimeShare). The class creates a BitmapData output inside its constructor, and then for every point in the parsed data it draws a pixel (or a cross made of pixels). When the time share expires it exits, and then resumes on the next frame. Upon completion the BitmapData is set as the source of an Image.

package ro.a223.csv.infrastructure.pseudothreading.impl
{
	import flash.display.BitmapData;
	import flash.geom.Rectangle;

	import ro.a223.csv.infrastructure.pseudothreading.IRunnable;
	import ro.a223.csv.infrastructure.pseudothreading.IAsyncThreadCaller;

	public class RunnableDataRenderer implements IRunnable
	{
		private var _runningTimeShare:Number;
		private var _data:*;
		private var _isComplete:Boolean;
		private var _progress:uint;
		private var _total:uint;
		private var _needToExit:Function;
		private var _breakIndex:uint;

		private var _inData:Array;
		private var _width:Number;
		private var _height:Number;

		/**
		 * Constructor - sets the data to be rendered
		 * @param in data
		 * @param the chart/graphic width
		 * @param the chart/graphic height
		 */
		public function RunnableDataRenderer(data:*, width:Number, height:Number, runningTimeShare:Number = .5)
		{
			_inData = data;
			_total = _inData.length;
			_width = width;
			_height = height;
			_runningTimeShare = runningTimeShare;
			//create a new bitmapdata object
			_data = new BitmapData(_width, _height, false, 0xcccccc);
		}

		//===============================================
		// implement the IRunnable interface
		//===============================================

		/**
		 * the worker function
		 * iterates through the data points and draws each one into the BitmapData,
		 * then renders the grid overlay once all points are drawn
		 */
		public function run():void
		{
			//render each data point by setting a pixel
			var itr:uint;
			var len:uint = _inData.length;
			for (itr = _breakIndex; itr < len; itr++)
			{

				// check for return condition:
				if (_needToExit())
				{
					_breakIndex = itr;
					//set the progress made so far
					_progress = itr;
					return;
				}

				//derive a color from the y value, adjust data to fit screen and render a cross
				var color:uint = _inData[itr].y * 0xFFFFFF;

				_inData[itr].x = _inData[itr].x * _width;
				_inData[itr].y = _height -(_inData[itr].y * _height);

				_data.setPixel(_inData[itr].x, _inData[itr].y, color);
				_data.setPixel(_inData[itr].x + 1, _inData[itr].y, color);
				_data.setPixel(_inData[itr].x - 1, _inData[itr].y, color);
				_data.setPixel(_inData[itr].x + 2, _inData[itr].y, color);
				_data.setPixel(_inData[itr].x - 2, _inData[itr].y, color);
				_data.setPixel(_inData[itr].x, _inData[itr].y + 1, color);
				_data.setPixel(_inData[itr].x, _inData[itr].y - 1, color);
				_data.setPixel(_inData[itr].x, _inData[itr].y + 2, color);
				_data.setPixel(_inData[itr].x, _inData[itr].y - 2, color);
			}

			//render the grid overlay
			var segments:int = 40;
			var interval:Number = _width / segments;
			var i:Number;

			for (i = 0; i <= _width; i += interval)
			{
				// vertical grid lines run along the height of the bitmap
				for (var j:int = 0; j < _height; j++)
				{
					_data.setPixel(i, j, 0xEFEFEF);
				}
			}

			interval = _height / segments;
			for (i = 0; i <= _height; i += interval)
			{
				for (j = 0; j < _width; j++)
				{
					_data.setPixel(j, i, 0xEFEFEF);
				}
			}

			_inData = null;
			_needToExit = null;
			//set complete
			_isComplete = true;
		}

		/**
		 * caller needToExit setter, this allows the caller to set the exit condition function so that the
		 * worker can invoke needToExit on the caller, which handles the time management
		 *
		 * @param function reference;
		 */
		public function set needToExit(val:Function):void
		{
			_needToExit = val;
		}

		/**
		 * Indicates whether the rendering operation has completed
		 */
		public function get isComplete():Boolean
		{
			return _isComplete;
		}

		/**
		 * indicates the progress of the operation, in this case the
		 * index of the data point that is being rendered
		 */
		public function get progress():uint
		{
			return _progress;
		}

		/**
		 * indicates the total amount of work that needs to be performed,
		 * in this case the number of data points
		 */
		public function get total():uint
		{
			return _total;
		}

		/**
		 * holds the output BitmapData
		 */
		public function get data():*
		{
			return _data;
		}

		public function get runningTimeShare():Number
		{
			return _runningTimeShare;
		}

	}
}
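
Usage

The renderer is driven by AsyncThreadCaller in the same way as the parser. The snippet below is a sketch rather than the code from the .fxp: chartImage (an mx.controls.Image in the view), renderRows and onRenderComplete are assumed names, the snippet needs flash.display.Bitmap, flash.display.BitmapData and flash.events.Event imported, and the rows are expected to carry x and y values already scaled to the 0..1 range, which is what the drawing loop appears to assume.

```actionscript
// Hypothetical members of the view that hosts chartImage
private var _renderCaller:AsyncThreadCaller;

private function renderRows(rows:Array):void
{
	var renderer:IRunnable = new RunnableDataRenderer(rows, chartImage.width, chartImage.height, .6);
	_renderCaller = new AsyncThreadCaller(systemManager.stage, renderer);
	_renderCaller.addEventListener(Event.COMPLETE, onRenderComplete);
	_renderCaller.start();
}

private function onRenderComplete(event:Event):void
{
	_renderCaller.removeEventListener(Event.COMPLETE, onRenderComplete);

	// wrap the finished BitmapData in a Bitmap so that the Image can display it
	chartImage.source = new Bitmap(_renderCaller.data as BitmapData);
	_renderCaller = null;
}
```

Keep in mind that the renderer overwrites the x and y values of each row with screen coordinates while drawing, so the same array cannot simply be rendered a second time at a different size.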

References:

Sorry if I forgot anyone!

2 Comments

  1. Hello Claudiu,

    First of all, thanks a lot for this masterpiece of code… It’s solving a lot of AS3 problems… Really impressive work…

    I developed an AIR application in Flex, for manipulation of some chemical file formats ( more precisely *.sdf). The aim of this application is to read/parse *.sdf files (textual content where each record is separated with “$$$$” delimiter). After reading such a file, I’m splitting the file using the delimiter mentioned above, and after that using a similar pseudo-thread class I’m exporting each record as separate file locally. The pseudo-threader for saving is working perfectly, but I’m trying to implement it also for reading/parsing. Small files (~ 100MB) can be parsed without a problem without using pseudo-threading, but the big problem is to read bigger files ( > 200MB – 1GB or more). I tried already to adapt somehow your CSV parser for loading/parsing my files, but still with no success.

    Can you be so kind and give me an example of a modified version of the code above. Some big files of the *.sdf type can be found here (http://www.asinex.com/download-zone.html) as SD. Moreover, if it is possible, I want to store the loaded/parsed string in a TextArea, and not DataGrid.

    Thank you in advance and best regards,
    Nikola
