Extensible operations

Operations in SpatialHadoop are implemented as regular MapReduce programs. The main difference between spatial operations and regular operations is that the input file is spatially indexed. To read a spatially indexed file, you need to provide the correct InputFormat and RecordReader. In addition to the regular map and reduce functions, SpatialHadoop allows you to provide a filter function that performs an early pruning step: it prunes away file blocks that cannot contribute to the answer, based on their minimal bounding rectangles (MBRs). This can be useful to decrease the number of map tasks for a job. In this tutorial, we will walk you step by step through writing a spatial operation.

Range Query

We will use the range query as an example to describe how spatial operations are implemented. In a range query, the input is a file that contains a set of shapes and a rectangular query area (A). The output is all shapes that overlap the query area (A).

Design the operation

Before writing the MapReduce program for this operation, we need to decide how it should work. A naive implementation would scan over all shapes in the input file and select the shapes that overlap the query area. In SpatialHadoop, since the input file is indexed, we can use this index to avoid scanning the whole file. The input file is partitioned and each partition is stored in a separate block. If the boundaries of a partition are disjoint with the query area, then all shapes inside this partition are also disjoint with the query area. Hence, an initial filter step removes all blocks that are disjoint with the query area, leaving only the blocks that overlap it. For each overlapping block, we then find the shapes that overlap the query area.

This simple algorithm is almost correct; there is one glitch that needs to be handled. A shape in the input file might overlap two partitions, in which case it is replicated to each of these partitions. If the query area overlaps both partitions and also overlaps this shape, the shape would be reported twice in the answer. To avoid this situation, we implement a duplicate avoidance technique which ensures that each shape is reported exactly once. This is done by computing the intersection of the query area and the block boundaries (cell intersection) and the intersection of the query area and the shape (shape intersection). If the top-left point of the shape intersection falls inside the cell intersection, the shape is reported; otherwise it is skipped, as sketched below.
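To make the duplicate avoidance test concrete, here is a small standalone sketch of it using java.awt.geom rectangles. The class name, helper method, and coordinates are purely illustrative and not part of SpatialHadoop; the same test appears again inside the map function later in this tutorial.

DuplicateAvoidanceSketch.java:

import java.awt.geom.Rectangle2D;

public class DuplicateAvoidanceSketch {
  // Returns true if this cell is the one responsible for reporting the shape,
  // i.e., the top-left corner of the shape intersection (query with shape)
  // lies inside the cell intersection (query with cell).
  public static boolean shouldReport(Rectangle2D cell, Rectangle2D query, Rectangle2D shape) {
    Rectangle2D cellIntersection = query.createIntersection(cell);
    Rectangle2D shapeIntersection = query.createIntersection(shape);
    return cellIntersection.contains(shapeIntersection.getMinX(), shapeIntersection.getMinY());
  }

  public static void main(String[] args) {
    // Two adjacent cells, a query area that spans both, and a shape that is
    // replicated in both cells because it crosses their common boundary.
    Rectangle2D cell1 = new Rectangle2D.Double(0, 0, 10, 10);
    Rectangle2D cell2 = new Rectangle2D.Double(10, 0, 10, 10);
    Rectangle2D query = new Rectangle2D.Double(5, 2, 10, 5);
    Rectangle2D shape = new Rectangle2D.Double(8, 3, 4, 2);

    System.out.println(shouldReport(cell1, query, shape)); // true: cell1 reports the shape
    System.out.println(shouldReport(cell2, query, shape)); // false: cell2 skips the duplicate
  }
}

Only one of the two cells passes the test, so the shape appears exactly once in the final answer.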

Filter function

The spatial filter function takes as input all blocks in an input file and outputs the subset of blocks that need to be processed. For the range query, it selects the blocks that overlap the query area. The code looks like the following.

RangeFilter.java:

public class RangeFilter extends DefaultBlockFilter {
  // queryRange is a field holding the query area; it is read from the job
  // configuration in the configure() method (see below).
  public void selectBlocks(SimpleSpatialIndex gIndex,
      ResultCollector output) {
    // Report only the blocks whose MBRs overlap the query range.
    gIndex.rangeQuery(queryRange, output);
  }
}

This code simply selects and returns all blocks that overlap the query range. The MBR of each block was calculated earlier when the file was indexed. Note that to access the query area in the filter function, it needs to be set in the job configuration and read back in the RangeFilter#configure method. A sketch of this is shown below; check the source code for the complete implementation.
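For illustration only, the following sketch shows how the query range could be passed through the job configuration. It would live inside the RangeFilter class above. The configuration key name, the coordinate encoding, the assumption that Rectangle exposes x1, y1, x2, y2 coordinates, and the configure(JobConf) hook are all assumptions of this sketch, not necessarily how the actual SpatialHadoop code does it.

RangeFilter.java (sketch):

// Hypothetical configuration key; the real constant may differ.
private static final String QUERY_RANGE = "RangeFilter.QueryRange";

// The parsed query area used by selectBlocks above.
private Rectangle queryRange;

// Called from the job configuration code to store the query area as text.
public static void setQueryRange(JobConf job, Rectangle range) {
  job.set(QUERY_RANGE, range.x1 + "," + range.y1 + "," + range.x2 + "," + range.y2);
}

// Called by the framework on each task to read the query area back.
public void configure(JobConf job) {
  String[] parts = job.get(QUERY_RANGE).split(",");
  queryRange = new Rectangle(Double.parseDouble(parts[0]), Double.parseDouble(parts[1]),
      Double.parseDouble(parts[2]), Double.parseDouble(parts[3]));
}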

Map function

The map function takes as input the contents of one block, and it selects and outputs all shapes overlapping the query area. We will show here how the map function looks if the blocks are indexed as an R-tree.

RangeQuery.java:

public void map(final CellInfo cellInfo, RTree<T> shapes,
    final OutputCollector output, Reporter reporter) {
  // queryShape is a field initialized from the job configuration;
  // search the block-local R-tree for shapes overlapping its MBR.
  shapes.search(queryShape.getMBR(), new ResultCollector<T>() {
    public void collect(T shape) {
      try {
        boolean report_result = false;
        if (cellInfo.cellId == -1) {
          // The file is not partitioned, so there is no replication
          // and every matching shape is reported.
          report_result = true;
        } else {
          // Duplicate avoidance: report the shape only if the top-left
          // corner of its intersection with the query area falls inside
          // this cell.
          Rectangle intersection =
              queryShape.getMBR().getIntersection(shape.getMBR());
          report_result = cellInfo.contains(intersection.x, intersection.y);
        }
        if (report_result)
          output.collect(dummy, shape);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  });
}

The above code simply issues a range query against the R-tree built in this block to find all shapes overlapping the query area. For each matching shape, the duplicate avoidance test is carried out to decide whether to report this shape in the answer or not. If the cell ID is -1, there is no MBR associated with this block, which means the records in the input file are not partitioned; hence, there is no replication and the shape is reported directly. Otherwise, the test described earlier is performed, and the shape is reported only if the test passes.

Reduce function

Since the output of the map function is the final answer, no reduce step is needed. The reduce function is not provided for this operation.

Job configuration

The final step is to configure the job. We will focus on the parts that are specific to the range query and/or SpatialHadoop. Regular configuration parameters (e.g., the number of map tasks or the input files) are not mentioned for brevity.

RangeQuery.java:

// No reduce step; the map output goes directly to the output file.
job.setNumReduceTasks(0);
// Register the spatial filter function that prunes non-overlapping blocks.
job.setClass(SpatialSite.FilterClass, RangeFilter.class, BlockFilter.class);
RangeFilter.setQueryRange(job, queryShape);
job.setMapperClass(Map.class);
// Blocks are R-tree indexed, so use the R-tree input format.
job.setInputFormat(RTreeInputFormat.class);
// Make the query shape accessible to the map function.
job.set(QUERY_SHAPE_CLASS, queryShape.getClass().getName());
job.set(QUERY_SHAPE, queryShape.toText(new Text()).toString());
// The type of shapes stored in the input file.
job.set(SpatialSite.SHAPE_CLASS, shape.getClass().getName());

Setting the number of reduce tasks to zero ensures that the output of the map function goes directly to the job output. The filter function is set using the job.setClass method. The query range used by the filter function is set using the RangeFilter.setQueryRange method. Then, the map function is set as normal. The input format is set to RTreeInputFormat since the blocks are R-tree indexed. After that, the query shape is set in the job configuration to make it accessible to the map function. Finally, SHAPE_CLASS is set to indicate the type of shapes stored in the input file.

Once the job is configured correctly, it is submitted to SpatialHadoop for processing as a normal MapReduce job. The output is stored in HDFS in the configured output file. You can check the job counters to see how many splits were created and verify that only the subset of blocks overlapping the query range was processed.
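As a rough sketch of this last step, assuming the classic org.apache.hadoop.mapred API used in the configuration code above and a job and outputPath of your own, submission could look like the following. JobClient.runJob blocks until the job finishes and prints the job counters, including the number of launched map tasks, to the console.

RangeQuery.java (sketch):

// 'job' is the JobConf configured above; 'outputPath' is an HDFS Path of your choice.
FileOutputFormat.setOutputPath(job, outputPath);
// Submit the job and wait for it to complete; progress and counters are printed.
RunningJob runningJob = JobClient.runJob(job);
System.out.println("Job succeeded: " + runningJob.isSuccessful());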