Skip to content

Commit

Permalink
Fix and integrate signature validation. It's also optional now and wa…
Browse files Browse the repository at this point in the history
…rns if packages are missing.
  • Loading branch information
Hagen Fritsch committed Jan 15, 2022
1 parent 9a3a67f commit d9e049f
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 106 deletions.
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ Hintergrund: https://itooktheredpill.irgendwo.org/2010/onlinetickets-der-bahn/

## Weitere Quellen

2010 war das Format der Tickets weitgehend unbekannt und dieser
*proof of concept* Parser ist hauptsächlich durch reverse engineering
entstanden.

Im Laufe der Jahre sind weitere Quellen hinzugekommen
(siehe [#4](https://github.com/rumpeltux/onlineticket/issues/4):
(siehe [#4](https://github.com/rumpeltux/onlineticket/issues/4)):

* [Kontrolle des UIC 918.3*-Barcodes](https://web.archive.org/web/20180905231149/https://www.bahn.de/p/view/angebot/regio/barcode.shtml)
mit weiteren interessanten Informationen und Links.
Expand All @@ -26,10 +30,24 @@ deren relevante Downloads offenbar allerdings nur nach einer Registierung oder
## Installation & Abhängigkeiten

Das Skript muss nicht gesondert installiert werden.
Es wird das Paket `python-pyasn1` benötigt.
`parsepdfs.sh` benötigt zusätzlich `poppler-utils` and `imagemagick` um die
Bilder aus den PDFs zu extrahieren, sowie eine funktionierende
Java-Installation um den Barcode mithilfe von zxing zu dekodieren.

### Barcode Dekodierung

`parsepdfs.sh` extrahiert dekodiert den Barcode aus PDF Dateien.
Es benötigt `poppler-utils` and `imagemagick` um die Bilder aus den PDFs zu
extrahieren, sowie eine funktionierende Java-Installation um den Barcode
mithilfe von zxing zu dekodieren.

### Signaturprüfung

Die Signaturprüfung ist optional und hat folgende Abhängigkeiten:

* `python3-pyasn1` (Dekodierung der Signatur)
* `python3-pycryptodome` (Verifizierung der DSA Signatur).

Public-keys der bekannten Ticketanbieter müssen zunächst heruntergeladen werden:

python3 download_keys.py

## Benutzung

Expand Down
32 changes: 32 additions & 0 deletions download_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import csv
import itertools
import xml.etree.ElementTree as ET
import requests

def get_uic_certs():
certurl = 'https://railpublickey.uic.org/download.php'
xmlpubkeys = requests.get(certurl).text

root = ET.fromstring(xmlpubkeys)

for child in root:
issuer = int(child.find('issuerCode').text)
xid = int(child.find('id').text)
pubkey = child.find('publicKey').text
pubkey = f'-----BEGIN CERTIFICATE-----\n{pubkey}\n-----END CERTIFICATE-----'
yield (issuer, xid, pubkey)

def get_db_certs():
base_url = 'https://sourceforge.net/p/dbuic2vdvbc/code/ci/master/tree/certs/production/'
issuer = 80
for xid in [1,6,7,8]:
pubkey = requests.get(f'{base_url}{issuer:04}{xid:05}.pem?format=raw').text
yield (issuer, xid, pubkey)

if __name__ == '__main__':
with open('certs.csv', 'w') as certsfile:
certwriter = csv.writer(certsfile, delimiter='\t')
for issuer, xid, pubkey in itertools.chain(
get_uic_certs(),
get_db_certs()):
certwriter.writerow((issuer, xid, pubkey))
221 changes: 120 additions & 101 deletions onlineticket.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,36 @@
# Parser für Online-Tickets der Deutschen Bahn nach ETF-918.3
# Copyright by Hagen Fritsch, 2009-2017

import base64
import csv
import datetime
import os
import re
import struct
import zlib
import base64
from Crypto.Hash import SHA1
from Crypto.PublicKey import DSA
from Crypto.Signature import DSS
from Crypto.Math.Numbers import Integer

import os
import asn1 # pip install asn1
import requests # for downloading public key
import xml.etree.ElementTree as ET # for parsing public key file
try: # pip install pycryptodome
from Cryptodome.Hash import SHA1
from Cryptodome.PublicKey import DSA
from Cryptodome.Signature import DSS
from Cryptodome.Math.Numbers import Integer
except:
try:
from Crypto.Hash import SHA
print('Please remove the deprecated python3-crypto package and install python3-pycryptodome instead.',
file=sys.stderr)
exit(1)
except:
print('Note: signature verification is disabled due to missing pycryptodome package.',
file=sys.stderr)
SHA1, DSA, DSS, Integer = None, None, None, None

try: # pip install pyasn1
import pyasn1.codec.der.decoder as asn1
except:
print('Note: signature verification is disabled due to missing pyasn1 package.',
file=sys.stderr)
asn1 = None

#utils
dict_str = lambda d: "\n"+"\n".join(["%s:\t%s" % (k, str_func(v).replace("\n", "\n")) for k,v in d.items()])
Expand Down Expand Up @@ -397,23 +413,86 @@ def __init__(self, *args, **kwargs):
raise


class SignatureVerificationError(Exception):
pass

def get_pubkey(issuer, keyid):
if get_pubkey.certs is None:
certs_filename = os.path.join(os.path.dirname(__file__), 'certs.csv')
if not os.path.exists(certs_filename):
raise SignatureVerificationError(
f'certificate store not found: {certs_filename}\n'
'Use download_keys.py to create it.')
get_pubkey.certs = {}
with open(certs_filename) as certsfile:
certreader = csv.reader(certsfile, delimiter='\t')
for cert_issuer, xid, pubkey in certreader:
get_pubkey.certs[(int(cert_issuer), int(xid))] = pubkey
try:
return get_pubkey.certs[(int(issuer), int(keyid))]
except KeyError:
raise SignatureVerificationError(f'Public key not found (issuer={issuer}, keyid={keyid})')

get_pubkey.certs = None

def verifysig(message, signature, pubkey):
if DSS is None or asn1 is None: # pycryptodome package is missing
raise SignatureVerificationError('Signature verification disabled')
if not signature:
raise SignatureVerificationError('Signature asn1 parsing error.')

r, s = signature

rbytes = Integer(r).to_bytes()
sbytes = Integer(s).to_bytes()

verifykey = DSA.import_key(pubkey)
h = SHA1.new(message)
verifier = DSS.new(verifykey, 'fips-186-3')

try:
verifier.verify(h, rbytes+sbytes)
return True
except ValueError as e:
raise SignatureVerificationError("Signature NOT valid: " + str(e))

class OT(DataBlock):
def signature_decode(self, res):
'''Parses the asn1 signature and extracts the (r,s) tuple.'''
if not asn1: return None
decoded = asn1.decode(self.read(50))[0]
return (int(decoded[0]), int(decoded[1]))

def signature_validity(self, res):
if len(self.stream) - self.offset - res['data_length'] > 0:
return 'INVALID (trailing data)'
if len(self.stream) - self.offset - res['data_length'] < 0:
return 'INVALID (incomplete ticket data)'

try:
pubkey = get_pubkey(issuer=res['carrier'],
keyid=res['key_id'])
result = verifysig(self.stream[self.offset:], res['signature'], pubkey)
except SignatureVerificationError as e:
return str(e)

return 'VALID' if result else 'INVALID'

generic = [
('header', 3),
('version', 2),
('carrier', 4),
('key_id', 5),
('signature', 50),
# ('signature', 0, None,
# lambda self, res: decoder.decode(self.read(50))),
#('padding', 0, None, lambda self, res: self.read(4 - self.offset%4)) #dword paddng
]
fields = [
('signature', 0, None, signature_decode),
('data_length', 4, int),
('ticket', 0, None,
lambda self, res: read_blocks(
zlib.decompress(self.read(res['data_length'])), read_block))
]
('signature_validity', 0, None, signature_validity),
]

fields = [
('ticket', 0, None, lambda self, res: read_blocks(
zlib.decompress(self.read(self.header['data_length'])), read_block)),
]


def read_block(data, offset):
block_types = {b'U_HEAD': OT_U_HEAD,
Expand Down Expand Up @@ -442,96 +521,36 @@ def fix_zxing(data):
ZXing parser seems to return utf-8 encoded binary data.
See also http://code.google.com/p/zxing/issues/detail?id=1260#c4
"""
return data.decode('utf-8').encode('latin1')


def get_pubkey(issuer, keyid, force_update=False):
keyfilename = 'keys.xml'
if (not os.path.isfile(keyfilename)) or force_update:
print("Downloading new keys.")
certurl = 'https://railpublickey.uic.org/download.php'
xmlpubkeys = requests.get(certurl).text
with open(keyfilename, 'w') as xmlout:
xmlout.write(xmlpubkeys)
else:
print("Reading existing keys from disk.")
with open(keyfilename, 'r') as xmlin:
xmlpubkeys = xmlin.read()

root = ET.fromstring(xmlpubkeys)

issuer = issuer.decode('utf-8').lstrip('0')
keyid = keyid.decode('utf-8').lstrip('0')

for child in root:
ic = child.find('issuerCode').text
xid = child.find('id').text
if ic == issuer and xid == keyid:
return child.find('publicKey').text

sys.stderr.write("Public key not found!")
return None

def verifysig(message, signature, pubkey):
# get r and s out of the ASN-1
decoder = asn1.Decoder()
decoder.start(signature)
tag, seq = decoder.read()
decoder.start(seq)
tag, r = decoder.read()
tag, s = decoder.read()

rbytes = Integer(r).to_bytes()
sbytes = Integer(s).to_bytes()

verifykey = DSA.import_key(base64.b64decode(pubkey))

h = SHA1.new(message)
verifier = DSS.new(verifykey, 'fips-186-3')

try:
verifier.verify(h, rbytes+sbytes)
print("Signature is valid.")
return True
except ValueError:
print("Signature NOT valid.")
return False


data = data.decode('utf-8').encode('latin1')
# zxing parsing also adds a newline to the end of the file. remove that.
if data.endswith(b'\n'):
data = data[:-1]
return data

if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print('Usage: %s [ticket_files]' % sys.argv[0])
ots = {}
for ticket in sys.argv[1:]:
try:
tickets = [readot(i) for i in open(ticket)]
except:
content = open(ticket, 'rb').read()
tickets = [content]
for binary_ticket in tickets:
try:
tickets = [readot(i) for i in open(ticket)]
except:
tickets = [open(ticket, 'rb').read()]
for ot in tickets:
try:
ots.setdefault(ticket, []).append(OT(ot))
except Exception as e:
try:
ots.setdefault(ticket, []).append(OT(fix_zxing(ot)))
except Exception as f:
sys.stderr.write('ORIGINAL: %s\nZXING: %s\n%s: Error: %s (orig); %s (zxing)\n' %
(repr(ot), repr(fix_zxing(ot)), ticket, e, f))
raise
print(dict_str(ots))


for ot in ots:
for ticket in ots[ot]:
issuer = ticket.header['carrier']
keyid = ticket.header['key_id']
pubkey = get_pubkey(issuer, keyid)
if pubkey:
signature = ticket.header['signature']
rawticket = ticket.stream[68:]
verifysig(rawticket, signature, pubkey)

ot = OT(binary_ticket)
except Exception as e:
try:
fixed = fix_zxing(binary_ticket)
ot = OT(fixed)
except Exception as f:
sys.stderr.write('ORIGINAL: %s\nZXING: %s\n%s: Error: %s (orig); %s (zxing)\n' %
(repr(ot), repr(fixed), ticket, e, f))
raise
print(ot)
ots.setdefault(ticket, []).append(ot)

# Some more sample functionality:
# 1. Sort by date
Expand Down

0 comments on commit d9e049f

Please sign in to comment.