ÿØÿà JFIF    ÿÛ „ !.%+&8&+/1555$;@;4?.451 4,$,44444444444414444444444444444444444444444444444444ÿÀ  á á" ÿÄ     ÿÄ ?    !1AQaq"2‘¡±ÁðBRbrÑá#‚’¢²3S CñÿÄ   ÿÄ !    !1QAa‘2ÿÚ   ? 5˜Z¯V¦cø)›t/? z¨±>Õ5€¶‹Á¤·¼z¼Ü¬+ñ®v¤¨_ˆR­BFn©—˜ý®ç̝P8gýt·ÉSTŦˆìät?þé¼íìN/Þa)ì–í6ô… Ï¿øÃj´¿KÇü]ÿ ªô¹-eKànëÕHTx}ýSÜ›ÿ ”7Ø×&µ<¦  ¥ÑO¶[Ù¯ä¨ÞÃÿ PZ-¬;#õ|•oaÿ ©CìÞz3˜öː/¤­ñTûIØ}š^ mÓ%ªxˆ¥ÉŸu=Z+ISe¿45™¼u;ú&WØ÷€æßQ™®{|íx*TC“#ZŠìZ§²‹ 6pv…³¿¡äª*áZÐ%ÒOáˆo"x«OHk w±æ+¬V(kMúŸ5Vö«$ ÁrÏbàb57/luR ¸ÑÛj Òµì`Мq­û žICÀÊ•©4€Âcà¨Ï€O´<èÐ:›ù(Ë^L8þ‘ÍÌ#¸Ð_Ì©ÙK(Öz 4¬û+¸;ü’V’84‘¬ÃŽ:[â‡ÔÌáõp¢~§ªlæ£ö{®G>J¼"°‡7¯ÆÉèßû ‹É‹§ÁòÃýâßî ^ƾÙõ‹×óH#«LP½ïX=xÑÍ$|W?•~• îëÔ©ª‹ {ÝT…Kÿ ”hûâá)J*ö˜–ÔU;iÇ€/ ÆþjóZ\ýwØ=Ìm ºèËL9 ýèÆð/¨’¥öo=nË.%Îì ŽÕ¯È|{Oj²ƒE6e/ßdÄõ²Ìâ1O®ò×TsəԸhOMýíMˆ¿¼H˜l²,7Â¥#MF/Úf°Ö½± ¸–dr‹NýÊ íjqx{œÉ ä-È ¦ øÄër¨q°ð †nцýÑÄÆ’mä…n<0È™;ÁÝá¯ÁZƒ7FÀmì­ É&9ˆîéi¶ùN§Y• ÃZãAâ?•‡©‰ , ó¾IŸŠc1 4â&y­&pŠ­6;M À 0¹qç»p.á …ŸÅáK@%6·y6ƒ‰3?”úºŽ‰éX5ªPT §µ!=Mž«Ú½‹ÅgÂSâÉaþÓoö–¯ÁÔìR>5éÿ üs¶ÆUcÌ kÇR ]ÿ ù¬¼«VŽ;Â|‡~¢¦”ÏŰæ {L™Õ°Óv¹ò¸írޡעCÃ!íVÕ {¶»sŒNPg/ "uÕbkm²“$ďå¿é¹§°½æz¯6 †s¿!s–wÚÝ“™Œ °.ûj>·+™Òa…©Œ&rÝÎtÛë긪Ît’LAVp%c Úý[ÄzJ¾ÇàXXç@˜ó<êL]·T˜¾¥1Ó©V‡g´æ½¦Ý@¹óø!_@´ÞâSÁ —S3™•& ]@JHÚý©ZŽ €×æÔr»Áf!‡yÞ4Mv*èÓã_{‘åóUuљØ«Oïé*®EvÑ Œ÷‡U \"㪒ÍK+À 4“M¡ï:0¥5í!'<@î´”>Ç»&Z–ïCCV˜Ì5Šo&îhè.žû |ÓK©h$s6KìŒëã)¹hI¦GïOåóI;ììü#É$Š0…Ææ¥TØ.5­¾gn´ “ÂÖ\:hœ89G)J@„}œ:’Ò{/Š"¦_Æ×7Æ3VÇŠÊa]ÚŒÙ€Ä–=®uÁßâACZƒ§§£ Qnâ:«,×{tyø¬iÛcœÜÄ€H½ÄÍCk´÷šß .W'b¤Íåh]÷€=,Žv×cÚEÚHXJX¶îo¨FÒtèöŸ>ªª6[J®Fµ£sGÁeqõfe\íjÒÐïÄÐGˆe1Ø‹.Ø”‘Ëuø Y­ˆÜ ŽG|zùªüMpDnQWÄ”%JŠ™)â*p@Örš«ÕT2Ð%ˆG#ª„ ·¤!°ŸOTÂT¸aÚ%4&h™LµšØüÐ.F¿²ÐÞ_Ç‚¾ÅÃaÜ÷09Æ q€öy˜v‡85õN÷]¬äѼóS{°_MެúÔ#°Ç¸0åÞè2ëôPcvÆw9®ií1Ä8F™˜à‰´+‰Ik1òÝ7“Ñ×ÒsÝ\x‚h`ÞÑ`ó"|µEcý£n˜h`}GÞ !±ù²Ápü²ß6 0ïi󜵩SÈÇ7˜-ÕURO˜¦´f$ªž-Í6(œ}<„ éc øs]ŽŽ„*—¾ ìdŽ„)méª\¿êÎIg¾ØÞ~I#C/¼¼´EÁÈŽi8“©õådô·>euä ƒ'Ê×लR1ÉJE1ÐAát`t;ÇР%Ý<‡¥„ÍÆ`×Oyó)õiI€ñQaŸ4Ûù\áàaÃÔ¹HÃu¹*k€¦<„e S‡&õÏ B!ŽhüÞ`yj}mªf×\¿ Ç~æ­9‡û\՞Ǖg²1Žû5V7 !àöšm° c`ܬøÇìµÒ'P"?…´Ö,"§^•õލsÔ)6˜sæéÍR¼ ò|Sl”‹7 nPW Gòú÷½§O¯‡„l¡kSÞŒr½PÊ@æ¢pŽ-mÿ #Ÿ˜Àº¶Áä¦;ïÔæ$1££`“Õ>„—·ž)ßð³ñ#Ï Ô$¶œ‰ÊE‹À;÷º ¯«P:Ñ”8–IÊtpÞ3ª“>ê“þës4ò2OÏÕ­±zô†Õ§‰.÷ä¸;¿˜“'œ›žª}«Œ{ª±Ì 9ÔóÞÕ‡0 $íWV3Üì¬ —@kÝ4@¿r¼±½¬™›?øØæ´'Áé®CË3-g$˜ö‡×auÚi´Žp/êÛ æF›Ú2v‹ã¿¿,nB1̨ƃqÞa5͝@&Æû“él÷ \C²½UÍc ¯k×¢U ÖéQå™—-r wô ÞÏ<Ò=&=ÿ Ôê Òêˈt,i—;LîÜ á¸*ÚÃ1$êL•LÍ <É)ýÐà’ ;F™{ƒ™˜€&'}‚ãÄK`¡ÞT@I;®žZóè‚s’7®°›+§O­Åq©é»²9<Ô J ¼9O’HL»Ùïì¸rk¼Ž_ý‘TŸu[²ßÚŒ·ü÷B%¯E ŸÔX5êO´ Ç•€’I0 ÉJX` ñ¹õ%;µŸD‘«´€àwÒ™U ûئžÖö\×®×´8 ½‡ºÐÆÓ§?Àkmœ=;d5*@-ì0F Rªýš[Ü6âö̃ڸr*KA9· u*µæ£?U¸Âêí†8@¦X4 e-ò„0s{ HâUpU?¼mñRa°®a%Ð'tÉ×’\¾ÊÉ]t›h>·(Ë@R¼¡Ãt h}’O÷au<+nT…Ö…MӐ??Óe95 q>í/;&JSû °¯ÊéÞ øƒ*Ã2½Ài&:nôUl=¾¿5eˆ3”ñc|Ú2V”>„»&eE;«ÚäC p¢Û úy 9š[ŒÌx¼擼A&DåÒ¯ˆ¤ÀÌ;"˜ ÏQä¸åhÊ}Ûq«Û0WžÒ|»€ø®öCm5•\ÇÀ§Pe3£]0ÃàLDÉ‰1øªxjgwT‚÷¿LΨK‹›ùs—xˆÜ±µ kæ¸f‰‰ÜGk/LÛØ6d9ò¶ùA{ƒA3š/¬D¬khÓk‰`˜"㯒r¿±Óã jx‡°e}<Ñø\3y:'À•/h½Í€Ç4~g ?Û(¼]v‘ªlKÎâ~?O‚W%{Ì:“'©úNq¾›úo(X’¥¯ˆ nFê{Ç€ü?º'ë ø‹ì Þ09ŒÌç9Æ —ËC`j@ÓÄ(+a‹un¸#ÂꟋ{K`‘ÑÍÍ'à´»/Û,KW;Þ4²þð ï Nm|~fGÏ(…³Ã)«1ö­Õ ¥‡¨©ƒÃ™ü-s=à=U66Ï«Ýc蓦W¹íž®›nÔ%êÇìŒ<#Ü×84ån®Ð ÒåOC` ñânÑs‡¢ç 1õ%Îhì½Ã½® e:ݼUZo™`  ÅZŸŒÊ«ê1ÏÄo$q¹Þ€©ˆhÐÉä¯ñ[!…Ú˜àJ:x2$Íß&PåT£6ç— ‡Í*4Ýšçjÿ ‰É nófÐ ó(L5C•åÆ\rMÒ@ò }y-W}™üýVù—ú¢=Ù”c®‘< M ž ´Phr ¦©TD ‘ù.$´÷O‡‘V2Æò.=IUŒ=ž‡â¬i™aþÓåÙ?òUø'ØÖ•.~* šTŒ!•-×áºTâ®ä#õü'´ eýlYÅÓeÕKÂrT"CÚ@u!Óxƒ{š3€}1¿(r}%«nËamjÑ%ÑNEò v ˜à  σöK³,*º.àzù¨™Ó ÚçâU¦*¿ 9{%Ö¹ njûdaXöb) kÛÆ±ûÓ\°M7ˆÂ=û›ç¿Ã‚­V»Cg–8ÙêE- j)k$º`Ã-ùEýeBÆÇ]c¡°ñty&Òd0nõ'¡W+ƒ*|–øµFa\GQªEAÔp5\Ǽ·¼Ç8·õ -â§Ú[ ‡ uZeÖ 3}×d'+¹:ð+K†Û®s!Ï$úe€<Û”x)1»a­¡LC]¸µík…ÚàA»AYº{†ªS[¦5HÒ7ù --,ísòDØ€èk ÞÀîÜ ò@â( ËNˆë›4ô½•/¦o‡€Û7 ê•ÆêòðÜy'Án½µ á˜ݦ ndeo…[ì¶Ê,¥R³Ä=À±—–ß;£™´ñSâ*g§”ïaið‘Jå~™ÓÞ ß³Õ¢»8x埒²52>AÊb&-÷\7´éÄù€T˜,w;3{ï˜k…à¹ÄqÀ«œ{€\ ˆ¾[´¨јr &Úé„Ívˆ±8†¿]|¬ņ4I×pÞS1ÈÖz‰#Ìv‡G!YNògñ:màTz¢Ý1ô©^O=~ë|5Bã™ç•¼µõ•bÆ@úÕS¬ÈŒ#¬zünrŸ û” Z²•èðV"ÁHÚý©wÝ €7¼Ìu1hÑa3Éä û f$o¿É ™Ú›ÝçnpÒ3äÌ3†Í§,Äï]$‰/pê †«À¼¸e9­Æê_C]žƒ·ý·frÁN«, E=›Çq -‰öŒ:aÏ¿±í&£Í:-} 84‘ÿ eƒQÑeëSsuiA ³g㟥ú£?ÿ ʼn*”“÷aühe:ÊWa@ÒÞk±eØ] F Ô—r.åä˜ @ö¥ªZoÐýYL·¥S²G/‡ñ <~*ZÆ´è>JlòàÛÆ½ÿ 窘ìGN¢:I®KšJp/`íIÁÀõ#Ä-€ö­šµŒoF4|ÆQØÆ@Ì|£Ô…¢À{9˜è½Üó›€ôYÒÎYsið;ís¤€à²ˆ‚4qÉVŒI$ ‰"° æµ8cXGjœˏ¡Aâý•ËÜ¢ûï e·çLx']á"oÅÎê3¯Ç—¹”ó0nå‚âg{Œñ> S´˜îè°g238‚ãköÝfÚd´6Ò€;ò÷±¢™¼›º ¢Æ'¥Ðx'e¬ç ]bÈÆV¢ó‹kýBO ðÊâ$Ÿ!×T 3Mýמ žìٍàÌü‘8÷€àæØ8æ©6‰©L´«…oãpð„~Çk‰!ñ;‹”ÛžÍ àž±z Ÿôû øŸÝužÏ;ÿ #|u6™Þ¬ÚˆÐõA4¶â|ôl|Ê2ŽÇ¤ÝÅÇY.<#Aí.k§hóF‚”Y; M½Ö4hŸ4&›­¿tès´%FìL¥£Ãk‰ÇT¤haÁ¤ÚxfÉ`ÑìË›>i 3t‚:,–+^÷´–{Û–Nxi"x‘Ûg î¨>¥Õ܁ùZH,2Û“:8xÊ¢Çí9.É-Ìâã-=çjwµS˜dütžçwýGòú®®ûº_ˆýx$–¡ãøO EÚÛÏ÷R„×w+3£Á£öUMyR²¹âŒ°š›¸Ñãò9§Ó_Dl+Ùßc›úšGÅÌc†Ž!Ko=¶.‘Îÿ c²(2®V mª.ÿ ¹B›¹å ù„öŸSV>™ü¯$y:G¢Z×àøúdî¹û­·ýÇ´:•c LÍõi_‹ö+ÎæGÊè>OŠ•äž´§Þ{X}¨1ÚTc›»Qþ•êô°t¿OP?eæ~É{5]•ÙR£r5†nZ\ã@ &îJõ ¾àC°þV>fé¥/ü5ñÊIº_é5 ;e­h<@ Ä&æÃëE%;X,ÒãÆÞ`Oò¦kŸm#˜!ÀyÄ¢| óLšò¥Ä` ¶R=|ÈCâh5ò3DˆïF†ðÒ#ÅìÛœ?¸yhBãœí ZxßÎÄhºRK„`Þödvײ™ÀÈÑÒgŒuY w³%†ƒÓzõ ÖÏp‚dH®¦A´ù§»ÓÇMæ~)ˆð‡û:ù&Ä •vGD´À n ݇¼Ö8Fö óáà£~Ë¥x`oK|Ä?fxiØü%pìR>éò+Û±éÎ>núlFŤ'tq8LZÏvÃ?„¡ß±È⽆¯³íü@x|PöUäèØã¡ð‚ŒAìÏ"vÍwóŸÍ{ ý0.z È•Ö{,N¡£¡ŸKÕÙž>Ýœþ ÍÀ°<×EA!Å‚D™IúOÍ¡>ôG}Â` ÍßkÜL™Ž Þð™ {IøF²¹òQ3&!ÃÂÞz.d&Ï-sH¸,Ôõ˜ŽP€ 77ˆÝ¼ÊëÜw =cÕ Ú,ØÐ5ÎYÐ)ì´öœgŒ[¤ßv㙑8心>h]§µháYš£²ºÑ.{Ï7Sð•?´~×SÃKýJÛ˜ ™Íäiúu<µX¶1õ^kâçIÑ£sZ4h>j*ÔšD:4­¿_ ÷¸ Õxæÿ ¸?Mù _•­ÊÐ ä ÷ý ÑwL œ­ïnTkÛUÍN©ë:¦fV ¶ÜÔÜMªÅâA½–¿R×TXš-%iTÊT•‡Ù‚JôϐZxWÑè‰f‰òG º ×Õû2aZ7OU3[“×AT–ÞŒ…-‘¤”Ì ì&(ˆ¿­•ƒkï’:ðY¦W‘ Å)“†‘˜³Åtcø˜ñTÂwÚÇ4|üLÇªí–v- qˆèU qPE.†â‘˜µ Æ,ÐÅs]8¾„oúÑ i>ÜxxÈó)ƒ ´æÁâØ$À‰vžŸf$Ž |ãw;ÀÁIJ»b` {¦Ó¤Ú$©YÀ‘n@Óïž«9J¼êG m¤ ܯ¹ÌW4€ÐÒÅÛ‡#褕Ÿn-?í|с¥÷Ú¹¬'´ÞÜ9ÓK `hê£SÄSà?7—Wí_´…óB›»:=Ãïq`<8ñÓŒÑlú2d¬ê³£hÖ[l|$vÝro~'R®‰§°ñmY ͧäP |PUª¹·:3Œ[Û{Xÿ ºâ@‚W–Äé u‚ ¯´*=íή.pûÒdt @G‰¬ s¸ ëÉücr ÞæÑ¨Ê@>¤¢Ö±. Þ'¯°ÌME[YéïĵÂCå½ Ué©Áû'Ê9%eÔðNU”ë‘ÌsD3/®+UI˜9h.WC”빓$#:pz:YÓ ¿xž* ³$Í +$kñAŠ‹†¢ Uê>¸)_š¬÷©ßAÂÔb9ÇU ¯¾á•9¯ÏÏ÷O÷¼¼Fähal1‰3Ì[Ïr•´UCksNÐ] R‘¸¥H+§Šé†c©vÖÞ0iÓ76s†î!§=ß ¼~Ô'°Ãmäoäš³ªøi1úÉ)³yV8 CLÄØÁ‘WYïi€H6ÖÑiámø^ÈY´°Ñ7¥Û*—Ñ©L«Qƒï—Ùrÿ ›£Ð*š¸ˆL©ˆ$ˆ ÷¾D§9È®«qbqC)–ˆïv´çñsÑVT­Ø, <àïºÀO«Jý·õ àfPìð .wFšir´þ’2_Y *Æ€x\« ì€9š@ Ž|F⇥ˆkZ@hÖÄ0t¿-<“‹qµ¾*ZL¤Ú)&BJpÓF5=$„at*Zš$’ÑtdûÝRI1 2މ$€$I$#‰SÞ’Hë¬ï;Á$¡t$’`<(ñÇt)$‡Ð.Êf¢X’Kt=Éé$‚ˆªè¢oÝëòI%Rgcª÷ŠyI%¡‰ÿ !ñ)´õ $¤ Ô’IIGÿÙimport os import sys import time import stat import socket import email import email.message import re import io import tempfile from test import support import unittest import textwrap import mailbox import glob class TestBase: all_mailbox_types = (mailbox.Message, mailbox.MaildirMessage, mailbox.mboxMessage, mailbox.MHMessage, mailbox.BabylMessage, mailbox.MMDFMessage) def _check_sample(self, msg): # Inspect a mailbox.Message representation of the sample message self.assertIsInstance(msg, email.message.Message) self.assertIsInstance(msg, mailbox.Message) for key, value in _sample_headers.items(): self.assertIn(value, msg.get_all(key)) self.assertTrue(msg.is_multipart()) self.assertEqual(len(msg.get_payload()), len(_sample_payloads)) for i, payload in enumerate(_sample_payloads): part = msg.get_payload(i) self.assertIsInstance(part, email.message.Message) self.assertNotIsInstance(part, mailbox.Message) self.assertEqual(part.get_payload(), payload) def _delete_recursively(self, target): # Delete a file or delete a directory recursively if os.path.isdir(target): support.rmtree(target) elif os.path.exists(target): support.unlink(target) class TestMailbox(TestBase): maxDiff = None _factory = None # Overridden by subclasses to reuse tests _template = 'From: foo\n\n%s\n' def setUp(self): self._path = support.TESTFN self._delete_recursively(self._path) self._box = self._factory(self._path) def tearDown(self): self._box.close() self._delete_recursively(self._path) def test_add(self): # Add copies of a sample message keys = [] keys.append(self._box.add(self._template % 0)) self.assertEqual(len(self._box), 1) keys.append(self._box.add(mailbox.Message(_sample_message))) self.assertEqual(len(self._box), 2) keys.append(self._box.add(email.message_from_string(_sample_message))) self.assertEqual(len(self._box), 3) keys.append(self._box.add(io.BytesIO(_bytes_sample_message))) self.assertEqual(len(self._box), 4) keys.append(self._box.add(_sample_message)) self.assertEqual(len(self._box), 5) keys.append(self._box.add(_bytes_sample_message)) self.assertEqual(len(self._box), 6) with self.assertWarns(DeprecationWarning): keys.append(self._box.add( io.TextIOWrapper(io.BytesIO(_bytes_sample_message)))) self.assertEqual(len(self._box), 7) self.assertEqual(self._box.get_string(keys[0]), self._template % 0) for i in (1, 2, 3, 4, 5, 6): self._check_sample(self._box[keys[i]]) _nonascii_msg = textwrap.dedent("""\ From: foo Subject: Falinaptár házhozszállítással. Már rendeltél? 0 """) def test_add_invalid_8bit_bytes_header(self): key = self._box.add(self._nonascii_msg.encode('latin-1')) self.assertEqual(len(self._box), 1) self.assertEqual(self._box.get_bytes(key), self._nonascii_msg.encode('latin-1')) def test_invalid_nonascii_header_as_string(self): subj = self._nonascii_msg.splitlines()[1] key = self._box.add(subj.encode('latin-1')) self.assertEqual(self._box.get_string(key), 'Subject: =?unknown-8bit?b?RmFsaW5hcHThciBo4Xpob3pzeuFsbO104XNz' 'YWwuIE3hciByZW5kZWx06Ww/?=\n\n') def test_add_nonascii_string_header_raises(self): with self.assertRaisesRegex(ValueError, "ASCII-only"): self._box.add(self._nonascii_msg) self._box.flush() self.assertEqual(len(self._box), 0) self.assertMailboxEmpty() def test_add_that_raises_leaves_mailbox_empty(self): def raiser(*args, **kw): raise Exception("a fake error") support.patch(self, email.generator.BytesGenerator, 'flatten', raiser) with self.assertRaises(Exception): self._box.add(email.message_from_string("From: Alphöso")) self.assertEqual(len(self._box), 0) self._box.close() self.assertMailboxEmpty() _non_latin_bin_msg = textwrap.dedent("""\ From: foo@bar.com To: báz Subject: Maintenant je vous présente mon collègue, le pouf célèbre \tJean de Baddie Mime-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Да, они летят. """).encode('utf-8') def test_add_8bit_body(self): key = self._box.add(self._non_latin_bin_msg) self.assertEqual(self._box.get_bytes(key), self._non_latin_bin_msg) with self._box.get_file(key) as f: self.assertEqual(f.read(), self._non_latin_bin_msg.replace(b'\n', os.linesep.encode())) self.assertEqual(self._box[key].get_payload(), "Да, они летят.\n") def test_add_binary_file(self): with tempfile.TemporaryFile('wb+') as f: f.write(_bytes_sample_message) f.seek(0) key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), _bytes_sample_message.split(b'\n')) def test_add_binary_nonascii_file(self): with tempfile.TemporaryFile('wb+') as f: f.write(self._non_latin_bin_msg) f.seek(0) key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), self._non_latin_bin_msg.split(b'\n')) def test_add_text_file_warns(self): with tempfile.TemporaryFile('w+') as f: f.write(_sample_message) f.seek(0) with self.assertWarns(DeprecationWarning): key = self._box.add(f) self.assertEqual(self._box.get_bytes(key).split(b'\n'), _bytes_sample_message.split(b'\n')) def test_add_StringIO_warns(self): with self.assertWarns(DeprecationWarning): key = self._box.add(io.StringIO(self._template % "0")) self.assertEqual(self._box.get_string(key), self._template % "0") def test_add_nonascii_StringIO_raises(self): with self.assertWarns(DeprecationWarning): with self.assertRaisesRegex(ValueError, "ASCII-only"): self._box.add(io.StringIO(self._nonascii_msg)) self.assertEqual(len(self._box), 0) self._box.close() self.assertMailboxEmpty() def test_remove(self): # Remove messages using remove() self._test_remove_or_delitem(self._box.remove) def test_delitem(self): # Remove messages using __delitem__() self._test_remove_or_delitem(self._box.__delitem__) def _test_remove_or_delitem(self, method): # (Used by test_remove() and test_delitem().) key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assertEqual(len(self._box), 2) method(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) self.assertRaises(KeyError, lambda: method(key0)) self.assertEqual(self._box.get_string(key1), self._template % 1) key2 = self._box.add(self._template % 2) self.assertEqual(len(self._box), 2) method(key2) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key2]) self.assertRaises(KeyError, lambda: method(key2)) self.assertEqual(self._box.get_string(key1), self._template % 1) method(key1) self.assertEqual(len(self._box), 0) self.assertRaises(KeyError, lambda: self._box[key1]) self.assertRaises(KeyError, lambda: method(key1)) def test_discard(self, repetitions=10): # Discard messages key0 = self._box.add(self._template % 0) key1 = self._box.add(self._template % 1) self.assertEqual(len(self._box), 2) self._box.discard(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) self._box.discard(key0) self.assertEqual(len(self._box), 1) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get(self): # Retrieve messages using get() key0 = self._box.add(self._template % 0) msg = self._box.get(key0) self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '0\n') self.assertIsNone(self._box.get('foo')) self.assertIs(self._box.get('foo', False), False) self._box.close() self._box = self._factory(self._path) key1 = self._box.add(self._template % 1) msg = self._box.get(key1) self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '1\n') def test_getitem(self): # Retrieve message using __getitem__() key0 = self._box.add(self._template % 0) msg = self._box[key0] self.assertEqual(msg['from'], 'foo') self.assertEqual(msg.get_payload(), '0\n') self.assertRaises(KeyError, lambda: self._box['foo']) self._box.discard(key0) self.assertRaises(KeyError, lambda: self._box[key0]) def test_get_message(self): # Get Message representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) msg0 = self._box.get_message(key0) self.assertIsInstance(msg0, mailbox.Message) self.assertEqual(msg0['from'], 'foo') self.assertEqual(msg0.get_payload(), '0\n') self._check_sample(self._box.get_message(key1)) def test_get_bytes(self): # Get bytes representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assertEqual(self._box.get_bytes(key0), (self._template % 0).encode('ascii')) self.assertEqual(self._box.get_bytes(key1), _bytes_sample_message) def test_get_string(self): # Get string representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) self.assertEqual(self._box.get_string(key0), self._template % 0) self.assertEqual(self._box.get_string(key1).split('\n'), _sample_message.split('\n')) def test_get_file(self): # Get file representations of messages key0 = self._box.add(self._template % 0) key1 = self._box.add(_sample_message) with self._box.get_file(key0) as file: data0 = file.read() with self._box.get_file(key1) as file: data1 = file.read() self.assertEqual(data0.decode('ascii').replace(os.linesep, '\n'), self._template % 0) self.assertEqual(data1.decode('ascii').replace(os.linesep, '\n'), _sample_message) def test_get_file_can_be_closed_twice(self): # Issue 11700 key = self._box.add(_sample_message) f = self._box.get_file(key) f.close() f.close() def test_iterkeys(self): # Get keys using iterkeys() self._check_iteration(self._box.iterkeys, do_keys=True, do_values=False) def test_keys(self): # Get keys using keys() self._check_iteration(self._box.keys, do_keys=True, do_values=False) def test_itervalues(self): # Get values using itervalues() self._check_iteration(self._box.itervalues, do_keys=False, do_values=True) def test_iter(self): # Get values using __iter__() self._check_iteration(self._box.__iter__, do_keys=False, do_values=True) def test_values(self): # Get values using values() self._check_iteration(self._box.values, do_keys=False, do_values=True) def test_iteritems(self): # Get keys and values using iteritems() self._check_iteration(self._box.iteritems, do_keys=True, do_values=True) def test_items(self): # Get keys and values using items() self._check_iteration(self._box.items, do_keys=True, do_values=True) def _check_iteration(self, method, do_keys, do_values, repetitions=10): for value in method(): self.fail("Not empty") keys, values = [], [] for i in range(repetitions): keys.append(self._box.add(self._template % i)) values.append(self._template % i) if do_keys and not do_values: returned_keys = list(method()) elif do_values and not do_keys: returned_values = list(method()) else: returned_keys, returned_values = [], [] for key, value in method(): returned_keys.append(key) returned_values.append(value) if do_keys: self.assertEqual(len(keys), len(returned_keys)) self.assertEqual(set(keys), set(returned_keys)) if do_values: count = 0 for value in returned_values: self.assertEqual(value['from'], 'foo') self.assertLess(int(value.get_payload()), repetitions) count += 1 self.assertEqual(len(values), count) def test_contains(self): # Check existence of keys using __contains__() self.assertNotIn('foo', self._box) key0 = self._box.add(self._template % 0) self.assertIn(key0, self._box) self.assertNotIn('foo', self._box) key1 = self._box.add(self._template % 1) self.assertIn(key1, self._box) self.assertIn(key0, self._box) self.assertNotIn('foo', self._box) self._box.remove(key0) self.assertNotIn(key0, self._box) self.assertIn(key1, self._box) self.assertNotIn('foo', self._box) self._box.remove(key1) self.assertNotIn(key1, self._box) self.assertNotIn(key0, self._box) self.assertNotIn('foo', self._box) def test_len(self, repetitions=10): # Get message count keys = [] for i in range(repetitions): self.assertEqual(len(self._box), i) keys.append(self._box.add(self._template % i)) self.assertEqual(len(self._box), i + 1) for i in range(repetitions): self.assertEqual(len(self._box), repetitions - i) self._box.remove(keys[i]) self.assertEqual(len(self._box), repetitions - i - 1) def test_set_item(self): # Modify messages using __setitem__() key0 = self._box.add(self._template % 'original 0') self.assertEqual(self._box.get_string(key0), self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') self.assertEqual(self._box.get_string(key1), self._template % 'original 1') self._box[key0] = self._template % 'changed 0' self.assertEqual(self._box.get_string(key0), self._template % 'changed 0') self._box[key1] = self._template % 'changed 1' self.assertEqual(self._box.get_string(key1), self._template % 'changed 1') self._box[key0] = _sample_message self._check_sample(self._box[key0]) self._box[key1] = self._box[key0] self._check_sample(self._box[key1]) self._box[key0] = self._template % 'original 0' self.assertEqual(self._box.get_string(key0), self._template % 'original 0') self._check_sample(self._box[key1]) self.assertRaises(KeyError, lambda: self._box.__setitem__('foo', 'bar')) self.assertRaises(KeyError, lambda: self._box['foo']) self.assertEqual(len(self._box), 2) def test_clear(self, iterations=10): # Remove all messages using clear() keys = [] for i in range(iterations): self._box.add(self._template % i) for i, key in enumerate(keys): self.assertEqual(self._box.get_string(key), self._template % i) self._box.clear() self.assertEqual(len(self._box), 0) for i, key in enumerate(keys): self.assertRaises(KeyError, lambda: self._box.get_string(key)) def test_pop(self): # Get and remove a message using pop() key0 = self._box.add(self._template % 0) self.assertIn(key0, self._box) key1 = self._box.add(self._template % 1) self.assertIn(key1, self._box) self.assertEqual(self._box.pop(key0).get_payload(), '0\n') self.assertNotIn(key0, self._box) self.assertIn(key1, self._box) key2 = self._box.add(self._template % 2) self.assertIn(key2, self._box) self.assertEqual(self._box.pop(key2).get_payload(), '2\n') self.assertNotIn(key2, self._box) self.assertIn(key1, self._box) self.assertEqual(self._box.pop(key1).get_payload(), '1\n') self.assertNotIn(key1, self._box) self.assertEqual(len(self._box), 0) def test_popitem(self, iterations=10): # Get and remove an arbitrary (key, message) using popitem() keys = [] for i in range(10): keys.append(self._box.add(self._template % i)) seen = [] for i in range(10): key, msg = self._box.popitem() self.assertIn(key, keys) self.assertNotIn(key, seen) seen.append(key) self.assertEqual(int(msg.get_payload()), keys.index(key)) self.assertEqual(len(self._box), 0) for key in keys: self.assertRaises(KeyError, lambda: self._box[key]) def test_update(self): # Modify multiple messages using update() key0 = self._box.add(self._template % 'original 0') key1 = self._box.add(self._template % 'original 1') key2 = self._box.add(self._template % 'original 2') self._box.update({key0: self._template % 'changed 0', key2: _sample_message}) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % 'changed 0') self.assertEqual(self._box.get_string(key1), self._template % 'original 1') self._check_sample(self._box[key2]) self._box.update([(key2, self._template % 'changed 2'), (key1, self._template % 'changed 1'), (key0, self._template % 'original 0')]) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % 'original 0') self.assertEqual(self._box.get_string(key1), self._template % 'changed 1') self.assertEqual(self._box.get_string(key2), self._template % 'changed 2') self.assertRaises(KeyError, lambda: self._box.update({'foo': 'bar', key0: self._template % "changed 0"})) self.assertEqual(len(self._box), 3) self.assertEqual(self._box.get_string(key0), self._template % "changed 0") self.assertEqual(self._box.get_string(key1), self._template % "changed 1") self.assertEqual(self._box.get_string(key2), self._template % "changed 2") def test_flush(self): # Write changes to disk self._test_flush_or_close(self._box.flush, True) def test_popitem_and_flush_twice(self): # See #15036. self._box.add(self._template % 0) self._box.add(self._template % 1) self._box.flush() self._box.popitem() self._box.flush() self._box.popitem() self._box.flush() def test_lock_unlock(self): # Lock and unlock the mailbox self.assertFalse(os.path.exists(self._get_lock_path())) self._box.lock() self.assertTrue(os.path.exists(self._get_lock_path())) self._box.unlock() self.assertFalse(os.path.exists(self._get_lock_path())) def test_close(self): # Close mailbox and flush changes to disk self._test_flush_or_close(self._box.close, False) def _test_flush_or_close(self, method, should_call_close): contents = [self._template % i for i in range(3)] self._box.add(contents[0]) self._box.add(contents[1]) self._box.add(contents[2]) oldbox = self._box method() if should_call_close: self._box.close() self._box = self._factory(self._path) keys = self._box.keys() self.assertEqual(len(keys), 3) for key in keys: self.assertIn(self._box.get_string(key), contents) oldbox.close() def test_dump_message(self): # Write message representations to disk for input in (email.message_from_string(_sample_message), _sample_message, io.BytesIO(_bytes_sample_message)): output = io.BytesIO() self._box._dump_message(input, output) self.assertEqual(output.getvalue(), _bytes_sample_message.replace(b'\n', os.linesep.encode())) output = io.BytesIO() self.assertRaises(TypeError, lambda: self._box._dump_message(None, output)) def _get_lock_path(self): # Return the path of the dot lock file. May be overridden. return self._path + '.lock' class TestMailboxSuperclass(TestBase, unittest.TestCase): def test_notimplemented(self): # Test that all Mailbox methods raise NotImplementedException. box = mailbox.Mailbox('path') self.assertRaises(NotImplementedError, lambda: box.add('')) self.assertRaises(NotImplementedError, lambda: box.remove('')) self.assertRaises(NotImplementedError, lambda: box.__delitem__('')) self.assertRaises(NotImplementedError, lambda: box.discard('')) self.assertRaises(NotImplementedError, lambda: box.__setitem__('', '')) self.assertRaises(NotImplementedError, lambda: box.iterkeys()) self.assertRaises(NotImplementedError, lambda: box.keys()) self.assertRaises(NotImplementedError, lambda: box.itervalues().__next__()) self.assertRaises(NotImplementedError, lambda: box.__iter__().__next__()) self.assertRaises(NotImplementedError, lambda: box.values()) self.assertRaises(NotImplementedError, lambda: box.iteritems().__next__()) self.assertRaises(NotImplementedError, lambda: box.items()) self.assertRaises(NotImplementedError, lambda: box.get('')) self.assertRaises(NotImplementedError, lambda: box.__getitem__('')) self.assertRaises(NotImplementedError, lambda: box.get_message('')) self.assertRaises(NotImplementedError, lambda: box.get_string('')) self.assertRaises(NotImplementedError, lambda: box.get_bytes('')) self.assertRaises(NotImplementedError, lambda: box.get_file('')) self.assertRaises(NotImplementedError, lambda: '' in box) self.assertRaises(NotImplementedError, lambda: box.__contains__('')) self.assertRaises(NotImplementedError, lambda: box.__len__()) self.assertRaises(NotImplementedError, lambda: box.clear()) self.assertRaises(NotImplementedError, lambda: box.pop('')) self.assertRaises(NotImplementedError, lambda: box.popitem()) self.assertRaises(NotImplementedError, lambda: box.update((('', ''),))) self.assertRaises(NotImplementedError, lambda: box.flush()) self.assertRaises(NotImplementedError, lambda: box.lock()) self.assertRaises(NotImplementedError, lambda: box.unlock()) self.assertRaises(NotImplementedError, lambda: box.close()) class TestMaildir(TestMailbox, unittest.TestCase): _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory) def setUp(self): TestMailbox.setUp(self) if (os.name == 'nt') or (sys.platform == 'cygwin'): self._box.colon = '!' def assertMailboxEmpty(self): self.assertEqual(os.listdir(os.path.join(self._path, 'tmp')), []) def test_add_MM(self): # Add a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_info('foo') key = self._box.add(msg) self.assertTrue(os.path.exists(os.path.join(self._path, 'cur', '%s%sfoo' % (key, self._box.colon)))) def test_get_MM(self): # Get a MaildirMessage instance msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_flags('RF') key = self._box.add(msg) msg_returned = self._box.get_message(key) self.assertIsInstance(msg_returned, mailbox.MaildirMessage) self.assertEqual(msg_returned.get_subdir(), 'cur') self.assertEqual(msg_returned.get_flags(), 'FR') def test_set_MM(self): # Set with a MaildirMessage instance msg0 = mailbox.MaildirMessage(self._template % 0) msg0.set_flags('TP') key = self._box.add(msg0) msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), 'PT') msg1 = mailbox.MaildirMessage(self._template % 1) self._box[key] = msg1 msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), '') self.assertEqual(msg_returned.get_payload(), '1\n') msg2 = mailbox.MaildirMessage(self._template % 2) msg2.set_info('2,S') self._box[key] = msg2 self._box[key] = self._template % 3 msg_returned = self._box.get_message(key) self.assertEqual(msg_returned.get_subdir(), 'new') self.assertEqual(msg_returned.get_flags(), 'S') self.assertEqual(msg_returned.get_payload(), '3\n') def test_consistent_factory(self): # Add a message. msg = mailbox.MaildirMessage(self._template % 0) msg.set_subdir('cur') msg.set_flags('RF') key = self._box.add(msg) # Create new mailbox with class FakeMessage(mailbox.MaildirMessage): pass box = mailbox.Maildir(self._path, factory=FakeMessage) box.colon = self._box.colon msg2 = box.get_message(key) self.assertIsInstance(msg2, FakeMessage) def test_initialize_new(self): # Initialize a non-existent mailbox self.tearDown() self._box = mailbox.Maildir(self._path) self._check_basics() self._delete_recursively(self._path) self._box = self._factory(self._path, factory=None) self._check_basics() def test_initialize_existing(self): # Initialize an existing mailbox self.tearDown() for subdir in '', 'tmp', 'new', 'cur': os.mkdir(os.path.normpath(os.path.join(self._path, subdir))) self._box = mailbox.Maildir(self._path) self._check_basics() def _check_basics(self, factory=None): # (Used by test_open_new() and test_open_existing().) self.assertEqual(self._box._path, os.path.abspath(self._path)) self.assertEqual(self._box._factory, factory) for subdir in '', 'tmp', 'new', 'cur': path = os.path.join(self._path, subdir) mode = os.stat(path)[stat.ST_MODE] self.assertTrue(stat.S_ISDIR(mode), "Not a directory: '%s'" % path) def test_list_folders(self): # List folders self._box.add_folder('one') self._box.add_folder('two') self._box.add_folder('three') self.assertEqual(len(self._box.list_folders()), 3) self.assertEqual(set(self._box.list_folders()), set(('one', 'two', 'three'))) def test_get_folder(self): # Open folders self._box.add_folder('foo.bar') folder0 = self._box.get_folder('foo.bar') folder0.add(self._template % 'bar') self.assertTrue(os.path.isdir(os.path.join(self._path, '.foo.bar'))) folder1 = self._box.get_folder('foo.bar') self.assertEqual(folder1.get_string(folder1.keys()[0]), self._template % 'bar') def test_add_and_remove_folders(self): # Delete folders self._box.add_folder('one') self._box.add_folder('two') self.assertEqual(len(self._box.list_folders()), 2) self.assertEqual(set(self._box.list_folders()), set(('one', 'two'))) self._box.remove_folder('one') self.assertEqual(len(self._box.list_folders()), 1) self.assertEqual(set(self._box.list_folders()), set(('two',))) self._box.add_folder('three') self.assertEqual(len(self._box.list_folders()), 2) self.assertEqual(set(self._box.list_folders()), set(('two', 'three'))) self._box.remove_folder('three') self.assertEqual(len(self._box.list_folders()), 1) self.assertEqual(set(self._box.list_folders()), set(('two',))) self._box.remove_folder('two') self.assertEqual(len(self._box.list_folders()), 0) self.assertEqual(self._box.list_folders(), []) def test_clean(self): # Remove old files from 'tmp' foo_path = os.path.join(self._path, 'tmp', 'foo') bar_path = os.path.join(self._path, 'tmp', 'bar') with open(foo_path, 'w') as f: f.write("@") with open(bar_path, 'w') as f: f.write("@") self._box.clean() self.assertTrue(os.path.exists(foo_path)) self.assertTrue(os.path.exists(bar_path)) foo_stat = os.stat(foo_path) os.utime(foo_path, (time.time() - 129600 - 2, foo_stat.st_mtime)) self._box.clean() self.assertFalse(os.path.exists(foo_path)) self.assertTrue(os.path.exists(bar_path)) def test_create_tmp(self, repetitions=10): # Create files in tmp directory hostname = socket.gethostname() if '/' in hostname: hostname = hostname.replace('/', r'\057') if ':' in hostname: hostname = hostname.replace(':', r'\072') pid = os.getpid() pattern = re.compile(r"(?P