initial commit

This commit is contained in:
Sergey Obukhov
2014-07-23 21:12:54 -07:00
commit 170f11038b
80 changed files with 7481 additions and 0 deletions

View File

View File

@@ -0,0 +1,238 @@
# -*- coding: utf-8 -*-
from .. import *
import os
from flanker import mime
from talon.signature import bruteforce
def test_empty_body():
eq_(('', None), bruteforce.extract_signature(''))
def test_no_signature():
msg_body = 'Hey man!'
eq_((msg_body, None), bruteforce.extract_signature(msg_body))
def test_signature_only():
msg_body = '--\nRoman'
eq_((msg_body, None), bruteforce.extract_signature(msg_body))
def test_signature_separated_by_dashes():
msg_body = '''Hey man! How r u?
---
Roman'''
eq_(('Hey man! How r u?', '---\nRoman'),
bruteforce.extract_signature(msg_body))
msg_body = '''Hey!
-roman'''
eq_(('Hey!', '-roman'), bruteforce.extract_signature(msg_body))
msg_body = '''Hey!
- roman'''
eq_(('Hey!', '- roman'), bruteforce.extract_signature(msg_body))
msg_body = '''Wow. Awesome!
--
Bob Smith'''
eq_(('Wow. Awesome!', '--\nBob Smith'),
bruteforce.extract_signature(msg_body))
def test_signature_words():
msg_body = '''Hey!
Thanks!
Roman'''
eq_(('Hey!', 'Thanks!\nRoman'),
bruteforce.extract_signature(msg_body))
msg_body = '''Hey!
--
Best regards,
Roman'''
eq_(('Hey!', '--\nBest regards,\n\nRoman'),
bruteforce.extract_signature(msg_body))
msg_body = '''Hey!
--
--
Regards,
Roman'''
eq_(('Hey!', '--\n--\nRegards,\nRoman'),
bruteforce.extract_signature(msg_body))
def test_iphone_signature():
msg_body = '''Hey!
Sent from my iPhone!'''
eq_(('Hey!', 'Sent from my iPhone!'),
bruteforce.extract_signature(msg_body))
def test_mailbox_for_iphone_signature():
msg_body = """Blah
Sent from Mailbox for iPhone"""
eq_(("Blah", "Sent from Mailbox for iPhone"),
bruteforce.extract_signature(msg_body))
def test_line_starts_with_signature_word():
msg_body = '''Hey man!
Thanks for your attention.
--
Thanks!
Roman'''
eq_(('Hey man!\nThanks for your attention.', '--\nThanks!\nRoman'),
bruteforce.extract_signature(msg_body))
def test_line_starts_with_dashes():
msg_body = '''Hey man!
Look at this:
--> one
--> two
--
Roman'''
eq_(('Hey man!\nLook at this:\n\n--> one\n--> two', '--\nRoman'),
bruteforce.extract_signature(msg_body))
def test_blank_lines_inside_signature():
msg_body = '''Blah.
-Lev.
Sent from my HTC smartphone!'''
eq_(('Blah.', '-Lev.\n\nSent from my HTC smartphone!'),
bruteforce.extract_signature(msg_body))
msg_body = '''Blah
--
John Doe'''
eq_(('Blah', '--\n\nJohn Doe'), bruteforce.extract_signature(msg_body))
def test_blackberry_signature():
msg_body = """Heeyyoooo.
Sent wirelessly from my BlackBerry device on the Bell network.
Envoyé sans fil par mon terminal mobile BlackBerry sur le réseau de Bell."""
eq_(('Heeyyoooo.', msg_body[len('Heeyyoooo.\n'):]),
bruteforce.extract_signature(msg_body))
msg_body = u"""Blah
Enviado desde mi oficina móvil BlackBerry® de Telcel"""
eq_(('Blah', u'Enviado desde mi oficina móvil BlackBerry® de Telcel'),
bruteforce.extract_signature(msg_body))
@patch.object(bruteforce, 'get_delimiter', Mock(side_effect=Exception()))
def test_crash_in_extract_signature():
msg_body = '''Hey!
-roman'''
eq_((msg_body, None), bruteforce.extract_signature(msg_body))
def test_signature_cant_start_from_first_line():
msg_body = """Thanks,
Blah
regards
John Doe"""
eq_(('Thanks,\n\nBlah', 'regards\n\nJohn Doe'),
bruteforce.extract_signature(msg_body))
@patch.object(bruteforce, 'SIGNATURE_MAX_LINES', 2)
def test_signature_max_lines_ignores_empty_lines():
msg_body = """Thanks,
Blah
regards
John Doe"""
eq_(('Thanks,\nBlah', 'regards\n\n\nJohn Doe'),
bruteforce.extract_signature(msg_body))
def test_get_signature_candidate():
# if there aren't at least 2 non-empty lines there should be no signature
for lines in [], [''], ['', ''], ['abc']:
eq_([], bruteforce.get_signature_candidate(lines))
# first line never included
lines = ['text', 'signature']
eq_(['signature'], bruteforce.get_signature_candidate(lines))
# test when message is shorter then SIGNATURE_MAX_LINES
with patch.object(bruteforce, 'SIGNATURE_MAX_LINES', 3):
lines = ['text', '', '', 'signature']
eq_(['signature'], bruteforce.get_signature_candidate(lines))
# test when message is longer then the SIGNATURE_MAX_LINES
with patch.object(bruteforce, 'SIGNATURE_MAX_LINES', 2):
lines = ['text1', 'text2', 'signature1', '', 'signature2']
eq_(['signature1', '', 'signature2'],
bruteforce.get_signature_candidate(lines))
# test long lines not encluded
with patch.object(bruteforce, 'TOO_LONG_SIGNATURE_LINE', 3):
lines = ['BR,', 'long', 'Bob']
eq_(['Bob'], bruteforce.get_signature_candidate(lines))
# test list (with dashes as bullet points) not included
lines = ['List:,', '- item 1', '- item 2', '--', 'Bob']
eq_(['--', 'Bob'], bruteforce.get_signature_candidate(lines))
def test_mark_candidate_indexes():
with patch.object(bruteforce, 'TOO_LONG_SIGNATURE_LINE', 3):
# spaces are not considered when checking line length
eq_('clc',
bruteforce._mark_candidate_indexes(
['BR, ', 'long', 'Bob'],
[0, 1, 2]))
# only candidate lines are marked
# if line has only dashes it's a candidate line
eq_('ccdc',
bruteforce._mark_candidate_indexes(
['-', 'long', '-', '- i', 'Bob'],
[0, 2, 3, 4]))
def test_process_marked_candidate_indexes():
eq_([2, 13, 15],
bruteforce._process_marked_candidate_indexes(
[2, 13, 15], 'dcc'))
eq_([15],
bruteforce._process_marked_candidate_indexes(
[2, 13, 15], 'ddc'))
eq_([13, 15],
bruteforce._process_marked_candidate_indexes(
[13, 15], 'cc'))
eq_([15],
bruteforce._process_marked_candidate_indexes(
[15], 'lc'))
eq_([15],
bruteforce._process_marked_candidate_indexes(
[13, 15], 'ld'))

View File

@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
from .. import *
import os
from PyML import SparseDataSet
from talon.signature.learning import dataset
from talon import signature
from talon.signature import extraction as e
from talon.signature import bruteforce
def test_message_shorter_SIGNATURE_MAX_LINES():
sender = "bob@foo.bar"
body = """Call me ASAP, please.This is about the last changes you deployed.
Thanks in advance,
Bob"""
text, extracted_signature = signature.extract(body, sender)
eq_('\n'.join(body.splitlines()[:2]), text)
eq_('\n'.join(body.splitlines()[-2:]), extracted_signature)
def test_messages_longer_SIGNATURE_MAX_LINES():
for filename in os.listdir(STRIPPED):
filename = os.path.join(STRIPPED, filename)
if not filename.endswith('_body'):
continue
sender, body = dataset.parse_msg_sender(filename)
text, extracted_signature = signature.extract(body, sender)
extracted_signature = extracted_signature or ''
with open(filename[:-len('body')] + 'signature') as ms:
msg_signature = ms.read()
eq_(msg_signature.strip(), extracted_signature.strip())
stripped_msg = body.strip()[:len(body.strip())-len(msg_signature)]
eq_(stripped_msg.strip(), text.strip())
def test_text_line_in_signature():
# test signature should consist of one solid part
sender = "bob@foo.bar"
body = """Call me ASAP, please.This is about the last changes you deployed.
Thanks in advance,
some text which doesn't seem to be a signature at all
Bob"""
text, extracted_signature = signature.extract(body, sender)
eq_('\n'.join(body.splitlines()[:2]), text)
eq_('\n'.join(body.splitlines()[-3:]), extracted_signature)
def test_long_line_in_signature():
sender = "bob@foo.bar"
body = """Call me ASAP, please.This is about the last changes you deployed.
Thanks in advance,
some long text here which doesn't seem to be a signature at all
Bob"""
text, extracted_signature = signature.extract(body, sender)
eq_('\n'.join(body.splitlines()[:-1]), text)
eq_('Bob', extracted_signature)
body = """Thanks David,
some *long* text here which doesn't seem to be a signature at all
"""
((body, None), signature.extract(body, "david@example.com"))
def test_basic():
msg_body = 'Blah\r\n--\r\n\r\nSergey Obukhov'
eq_(('Blah', '--\r\n\r\nSergey Obukhov'),
signature.extract(msg_body, 'Sergey'))
def test_over_2_text_lines_after_signature():
body = """Blah
Bob,
If there are more than
2 non signature lines in the end
It's not signature
"""
text, extracted_signature = signature.extract(body, "Bob")
eq_(extracted_signature, None)
def test_no_signature():
sender, body = "bob@foo.bar", "Hello"
eq_((body, None), signature.extract(body, sender))
def test_handles_unicode():
sender, body = dataset.parse_msg_sender(UNICODE_MSG)
text, extracted_signature = signature.extract(body, sender)
@patch.object(signature.extraction, 'has_signature')
def test_signature_extract_crash(has_signature):
has_signature.side_effect = Exception('Bam!')
msg_body = u'Blah\r\n--\r\n\r\nСергей'
eq_((msg_body, None), signature.extract(msg_body, 'Сергей'))
def test_mark_lines():
with patch.object(bruteforce, 'SIGNATURE_MAX_LINES', 2):
# we analyse the 2nd line as well though it's the 6th line
# (starting from the bottom) because we don't count empty line
eq_('ttset',
e._mark_lines(['Bob Smith',
'Bob Smith',
'Bob Smith',
'',
'some text'], 'Bob Smith'))
with patch.object(bruteforce, 'SIGNATURE_MAX_LINES', 3):
# we don't analyse the 1st line because
# signature cant start from the 1st line
eq_('tset',
e._mark_lines(['Bob Smith',
'Bob Smith',
'',
'some text'], 'Bob Smith'))
def test_process_marked_lines():
# no signature found
eq_((range(5), None), e._process_marked_lines(range(5), 'telt'))
# signature in the middle of the text
eq_((range(9), None), e._process_marked_lines(range(9), 'tesestelt'))
# long line splits signature
eq_((range(7), [7, 8]),
e._process_marked_lines(range(9), 'tsslsless'))
eq_((range(20), [20]),
e._process_marked_lines(range(21), 'ttttttstttesllelelets'))
# some signature lines could be identified as text
eq_(([0], range(1, 9)), e._process_marked_lines(range(9), 'tsetetest'))
eq_(([], range(5)),
e._process_marked_lines(range(5), "ststt"))

View File

View File

@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
from ... import *
import os
from PyML import SparseDataSet
from talon.utils import to_unicode
from talon.signature.learning import dataset as d
from talon.signature.learning.featurespace import features
def test_is_sender_filename():
assert_false(d.is_sender_filename("foo/bar"))
assert_false(d.is_sender_filename("foo/bar_body"))
ok_(d.is_sender_filename("foo/bar_sender"))
def test_build_sender_filename():
eq_("foo/bar_sender", d.build_sender_filename("foo/bar_body"))
def test_parse_msg_sender():
sender, msg = d.parse_msg_sender(EML_MSG_FILENAME)
# if the message in eml format
with open(EML_MSG_FILENAME) as f:
eq_(sender,
" Alex Q <xxx@yahoo.com>")
eq_(msg, f.read())
# if the message sender is stored in a separate file
sender, msg = d.parse_msg_sender(MSG_FILENAME_WITH_BODY_SUFFIX)
with open(MSG_FILENAME_WITH_BODY_SUFFIX) as f:
eq_(sender, u"john@example.com")
eq_(msg, f.read())
def test_build_extraction_dataset():
if os.path.exists(os.path.join(TMP_DIR, 'extraction.data')):
os.remove(os.path.join(TMP_DIR, 'extraction.data'))
d.build_extraction_dataset(os.path.join(EMAILS_DIR, 'P'),
os.path.join(TMP_DIR,
'extraction.data'), 1)
test_data = SparseDataSet(os.path.join(TMP_DIR, 'extraction.data'),
labelsColumn=-1)
# the result is a loadable signature extraction dataset
# 32 comes from 3 emails in emails/P folder, 11 lines checked to be
# a signature, one email has only 10 lines
eq_(test_data.size(), 32)
eq_(len(features('')), test_data.numFeatures)

View File

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
from ... import *
from talon.signature.learning import featurespace as fs
def test_apply_features():
s = '''John Doe
VP Research and Development, Xxxx Xxxx Xxxxx
555-226-2345
john@example.com'''
sender = 'John <john@example.com>'
features = fs.features(sender)
result = fs.apply_features(s, features)
# note that we don't consider the first line because signatures don't
# usually take all the text, empty lines are not considered
eq_(result, [[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
with patch.object(fs, 'SIGNATURE_MAX_LINES', 4):
features = fs.features(sender)
new_result = fs.apply_features(s, features)
# result remains the same because we don't consider empty lines
eq_(result, new_result)
def test_build_pattern():
s = '''John Doe
VP Research and Development, Xxxx Xxxx Xxxxx
555-226-2345
john@example.com'''
sender = 'John <john@example.com>'
features = fs.features(sender)
result = fs.build_pattern(s, features)
eq_(result, [2, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1])

View File

@@ -0,0 +1,236 @@
# -*- coding: utf-8 -*-
from ... import *
import regex as re
from talon.signature.learning import helpers as h
from talon.signature.learning.helpers import *
# First testing regex constants.
VALID = '''
15615552323
1-561-555-1212
5613333
18008793262
800-879-3262
0-800.879.3262
04 3452488
04 -3452488
04 - 3452499
(610) 310-5555 x5555
533-1123
(021)1234567
(021)123456
(000)000000
+7 920 34 57 23
+7(920) 34 57 23
+7(920)345723
+7920345723
8920345723
21143
2-11-43
2 - 11 - 43
'''
VALID_PHONE_NUMBERS = [e.strip() for e in VALID.splitlines() if e.strip()]
def test_match_phone_numbers():
for phone in VALID_PHONE_NUMBERS:
ok_(RE_RELAX_PHONE.match(phone), "{} should be matched".format(phone))
def test_match_names():
names = ['John R. Doe']
for name in names:
ok_(RE_NAME.match(name), "{} should be matched".format(name))
def test_sender_with_name():
ok_lines = ['Sergey Obukhov <serobnic@example.com>',
'\tSergey <serobnic@example.com>',
('"Doe, John (TX)"'
'<DowJ@example.com>@EXAMPLE'
'<IMCEANOTES-+22Doe+2C+20John+20'
'+28TX+29+22+20+3CDoeJ+40example+2Ecom+3E'
'+40EXAMPLE@EXAMPLE.com>'),
('Company Sleuth <csleuth@email.xxx.com>'
'@EXAMPLE <XXX-Company+20Sleuth+20+3Ccsleuth'
'+40email+2Exxx+2Ecom+3E+40EXAMPLE@EXAMPLE.com>'),
('Doe III, John '
'</O=EXAMPLE/OU=NA/CN=RECIPIENTS/CN=jDOE5>')]
for line in ok_lines:
ok_(RE_SENDER_WITH_NAME.match(line),
'{} should be matched'.format(line))
nok_lines = ['', '<serobnic@xxx.ru>', 'Sergey serobnic@xxx.ru']
for line in nok_lines:
assert_false(RE_SENDER_WITH_NAME.match(line),
'{} should not be matched'.format(line))
# Now test helpers functions
def test_binary_regex_search():
eq_(1, h.binary_regex_search(re.compile("12"))("12"))
eq_(0, h.binary_regex_search(re.compile("12"))("34"))
def binary_regex_match(prog):
eq_(1, h.binary_regex_match(re.compile("12"))("12 3"))
eq_(0, h.binary_regex_match(re.compile("12"))("3 12"))
def test_flatten_list():
eq_([1, 2, 3, 4, 5], h.flatten_list([[1, 2], [3, 4, 5]]))
@patch.object(h.re, 'compile')
def test_contains_sender_names(re_compile):
with patch.object(h, 'extract_names',
Mock(return_value=['bob', 'smith'])) as extract_names:
has_sender_names = h.contains_sender_names("bob.smith@example.com")
extract_names.assert_called_with("bob.smith@example.com")
for name in ["bob", "Bob", "smith", "Smith"]:
ok_(has_sender_names(name))
extract_names.return_value = ''
has_sender_names = h.contains_sender_names("bob.smith@example.com")
# if no names could be extracted fallback to the email address
ok_(has_sender_names('bob.smith@example.com'))
# don't crash if there are no sender
extract_names.return_value = ''
has_sender_names = h.contains_sender_names("")
assert_false(has_sender_names(''))
def test_extract_names():
senders_names = {
# from example dataset
('Jay Rickerts <eCenter@example.com>@EXAMPLE <XXX-Jay+20Rickerts'
'+20+3CeCenter+40example+2Ecom+3E+40EXAMPLE@EXAMPLE.com>'):
['Jay', 'Rickerts'],
# if `,` is used in sender's name
'Williams III, Bill </O=EXAMPLE/OU=NA/CN=RECIPIENTS/CN=BWILLIA5>':
['Williams', 'III', 'Bill'],
# if somehow `'` or `"` are used in sender's name
'Laura" "Goldberg <laura.goldberg@example.com>':
['Laura', 'Goldberg'],
# extract from senders email address
'<sergey@xxx.ru>': ['sergey'],
# extract from sender's email address
# if dots are used in the email address
'<sergey.obukhov@xxx.ru>': ['sergey', 'obukhov'],
# extract from sender's email address
# if dashes are used in the email address
'<sergey-obukhov@xxx.ru>': ['sergey', 'obukhov'],
# extract from sender's email address
# if `_` are used in the email address
'<sergey_obukhov@xxx.ru>': ['sergey', 'obukhov'],
# old style From field, found in jangada dataset
'wcl@example.com (Wayne Long)': ['Wayne', 'Long'],
# if only sender's name provided
'Wayne Long': ['Wayne', 'Long'],
# if middle name is shortened with dot
'Sergey N. Obukhov <serobnic@xxx.ru>': ['Sergey', 'Obukhov'],
# not only spaces could be used as name splitters
' Sergey Obukhov <serobnic@xxx.ru>': ['Sergey', 'Obukhov'],
# finally normal example
'Sergey <serobnic@xxx.ru>': ['Sergey'],
# if middle name is shortened with `,`
'Sergey N, Obukhov': ['Sergey', 'Obukhov'],
# if mailto used with email address and sender's name is specified
'Sergey N, Obukhov [mailto: serobnic@xxx.ru]': ['Sergey', 'Obukhov'],
# when only email address is given
'serobnic@xxx.ru': ['serobnic'],
# when nothing is given
'': [],
# if phone is specified in the `From:` header
'wcl@example.com (Wayne Long +7 920 -256 - 35-09)': ['Wayne', 'Long'],
# from crash reports `nothing to repeat`
'* * * * <the_pod1@example.com>': ['the', 'pod'],
'"**Bobby B**" <copymycashsystem@example.com>':
['Bobby', 'copymycashsystem'],
# from crash reports `bad escape`
'"M Ali B Azlan \(GHSE/PETH\)" <aliazlan@example.com>':
['Ali', 'Azlan'],
('"Ridthauddin B A Rahim \(DD/PCSB\)"'
' <ridthauddin_arahim@example.com>'): ['Ridthauddin', 'Rahim'],
('"Boland, Patrick \(Global Xxx Group, Ireland \)"'
' <Patrick.Boland@example.com>'): ['Boland', 'Patrick'],
'"Mates Rate \(Wine\)" <amen@example.com.com>':
['Mates', 'Rate', 'Wine'],
('"Morgan, Paul \(Business Xxx RI, Xxx Xxx Group\)"'
' <paul.morgan@example.com>'): ['Morgan', 'Paul'],
'"David DECOSTER \(Domicile\)" <decosterdavid@xxx.be>':
['David', 'DECOSTER', 'Domicile']
}
for sender, expected_names in senders_names.items():
extracted_names = h.extract_names(sender)
# check that extracted names could be compiled
try:
re.compile("|".join(extracted_names))
except Exception, e:
ok_(False, ("Failed to compile extracted names {}"
"\n\nReason: {}").format(extracted_names, e))
if expected_names:
for name in expected_names:
assert_in(name, extracted_names)
else:
eq_(expected_names, extracted_names)
# words like `ru`, `gmail`, `com`, `org`, etc. are not considered
# sender's names
for word in h.BAD_SENDER_NAMES:
eq_(h.extract_names(word), [])
# duplicates are not allowed
eq_(h.extract_names("sergey <sergey@example.com"), ["sergey"])
def test_categories_percent():
eq_(0.0, h.categories_percent("qqq ggg hhh", ["Po"]))
eq_(50.0, h.categories_percent("q,w.", ["Po"]))
eq_(0.0, h.categories_percent("qqq ggg hhh", ["Nd"]))
eq_(50.0, h.categories_percent("q5", ["Nd"]))
eq_(50.0, h.categories_percent("s.s,5s", ["Po", "Nd"]))
eq_(0.0, h.categories_percent("", ["Po", "Nd"]))
@patch.object(h, 'categories_percent')
def test_punctuation_percent(categories_percent):
h.punctuation_percent("qqq")
categories_percent.assert_called_with("qqq", ['Po'])
def test_capitalized_words_percent():
eq_(0.0, h.capitalized_words_percent(''))
eq_(100.0, h.capitalized_words_percent('Example Corp'))
eq_(50.0, h.capitalized_words_percent('Qqq qqq QQQ 123 sss'))
eq_(100.0, h.capitalized_words_percent('Cell 713-444-7368'))
eq_(100.0, h.capitalized_words_percent('8th Floor'))
eq_(0.0, h.capitalized_words_percent('(212) 230-9276'))
def test_has_signature():
ok_(h.has_signature('sender', 'sender@example.com'))
ok_(h.has_signature('http://www.example.com\n555 555 5555',
'sender@example.com'))
ok_(h.has_signature('http://www.example.com\naddress@example.com',
'sender@example.com'))
assert_false(h.has_signature('http://www.example.com/555-555-5555',
'sender@example.com'))
long_line = ''.join(['q' for e in xrange(28)])
assert_false(h.has_signature(long_line + ' sender', 'sender@example.com'))
# wont crash on an empty string
assert_false(h.has_signature('', ''))
# dont consider empty strings when analysing signature
with patch.object(h, 'SIGNATURE_MAX_LINES', 1):
ok_('sender\n\n', 'sender@example.com')