"""rdfn3.[g|py] -- A Yapps grammar for RDF Notation 3 Based on Dan Connolly's rdfn3.g:- http://www.w3.org/2000/10/swap/rdfn3.g $Id: rdfn3.g,v 1.14 2002/01/12 23:37:14 connolly Exp $ Copyright (c) 2001 W3C (MIT, INRIA, Keio) http://www.w3.org/Consortium/Legal/copyright-software-19980720 Modifications by Sean B. Palmer under GPL 2 or later. REFERENCES Yapps: Yet Another Python Parser System http://theory.stanford.edu/~amitp/Yapps/ Sat, 18 Aug 2001 16:54:32 GMT Last modified 13:21 Sun 26 Nov 2000 , Amit Patel http://www.w3.org/DesignIssues/Notation3 """ import sys, re, urlparse # for urljoin URI, EXI, UNI, LIT = 'URIRef', 'Exivar', 'Univar', 'Lit' N3_forAll = ('http://www.w3.org/2000/10/swap/log#forAll', URI) N3_forSome = ('http://www.w3.org/2000/10/swap/log#forSome', URI) RDF_type = ('http://www.w3.org/1999/02/22-rdf-syntax-ns#type', URI) DAML_equivalentTo = ('http://www.daml.org/2001/03/daml+oil#equivalentTo', URI) DAML_first = ('http://www.daml.org/2001/03/daml+oil#first', URI) DAML_rest = ('http://www.daml.org/2001/03/daml+oil#rest', URI) DAML_nil = ('http://www.daml.org/2001/03/daml+oil#nil', URI) from string import * import re from yappsrt import * class _ParserScanner(Scanner): def __init__(self, str): Scanner.__init__(self,[ ('"}"', '}'), ('"{"', '{'), ('"\\\\]"', '\\]'), ('"\\\\["', '\\['), ('"\\\\)"', '\\)'), ('"\\\\("', '\\('), ('"="', '='), ('"a"', 'a'), ('"this"', 'this'), ('","', ','), ('"of"', 'of'), ('"is"', 'is'), ('";"', ';'), ('"\\\\."', '\\.'), ('"@prefix"', '@prefix'), ('\\s+', '\\s+'), ('[ \\t]*#[^\\r\\n]*\\r?\\n', '[ \\t]*#[^\\r\\n]*\\r?\\n'), ('URIREF', '<[^ >]*>'), ('PREFIX', '([A-Za-z][A-Za-z0-9]*)?:'), ('QNAME', '([A-Za-z][A-Za-z0-9_]*)?:[A-Za-z0-9_]+'), ('EXIVAR', '_:[A-Za-z0-9_]+'), ('UNIVAR', '\\?[A-Za-z0-9_]+'), ('STRLITA', '"[^"\\\\]*(\\\\.[^"\\\\]*)*"'), ('STRLITB', '"""[^"\\\\]*((\\\\.|"(?!""))[^"\\\\]*)*"""'), ('END', '\\Z'), ], ['\\s+', '[ \\t]*#[^\\r\\n]*\\r?\\n'], str) class _Parser(Parser): def document(self): scp = self.this while self._peek('END', '"@prefix"', '"\\\\["', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"this"', 'STRLITB', 'STRLITA', '"\\\\("', '"{"') != 'END': _token_ = self._peek('"@prefix"', '"\\\\["', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"this"', 'STRLITB', 'STRLITA', '"\\\\("', '"{"') if _token_ == '"@prefix"': directive = self.directive() else: statement = self.statement(scp) END = self._scan('END') def directive(self): self._scan('"@prefix"') PREFIX = self._scan('PREFIX') URIREF = self._scan('URIREF') self._scan('"\\\\."') self.bind(PREFIX[:-1], URIREF[1:-1]) def statement(self, scp): clause = self.clause(scp) self._scan('"\\\\."') def clause(self, scp): _token_ = self._peek('"\\\\["', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"this"', 'STRLITB', 'STRLITA', '"\\\\("', '"{"') if _token_ == '"\\\\["': phrase = self.phrase(scp) if self._peek('";"', '"is"', '","', '"\\\\."', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"\\\\("', '"\\\\["', '"\\\\]"', '"}"') not in ['";"', '","', '"\\\\."', '"\\\\]"', '"}"']: popair = self.popair(scp, phrase) while self._peek('";"', '","', '"\\\\."', '"\\\\]"', '"}"') == '";"': self._scan('";"') popair = self.popair(scp, phrase) elif 1: term = self.term(scp) popair = self.popair(scp, term) while self._peek('";"', '","', '"\\\\."', '"\\\\]"', '"}"') == '";"': self._scan('";"') popair = self.popair(scp, term) def term(self, scp): _token_ = self._peek('URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"this"', 'STRLITB', 'STRLITA', '"\\\\("', '"\\\\["', '"{"') if _token_ not in ['"this"', 'STRLITB', 'STRLITA', '"{"']: expr = self.expr(scp) return expr else: # in ['"this"', 'STRLITB', 'STRLITA', '"{"'] name = self.name(scp) return name def popair(self, scp, subj): pred = self.pred(scp) objects = self.objects(scp, subj, pred) def pred(self, scp): _token_ = self._peek('"is"', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"\\\\("', '"\\\\["') if _token_ != '"is"': expr = self.expr(scp) return (1, expr) else: # == '"is"' self._scan('"is"') expr = self.expr(scp) self._scan('"of"') return (-1, expr) def objects(self, scp, subj, pred): term = self.term(scp) self.gotStatement(subj, pred, term, scp) while self._peek('","', '";"', '"\\\\."', '"\\\\]"', '"}"') == '","': self._scan('","') term = self.term(scp) self.gotStatement(subj, pred, term, scp) def name(self, scp): _token_ = self._peek('"this"', 'STRLITB', 'STRLITA', '"{"') if _token_ == '"this"': self._scan('"this"') return scp elif _token_ == 'STRLITB': STRLITB = self._scan('STRLITB') return self.strlit(STRLITB) elif _token_ == 'STRLITA': STRLITA = self._scan('STRLITA') return self.strlit(STRLITA) else: # == '"{"' formula = self.formula() return formula def expr(self, scp): _token_ = self._peek('URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"\\\\("', '"\\\\["') if _token_ == 'URIREF': URIREF = self._scan('URIREF') return self.uriref(URIREF) elif _token_ == 'QNAME': QNAME = self._scan('QNAME') return self.qname(QNAME) elif _token_ == 'EXIVAR': EXIVAR = self._scan('EXIVAR') return self.bNode(EXIVAR) elif _token_ == 'UNIVAR': UNIVAR = self._scan('UNIVAR') return self.univar(UNIVAR) elif _token_ == '"a"': self._scan('"a"') return RDF_type elif _token_ == '"="': self._scan('"="') return DAML_equivalentTo elif _token_ == '"\\\\("': list = self.list(scp) return list else: # == '"\\\\["' phrase = self.phrase(scp) return phrase def list(self, scp): self._scan('"\\\\("') members = [] while self._peek('"\\\\)"', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"this"', 'STRLITB', 'STRLITA', '"\\\\("', '"\\\\["', '"{"') != '"\\\\)"': term = self.term(scp) members.append(term) self._scan('"\\\\)"') return self.makeList(self.something("list"), members, scp) def phrase(self, scp): self._scan('"\\\\["') subj = self.something() if self._peek('";"', '"\\\\]"', '"is"', '","', 'URIREF', 'QNAME', 'EXIVAR', 'UNIVAR', '"a"', '"="', '"\\\\("', '"\\\\["', '"\\\\."', '"}"') not in ['";"', '"\\\\]"', '","', '"\\\\."', '"}"']: popair = self.popair(scp, subj) while self._peek('";"', '","', '"\\\\]"', '"\\\\."', '"}"') == '";"': self._scan('";"') popair = self.popair(scp, subj) self._scan('"\\\\]"') return subj def formula(self): self._scan('"{"') scp = self.newScope() clause = self.clause(scp) while self._peek('";"', '"\\\\."', '"}"', '","', '"\\\\]"') == '"\\\\."': self._scan('"\\\\."') clause = self.clause(scp) if self._peek('"\\\\."', '"}"') == '"\\\\."': self._scan('"\\\\."') self._scan('"}"') return scp def parse(rule, text): P = _Parser(_ParserScanner(text)) return wrap_error_reporter(P, rule) def scanner(text): return _ParserScanner(text) class BadSyntax(SyntaxError): pass class Parser(_Parser): def __init__(self, scanner, sink, baseURI): _Parser.__init__(self, scanner) self._sink = sink self._baseURI = baseURI self._serial = 1 self._prefixes = {} self._bNodes = {} self._univars = {} self._vars = {} # maps URIrefs to vars self.this = 'RootFormula' # no _formula hack! def bind(self, prefix, uriref): if uriref.endswith('#'): sep = '#' uriref = uriref[:-1] else: sep = '' uriref = urlparse.urljoin(self._baseURI, uriref) + sep self._sink.bind(prefix, (uriref, URI)) if prefix in self._prefixes.keys(): if self._prefixes[prefix] != uriref: raise "Prefix redeclared" else: self._prefixes[prefix] = uriref def uriref(self, str): return (urlparse.urljoin(self._baseURI, str[1:-1]), URI) def qname(self, qname): prefix, name = qname.split(':') try: ns = self._prefixes[prefix] except KeyError: raise BadSyntax, "prefix %s not bound" % prefix else: return (ns + name, URI) def bNode(self, str): label = str[2:] if label in self._bNodes.keys(): return self._bNodes[label] else: x = (label, EXI) while x in self._bNodes.values(): x = self.something(label) self._bNodes[label] = x return x def univar(self, str): label = str[1:] if label in self._univars.keys(): return self._univars[label] else: x = (label, UNI) while x in self._univars.values(): x = self.something(label, quant=N3_forAll) self._univars[label] = x return x def strlit(self, str): if str[:3] == '"""': return (str[3:-3].replace('\r', '\\r').replace('\n', '\\n'), LIT) else: return (str[1:-1], LIT) def newScope(self): return self.something("formula") def makeList(self, list, members, scp): if len(members) == 1: self.gotStatement(list, (0, DAML_first), members[0], scp) self.gotStatement(list, (0, DAML_rest), DAML_nil, scp) return list if len(members) > 0: first = list[:] for i in range(len(members)): self.gotStatement(first, (0, DAML_first), members[i], scp) if i != (len(members)-1): rest = self.something("rest") self.gotStatement(first, (0, DAML_rest), rest, scp) first = rest[:] self.gotStatement(rest, (0, DAML_rest), DAML_nil, scp) return list else: return DAML_nil def gotStatement(self, subj, pred, obj, scp): dir, pred = pred if dir < 0: subj, obj = obj, subj if subj in self._vars.keys(): subj = self._vars[subj] if pred in self._vars.keys(): pred = self._vars[pred] if obj in self._vars.keys(): obj = self._vars[obj] if ((subj == self.this) and (scp == self.this) and (pred == N3_forAll)): if '#' in obj[0]: x = (obj[0].split('#')[-1], UNI) else: x = ('x', UNI) while x in self._univars.values(): x = self.something("var", N3_forAll) self._vars[obj] = x else: self._sink.makeStatement((subj, pred, obj, scp)) def something(self, hint="thing", quant=N3_forSome): if quant == N3_forSome: it = ('%s%s' % (hint, self._serial), EXI) else: it = ('%s%s' % (hint, self._serial), UNI) self._serial = self._serial + 1 return it def preProcess(s): s = s.replace('\r\n', '\n').replace('\r', '\n') URIREF = r'<[^ >]*>' STRLITA = r'"[^"\\]*(?:\\.[^"\\]*)*"' STRLITB = r'"""[^"\\]*(?:(?:\\.|"(?!""))[^"\\]*)*"""' tokes = re.compile(r'(%s|%s|%s|#|\n|[ \t]+|\S+)' % \ (URIREF, STRLITB, STRLITA)).findall(s) + ['\n'] while '#' in tokes: cp = tokes.index('#') np = tokes[cp:].index('\n') + cp tokes = tokes[:cp] + tokes[np:] if tokes[:-1] == '\n': return ''.join(tokes[:-1]) else: return ''.join(tokes) if __name__=="__main__": print __doc__ # [EOF]