My i3 dual screen workflow

Using the i3 tiling window manager on two screens („outputs“) can be challenging. The number of workspaces grows twice as fast than with a single screen setup and it is easy to lose track of the numbers and contents of workspaces. For me personally it is more intuitive to remember a certain sequence of workspaces on each screen, which seemingly extends above and below the workspace presently displayed. (It may be that this intuition has been coined by using the GNOME Shell over extended periods.)

In order to achieve a comparable user experience within i3, I use the following Python script with the keybindings presented below. The script is inspired by an article on the i3 homepage by user captnfab. It has one dependency: ziberna/i3-py, which can be installed with pip3 install i3-py. As is apparent from the keybindings, Ctrl+Alt+Up/Down are used to switch the present workspace on the focused output. With the same keys together with +Shift you can take the focused window with you.

# by Fabian Stanke
# Sequentially switch workspaces on present output

import i3
import argparse

parser = argparse.ArgumentParser(
    description='i3 workspace switcher.')

    '--move', action='store_true',
    help='take the focused container with you when moving.')
    'direction', choices=['next', 'prev'],
    help='defines in which direction to switch.')

args = parser.parse_args()

workspaces = i3.get_workspaces()

# Determine focused workspace (and thus the focused output)
focused_ws = next((w for w in workspaces if w['focused']))

# Collect all workspaces of the focused output
ws_names = list(w['name'] 
                for w in workspaces 
                if w['output'] == focused_ws['output'])

# Determine position of focused worspace in that collection
idx = ws_names.index(focused_ws['name'])
target = focused_ws['name']

if args.direction == 'next':
	# Determine next workspace	

	if (idx + 1 < len(ws_names)):
		target = ws_names[idx + 1]
		# Determine last number used on this output
		maxidx = 1
		# Determine unused numbers 
		used = {}
		for w in workspaces:
				widx = int(w['name'])
				used[widx] = True
				if w['output'] == focused_ws['output']:
					maxidx = max(widx, maxidx)
		# Increment to create new name
		while used.get(maxidx, False):
			maxidx += 1
		target = str(maxidx)

elif args.direction == 'prev':
	# Determine previous workspace

	if (idx - 1 >= 0):
		target = ws_names[idx - 1]
	#else remain at first workspace

if args.move:
	# Move the focused container to the target workspace first
	i3.command('move', 'container to workspace ' + target)

# Switch
#print("switch to " + target)

My preferred keybindings to actually use the above script are:

bindsym Ctrl+Mod1+Down exec next
bindsym Ctrl+Mod1+Up exec prev
bindsym Ctrl+Mod1+Shift+Down exec --move next
bindsym Ctrl+Mod1+Shift+Up exec --move prev

More parrallel programming

As a faithful follower of the Fedora Planet, today I stumbled upon a post about parallel programming in Python. Having made similar experiences myself, I would like to add another alternative for parallel programming in Python. I could have posted this in the comments of the original post, but this way the formatting is nicer.

My point is, that Parallel Python is a really nice library, but the functionality (at least at the level demonstrated here) is also provided by the multiprocessing module included with Python.

Here is my slightly modified implementation of the same program:

Another asynchronous python example
import multiprocessing
import time
def background_stuff(num):
  return "%s I'm done" % num
if __name__ == "__main__":
    print "Start at:" , time.asctime(time.localtime())
    pool = multiprocessing.Pool()
    print "Start doing something"
    it = pool.imap(background_stuff, [(1,), (2,)])
    print "Do something..."
    print " ... do something else..."
    print "End at:", time.asctime(time.localtime())

How-to reduce fail2ban memory usage

This morning, when I did the routinely scan of the server’s resource usage history, I noticed a suspicious network activity between 1 and 5 am. Some reading of the latest log files soon identified the traffic to have been caused by a dictionary attack on my SSH server. I took the opportunity to extend my current setup for the script-kiddie enemy called fail2ban. This program monitors potentially any service’s log file for failed login attempts and if their number exceeds a certain limit, it blocks the issuing host using iptables rules.

Unfortunately the first start of the new service turned out to blow up the memory usage by about 100 MB which is unacceptable regarding the tight resources of my virtual private server. As I found out, others had similar experience and switched to DenyHosts due to this issue. My experience with setting up Trac two weeks ago taught me that a Python application (like fain2ban) might consume a lot of memory only because of the relatively oversized default stack size on Linux.

The means to reduce the default stack size in Linux are widely known to be the limits.conf file and the ulimit command. But how to use those two in my situation? The solution turns out to be a one-liner on Debian Lenny: All I had to do was to append the ulimit command to my /etc/default/fail2ban file.

This is the changed /etc/default/fail2ban file:

# This file is part of Fail2Ban.
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
# Author: Cyril Jaquier
# $Revision: 1.2 $

# Command line options for Fail2Ban. Refer to "fail2ban-client -h" for
# valid options.


ulimit -s 256

Using this sets the default stack size for the Python instances running fail2ban to 256 KB and lowers the memory consumption of fail2ban approximately by a factor of 10.

Find overlapping polygons with Python

Yesterday I was confronted with a seemingly simple problem: how can one find out if the rectangles or, more general, polygons on a surface are overlapping or not? Surprisingly, because they have an amazing collection of tools, the matplotlib library used in this context to actually draw the rectangles, doesn’t seem to have this kind of functionality. Of course there are loads of examples on the web for 2D collision detection, but I couldn’t find one written in Python. But nevertheless, I found lecture notes on Robert Pless’s website, which taught me the quadrant method to check if one point lies inside a polygon. Then it is only a small step to find if polygons are intersecting.

After writing a Python module providing the functionality, it naturally turned out to be rather slow on big numbers of polygons to be checked. So I continued to write a function which checks which polygons in a given set are overlapping or touching. To use the resources on my machine efficiently, I made two versions of this later function: one for conservative, serial processing and a second which uses Parallel Python to distribute the workload among the CPUs found in the system.

If someone else needs this kind of functionaltiy as well or simply is interested in how I did it, here is the interesting part. The whole file with unit tests and documentation can be downloaded as well:

import pylab

class PolygonsTouching( Exception ):
    """ This exception is triggered when two polygons touch at one point.

    This is for internal use only and will be caught before returning.

    def __init__( self, x=0, y=0 ):
        self.x, self.y = x, y
    def __str__( self ):
        return 'The tested polygons at least touch each other at (%f,%f)'\
               % ( self.x, self.y )
    def shift( self, dx, dy ):
        self.x += dx
        self.y += dy

def pair_overlapping( polygon1, polygon2, digits = None ):
    """ Find out if polygons are overlapping or touching.

    The function makes use of the quadrant method to find out if a point is
    inside a given polygon.

    polygon1, polygon2 -- Two arrays of [x,y] pairs where the last and the
        first pair is the same, because the polygon has to be closed.
    digits -- The number of digits relevant for the decision between
        separate and touching or touching and overlapping

    Returns 0 if the given polygons are neither overlapping nor touching,
    returns 1 if they are not overlapping, but touching and
    returns 2 if they are overlapping


    def calc_walk_summand( r1, r2, digits = None ):
        """ Calculates the summand along one edge depending on axis crossings.

        Follows the edge between two points and checks if one or both axes are
        being crossed. If They are crossed in clockwise sense, it returns +1
        otherwise -1. Going through the origin raises the PolygonsTouching

        Returns one of -2, -1, 0, +1, +2 or raises PolygonsTouching

        x, y = 0, 1 # indices for better readability
        summand = 0 # the return value
        tx, ty = None, None # on division by zero, set parameters to None
        if r1[x] != r2[x]:
            ty = r1[x] / ( r1[x] - r2[x] ) # where it crosses the y axis
        if r1[y] != r2[y]:
            tx = r1[y] / ( r1[y] - r2[y] ) # where it crosses the x axis
        if tx == None: tx = ty
        if ty == None: ty = tx
        rsign = pylab.sign
        if digits != None:
            rsign = lambda x: pylab.sign( round( x, digits ) )
        sign_x = rsign( r1[x] + tx * ( r2[x] - r1[x] ) )
        sign_y = rsign( r1[y] + ty * ( r2[y] - r1[y] ) )
        if ( tx >= 0 ) and ( tx < 1 ):
            if ( sign_x == 0 ) and ( sign_y == 0 ):
                raise PolygonsTouching()
            summand += sign_x * pylab.sign( r2[y] - r1[y] )
        if ( ty >= 0 ) and ( ty < 1 ):
            if ( sign_x == 0 ) and ( sign_y == 0 ):
                raise PolygonsTouching()
            summand += sign_y * pylab.sign( r1[x] - r2[x] )
        return summand

    def current_and_next( iterable ):
        """ Returns an iterator for each element and its following element.

        iterator = iter( iterable )
        item =
        for next in iterator:
            yield ( item, next )
            item = next

    def point_in_polygon( xy, xyarray, digits = None ):
        """ Checks if a point lies inside a polygon using the quadrant method.

        This moves the given point to the origin and shifts the polygon
        accordingly. Then for each edge of the polygon, calc_walk_summand is
        called. If the sum of all returned values from these calls is +4 or -4,
        the point lies indeed inside the polygon. Otherwise, if a
        PolygonsTouching exception has been caught, the point lies on ond of
        the edges of the polygon.

        Returns the number of nodes of the polygon, if the point lies inside,
        otherwise 1 if the point lies on the polygon and if not, 0.

        moved = xyarray - xy # move currently checked point to the origin (0,0)
        touching = False # this is used only if no overlap is found
        walk_sum = 0
        for cnxy in current_and_next( moved ):
                walk_sum += calc_walk_summand( cnxy[0], cnxy[1], digits )
            except PolygonsTouching, (e):
                e.shift( *xy )
                touching = True
        if ( abs( walk_sum ) == 4 ):
            return len( xyarray )
        elif touching:
            return 1
            return 0

    def polygons_overlapping( p1, p2, digits = None ):
        """ Checks if one of the nodes of p1 lies inside p2.

        This repeatedly calls point_in_polygon for each point of polygon p1
        and immediately returns if it is the case, because then the polygons
        are obviously overlapping.

        Returns 2 for overlapping polygons, 1 for touching polygons and 0

        degree_of_contact = 0
        xyarrays = [ p1, p2 ]
        for xy in xyarrays[0]:
            degree_of_contact += point_in_polygon( xy, xyarrays[1], digits )
            if degree_of_contact >= len( xyarrays[1] ):
                return 2
        if degree_of_contact > 0:
            return 1
            return 0

    way1 = polygons_overlapping( polygon1, polygon2, digits )
    way2 = 0
    if way1 < 2: # Only if the polygons are not already found to be overlapping
        way2 = polygons_overlapping( polygon2, polygon1, digits )
    return max( way1, way2 )

def collection_overlapping_serial( polygons, digits = None ):
    """ Similar to the collection_overlapping function, but forces serial

    result = []
    pickle_polygons = [p.get_xy() for p in polygons]
    for i in xrange( len( polygons ) ):
        for j in xrange( i+1, len( polygons ) ):
            result.append( ( i, j, \
                pair_overlapping( pickle_polygons[i], pickle_polygons[j], \
                                  digits ) ) )
    return result

def __cop_bigger_job( polygons, index, digits = None ):
    """ This is a helper to efficiently distribute workload among processors.

    result = []
    for j in xrange( index + 1, len( polygons ) ):
        result.append( ( index, j, \
            pair_overlapping( polygons[index], polygons[j], digits ) ) )
    return result

def collection_overlapping_parallel( polygons, digits = None, \
        ncpus = 'autodetect' ):
    """ Like collection_overlapping, but forces parallel processing.

    This function crashes if Parallel Python is not found on the system.

    import pp
    ppservers = ()
    job_server = pp.Server( ncpus, ppservers=ppservers )
    pickle_polygons = [p.get_xy() for p in polygons]
    jobs = []
    for i in xrange( len( polygons ) ):
        job = job_server.submit( __cop_bigger_job, \
                                 ( pickle_polygons, i, digits, ), \
                                 ( pair_overlapping, PolygonsTouching, ), \
                                 ( "pylab", ) )
        jobs.append( job )
    result = []
    for job in jobs:
        result += job()
    return result

def collection_overlapping( polygons, digits = None ):
    """ Look for pair-wise overlaps in a given list of polygons.

    The function makes use of the quadrant method to find out if a point is
    inside a given polygon. It invokes the pair_overlapping function for each
    combination and produces and array of index pairs of these combinations
    together with the overlap number of that pair. The overlap number is 0 for
    no overlap, 1 for touching and 2 for overlapping polygons.

    This function automatically selects between a serial and a parallel
    implementation of the search depending on whether Parallel Python is
    installed and can be imported or not.

    polygons -- A list of arrays of [x,y] pairs where the last and the first
        pair of each array in the list is the same, because the polygons have
        to be closed.
    digits -- The number of digits relevant for the decision between
        separate and touching or touching and overlapping polygons.

    Returns a list of 3-tuples

        import pp # try if parallel python is installed
    except ImportError:
        return collection_overlapping_serial( polygons, digits )
        return collection_overlapping_parallel( polygons, digits )